【Netty】Promise 源码分析(十七)

news/2024/7/6 1:07:48 标签: 前端, javascript, java

文章目录

  • 前言
  • 一、Promise 接口
  • 二、Netty 的 DefaultPromise
    • 2.1、设置任务的成功或失败
    • 2.2、获取 Future 任务执行结果和添加监听事件
  • 三、Netty 的 DefaultChannelPromise
  • 总结

前言

回顾Netty系列文章:

  • Netty 概述(一)
  • Netty 架构设计(二)
  • Netty Channel 概述(三)
  • Netty ChannelHandler(四)
  • ChannelPipeline源码分析(五)
  • 字节缓冲区 ByteBuf (六)(上)
  • 字节缓冲区 ByteBuf(七)(下)
  • Netty 如何实现零拷贝(八)
  • Netty 程序引导类(九)
  • Reactor 模型(十)
  • 工作原理详解(十一)
  • Netty 解码器(十二)
  • Netty 编码器(十三)
  • Netty 编解码器(十四)
  • 自定义解码器、编码器、编解码器(十五)
  • Future 源码分析(十六)

本篇文章我就就来分析一下可写的 Future,也就是 promise,Netty 中的 Promise 扩展自 Netty 的 Future。

一、Promise 接口

在 Netty 中,Promise 接口是一种特殊的可写的 Future。 Promise 的核心源码如下:

java">public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V var1);

    boolean trySuccess(V var1);

    Promise<V> setFailure(Throwable var1);

    boolean tryFailure(Throwable var1);

    boolean setUncancellable();

    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);

    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);

    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);

    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);

    Promise<V> await() throws InterruptedException;

    Promise<V> awaitUninterruptibly();

    Promise<V> sync() throws InterruptedException;

    Promise<V> syncUninterruptibly();
}

从上面可以看出,Promise 就是一个可写的 Future。在 Future 机制中,业务逻辑所在任务执行的状态(成功或失败)是在 Future 中实现的;而在 Promise 中,可以在业务逻辑中控制任务的执行结果,相比 Future 更加灵活。
以下是一个 Promise 的示例(伪代码)。

java">//异步的耗时任务接收一个 Promise
public Promise asynchronousFunction() {

    Promise promise = new PromiseImpl();

    Object result = null;

    return =search()  //业务逻辑

        if (sucess) {
            promise.setSuccess(result); //通知 promise 当前异步任务成功了,并传入结果
        } else if (failed) {
            promise.setFailure(reason);//通知 promise 当前异步任务失败了
        } else if (error) {
            promise.setFailure(error);//通知 promise 当前异步任务发生了异常
        }
}

//调用异步的耗时操作
Promise promise = asynchronousFunction(promise);//会立即返回 promise

//添加成功处理 / 失败处理 / 异步处理等事件
promise.addListener();//例如:可以添加成功后的执行事件

//继续做其他事件,不需要理会 asynchronousFunction 何时结束
doOtherThings();

在 Netty 中,Promise 继承了 Future,因此也具备了 Future 的所有功能。在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败。
Netty 的常用 Promise 类有 DefaultPromise 类,这是 Promise 实现的基础,DefaultChannelPromise 是 DefaultPromise 的子类,加入了channel属性。

二、Netty 的 DefaultPromise

Netty 中涉及异步操作的地方都使用了 Promise 。例如,下面是服务器/客户端启动时的注册任务,最终会调用 Unsafe 的 register,调用过程中会传入一个Promise 。Unsafe 进行事件的注册时调用 Promise 可以设置成功或者失败。

java">//SingleThreadEventLoop.java
public ChannelFuture register(ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

//AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (AbstractChannel.this.isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
    } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    } else {
        AbstractChannel.this.eventLoop = eventLoop;
        if (eventLoop.inEventLoop()) {
            this.register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    public void run() {
                        AbstractUnsafe.this.register0(promise);
                    }
                });
            } catch (Throwable var4) {
                AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
                this.closeForcibly();
                AbstractChannel.this.closeFuture.setClosed();
                this.safeSetFailure(promise, var4);
            }
        }

    }
}

DefaultPromise 提供的功能可以分为两个部分;一个是为调用者提供 get()和addListen()用于获取 Future 任务执行结果和添加监听事件;另一部分是为业务处理任务提供setSucess()等方法设置任务的成功或失败。

2.1、设置任务的成功或失败

DefaultPromise 核心源码如下:

java">public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    public Promise<V> setSuccess(V result) {
        if (this.setSuccess0(result)) {
            return this;
        } else {
            throw new IllegalStateException("complete already: " + this);
        }
    }

    public boolean trySuccess(V result) {
        return this.setSuccess0(result);
    }

    public Promise<V> setFailure(Throwable cause) {
        if (this.setFailure0(cause)) {
            return this;
        } else {
            throw new IllegalStateException("complete already: " + this, cause);
        }
    }

    public boolean tryFailure(Throwable cause) {
        return this.setFailure0(cause);
    }

    public boolean setUncancellable() {
        if (RESULT_UPDATER.compareAndSet(this, (Object)null, UNCANCELLABLE)) {
            return true;
        } else {
            Object result = this.result;
            return !isDone0(result) || !isCancelled0(result);
        }
    }

    public boolean isSuccess() {
        Object result = this.result;
        return result != null && result != UNCANCELLABLE && !(result instanceof DefaultPromise.CauseHolder);
    }

    public boolean isCancellable() {
        return this.result == null;
    }

    //...

}

2.2、获取 Future 任务执行结果和添加监听事件

DefaultPromise 的get方法有 3 个。

无参数的get会阻塞等待;
有参数的get会等待指定事件,若未结束就抛出超时异常,这两个get是在其父类 AbstractFuture中实现的。getNow()方法则会立马返回结果。

源码如下:

java">public V getNow() {
    Object result = this.result;
    return !(result instanceof DefaultPromise.CauseHolder) && result != SUCCESS && result != UNCANCELLABLE ? result : null;
}

public V get() throws InterruptedException, ExecutionException {
    Object result = this.result;
    if (!isDone0(result)) {
        this.await();
        result = this.result;
    }

    if (result != SUCCESS && result != UNCANCELLABLE) {
        Throwable cause = this.cause0(result);
        if (cause == null) {
            return result;
        } else if (cause instanceof CancellationException) {
            throw (CancellationException)cause;
        } else {
            throw new ExecutionException(cause);
        }
    } else {
        return null;
    }
}

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    Object result = this.result;
    if (!isDone0(result)) {
        if (!this.await(timeout, unit)) {
            throw new TimeoutException();
        }

        result = this.result;
    }

    if (result != SUCCESS && result != UNCANCELLABLE) {
        Throwable cause = this.cause0(result);
        if (cause == null) {
            return result;
        } else if (cause instanceof CancellationException) {
            throw (CancellationException)cause;
        } else {
            throw new ExecutionException(cause);
        }
    } else {
        return null;
    }
}

await() 方法判断 Future 任务是否结束,之后获取 this 锁,如果任务未完成则调用 Object 的 wait()等待。源码如下:

java">public Promise<V> await() throws InterruptedException { 
    if (this.isDone()) {
        return this;
    } else if (Thread.interrupted()) {
        throw new InterruptedException(this.toString());
    } else {
        this.checkDeadLock();
        synchronized(this) {
            while(!this.isDone()) {
                this.incWaiters();

                try {
                    this.wait();
                } finally {
                    this.decWaiters();
                }
            }

            return this;
        }
    }
    
    //...
}

addListener 方法被调用时,将传入的回调传入listeners对象中。如果监听多于 1 个,会创建DeflaultFutureListeners对象将回调方法保存在一个数组中。
removeListener会将listeners设置为null(只有一个时)或从数组中移除(多个回调时)。源码如下。

java">public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    ObjectUtil.checkNotNull(listener, "listener");
    synchronized(this) {
        this.addListener0(listener);
    }

    if (this.isDone()) {
        this.notifyListeners();
    }

    return this;
}   

public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    ObjectUtil.checkNotNull(listeners, "listeners");
    synchronized(this) {
        GenericFutureListener[] var3 = listeners;
        int var4 = listeners.length;
        int var5 = 0;

        while(var5 < var4) {
            GenericFutureListener<? extends Future<? super V>> listener = var3[var5];
            if (listener != null) {
                this.addListener0(listener);
                ++var5;
                continue;
            }
        }
    }

    if (this.isDone()) {
        this.notifyListeners();
    }

    return this;
}

public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
    ObjectUtil.checkNotNull(listener, "listener");
    synchronized(this) {
        this.removeListener0(listener);
        return this;
    }
}

public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    ObjectUtil.checkNotNull(listeners, "listeners");
    synchronized(this) {
        GenericFutureListener[] var3 = listeners;
        int var4 = listeners.length;

        for(int var5 = 0; var5 < var4; ++var5) {
            GenericFutureListener<? extends Future<? super V>> listener = var3[var5];
            if (listener == null) {
                break;
            }

            this.removeListener0(listener);
        }

        return this;
    }
}

在添加监听器的过程中,如果任务刚好执行完毕 done(),则立即触发监听事件。触发监听通过notifyListeners()实现。主要逻辑如下:
如果当前addListener的线程(准确来说应该是调用了notifyListeners的线程,因为addListener和setSuccess都会调用notifyListeners和 Promise 内的线程池)与当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池中执行;
而如果是执行 Future 任务的线程池中的setSuccess时,调用notifyListeners(),会放在当前线程中执行。内部维护了notifyListeners用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次遍历并调用operationComplete。

三、Netty 的 DefaultChannelPromise

DefaultChannelPromise 是 DefaultPromise 的子类,内部维护了一个通道变量 channel。
Promise 机制相关的方法都是调用父类方法。
除此之外,DefaultChannelPromise 还实现了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,可以将ChannelFuture注册到ChannelFlushPromiseNotifier类,当有数据写入或到达checkpoint时使用。
核心源码如下:

java">public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
    private final Channel channel;
    private long checkpoint;

	//...

    public Channel channel() {
        return this.channel;
    }

    public ChannelPromise setSuccess() {
        return this.setSuccess((Void)null);
    }

    public ChannelPromise setSuccess(Void result) {
        super.setSuccess(result);
        return this;
    }

    public boolean trySuccess() {
        return this.trySuccess((Object)null);
    }

    public ChannelPromise setFailure(Throwable cause) {
        super.setFailure(cause);
        return this;
    }

    //...

    public ChannelPromise promise() {
        return this;
    }

    protected void checkDeadLock() {
        if (this.channel().isRegistered()) {
            super.checkDeadLock();
        }

    }

    public ChannelPromise unvoid() {
        return this;
    }

    public boolean isVoid() {
        return false;
    }
}

总结

以上我们分析了 Netty 中的 Promise,知道了它是扩展自 Netty 的 Future,是一个可写的 Future。


http://www.niftyadmin.cn/n/367903.html

相关文章

【MySQL学习1:单表】

之前做的笔记都在有道云&#xff0c;之后会一点点将以前的笔记分享出来~ MySQL学习1&#xff1a;单表查询 1. 变量起别名2. 去重3. 空值NULL4. 使用着重号5. 查询常数6. 显示表结果信息 describe 或 desc7. 运算符&#xff08;1&#xff09;安全等于运算符 <>&#xff08…

Django后台和微信小程序之间使用session方法,出现小程序访问404,Django后台找不到指定的URL问题解决

问题描述&#xff1a; 在Django后台开启session中间件&#xff0c;小程序端请求Django后台的session属性&#xff0c;在Django中执行session会话操作&#xff0c;并响应应答给小程序&#xff0c;在小程序端执行cookie的缓存和读取操作。 在上述的功能完成后&#xff0c;小程序…

Android9.0 iptables用INetd实现app某个时间段禁止上网的功能实现

1.前言 在9.0的系统rom定制化开发中,在system中netd网络这块的产品需要中,会要求设置app某个时间段禁止上网的功能,liunx中iptables命令也是比较重要的,接下来就来在INetd这块实现app某个时间段禁止上网的的相关功能,就是在系统中只能允许某个app某个时间段禁止上网,就是…

List Set Map Queue Deque 之间的区别是什么?

List Set Map Queue Deque 之间的区别是什么&#xff1f; 1. Java 集合框架有那些接口&#xff1f;2. List Set Map Queue Deque 之间的区别是什么&#xff1f; 1. Java 集合框架有那些接口&#xff1f; List、Set、Map、Queue、Deque 2. List Set Map Queue Deque 之间的区别…

Linux—基础篇:目录结构

1、基本介绍 1、linux的文件系统是采用级层式的树状目录结构&#xff0c;在此结构的最上层是根目录“/”,然后在此目录下创建其他目录 2、在Linux的世界里&#xff0c;一切皆文件&#xff01;&#xff01;&#xff01; 2、具体的目录结构 不用背&#xff0c;知道即可 2.1、…

Golang每日一练(leetDay0079) 最大正方形、完全二叉树节点数

目录 221. 最大正方形 Maximal Square &#x1f31f;&#x1f31f; 222. 完全二叉树的节点个数 Count Complete Tree Nodes &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日一练 专栏 Golang每日一练 专栏 Python每日一练 专栏 C/…

C#,码海拾贝(23)——求解“复系数线性方程组“的“全选主元高斯消去法“之C#源代码,《C#数值计算算法编程》源代码升级改进版

using System; namespace Zhou.CSharp.Algorithm { /// <summary> /// 求解线性方程组的类 LEquations /// 原作 周长发 /// 改编 深度混淆 /// </summary> public static partial class LEquations { /// <summary&g…

ubuntu安装搜狗输入法,图文详解+踩坑解决

搜狗输入法已支持Ubuntu16.04、18.04、19.10、20.04、20.10&#xff0c;本教程系统是基于ubuntu18.04 一、添加中文语言支持 系统设置—>区域和语言—>管理已安装的语言—>在“语言”tab下—>点击“添加或删除语言”。 弹出“已安装语言”窗口&#xff0c;勾选中文…