Java的http服务端响应式编程

撰写于 2018-05-11 修改于 2018-12-16 标签 原创

为什么要响应式编程?

传统的Servlet模型走到了尽头

传统的Java服务器编程遵循的是J2EE的Servlet规范,是一种基于线程的模型:每一次http请求都由一个线程来处理。

客户端客户端服务器服务器发送请求解码处理请求编码返回结果

线程模型的缺陷在于,每一条线程都要自行处理套接字的读写操作。对于大部分请求来讲,本地处理请求的速度很快,请求的读取和返回是最耗时间的。也就是说大量的线程浪费在了远程连接上,而没有发挥出计算能力。但是需要注意一点,线程的创建是有开销的,每一条线程都需要独立的内存资源。JVM里的-Xss参数就是用来调整线程堆栈大小的。而JVM堆的总大小局限在了-Xmx参数上,因此一个正在运行的JVM服务器能够同时运行的线程数是固定的。

即便通过调整JVM参数,使其能够运行更多线程。但是JVM的线程会映射成为操作系统的用户线程,而操作系统依然只能调度有限数量的线程。例如,Linux系统可以参考这里的讨论:Maximum number of threads per process in Linux?

此外,大量线程在切换的时候,也会产生上下文加载卸载的开销,同样会降低系统的性能。

可伸缩 IO

Doug Lea大神有一篇很经典的PPTScalable IO in Java,讲述了一个更为优秀的服务器模型。

一个可伸缩的网络服务系统应当满足以下条件:

  1. 能够随着计算资源(CPU、内存、磁盘容量、网络带宽等)的增加提高负载能力。
  2. 当网络负载增加超过能力的时候,能够优雅降级,避免直接崩溃。例如,拒绝为超过能力范围的请求提供服务,但对于能力范围内的请求,依然提供服务。当流量洪峰过去之后,依然能够正常运行。
  3. 当然高可用、高性能依然是必须的:例如低响应延迟、随负载变化请求或释放计算资源等。

作者给出的解决方案就是Reactor模式。

Reactor模式将耗时的IO资源封装为handle对象。handle对象注册在操作系统的内核里,当对象满足一定的条件时(可读或者可写),才会处理handle对象。在Reactor模式中,同步多路复用器负责处理handle对象的状态变更,当满足条件时,会调用handle对象注册时提供的回调函数。

同步多路复用器在一个单独的线程里专门处理IO链接。当请求读取完毕之后,任务提交至工作线程池完成请求的解码、处理、编码等工作,最后将由多路复用器负责将结果返回给客户端,而池内线程继续处理下一个任务。相比JDK1.5之前的对每一次请求新建一个线程的方式,线程池能够实现线程复用,降低创建回收线程的开销,在应对密集计算负载的时候有更好的表现。同时,在多个线程上分别部署一个同步多路复用器,也可以更好地利用多核CPU的处理能力。

请求多路复用器组线程池响应多路复用器组请求多路复用器提交请求拒绝服务解码1处理1编码1解码2处理2编码2响应多路复用器关闭连接socket读取完成falsetrue线程池可用truefalsesocket写入完成falsetrue

这样,线程的任务分工就很明确,分别专门处理IO密集任务和专门处理CPU密集任务。

NIO普及艰难

从最早的select到后来Linux的epoll和BSD的Kqueue,操作系统的多路复用性能一直在不断增强。

JDK 1.4引入了NIO模块,屏蔽了操作系统层面的细节,将各个系统的多路复用API做了统一封装。JDK的NIO有以下几个核心组件:

  • Buffer,一种容量在创建时被固定的数据容器
  • Charset,负责数据的编解码工作
  • Channel,对远程连接的抽象
  • Selector,多路复用选择器

网络连接封装在Channels对象里面。Channels在Selector上注册感兴趣的SelectionKey事件:可读OP_READ、可写OP_WRITE、可连接OP_CONNECT还有服务器端套接字才有的可接入OP_ACCEPT。多路复用选择器调用阻塞式select方法的时,在等待某一事件可用,然后就通知Channels执行相应的handler。Buffer是Channels实现读写操作的缓冲区。Charset用于对Buffer的内容进行编解码。在NIO模式下,Selector能够管理多个套接字的网络读写,避免了过多计算线程阻塞在读写请求上。

在JVM之外的世界里,多路复用通过Nginx、基于V8引擎的Node.js早就大放异彩。但是Java NIO在生产环境里的发展却很慢。例如,Tomcat直到2016年发布8.5版本的时候,才彻底移除BIO连接器,完全拥抱NIO。

JDK NIO主要有这样几个问题比较麻烦:

  1. 首先是NIO为了提高数据收发性能,可以创建DirectBuffer对象。该对象的内存开辟在JVM堆之外,无法通过正常的GC收集器来回收,只能在JVM的老年代触发全量GC的时候回收。而全量GC往往导致系统卡顿,降低响应效率。如果被动等待老年代区域自行触发全量GC,又有可能造成堆外内存溢出。两者之间的矛盾需要在开发的时候小心的平衡。
  2. 其次就是,JDK1.8依然存在的epoll bug:若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%。

Netty才是NIO该有的水准

作为一个第三方框架,Netty做到了JDK本应做到的事情。

Netty的数据容器ByteBuf更为优秀

ByteBuf同时维护两个索引:读索引和写索引。从而保证容器对象能够同时适配读写同时进行的场景。而NIO的Buffer却需要执行一次flip操作来适应读写场景的切换。同时ByteBuf容器使用引用计数来手工管理,可以在引用计数归零时通过反射调用jdk.internal.ref.Cleaner来回收内存,避免泄露。在GC低效的时候,选择使用手工方式来管理内存,完全没问题。

Netty的API封装度更高

观察一下Netty官网Tutorial给出的demo,只要几十行代码就完成了一个具备Reactor模式的服务器。ServerBootstrap的group方法定义了主套接字和子套接字的处理方式,例中使用的NioEventLoopGroup类为Java NIO + 多线程的实现方式。对于NIO的epoll bug,NioEventLoopGroup的解决方案是rebuildSelectors对象方法。这个方法允许在selector失效时重建新的selector,将旧的释放掉。此外,Netty还通过JNI实现了自己的EpollEventLoopGroup,规避了NIO版本的bug。

Netty使用责任链模式实现了对server进出站消息的处理,使得server的代码能够更好的扩展和维护。

Netty在生产领域得到大量应用,Hadoop Avro、Dubbo、RocketMQ、Undertow等广泛应用于生产领域的产品的通信组件都选择了Netty作为基础,并经受住了考验。

Netty是一个优秀的异步通信框架,但是主要应用在基础组件中。因为Netty向开发者暴露出大量的细节,对于业务系统的开发仍然形成了困扰,所以没法得到进一步的普及。

举个例子。Netty使用ChannelFuture来接收传入的请求。相比于JDK的Future实现,ChannelFuture可以添加一组GenericFutureListener来管理对象状态,避免了反复对Future对象状态的询问或阻塞获取。这是个进步。但是,这些Listener都带来了另一个问题——Callback hell。而嵌套的回调代码往往难以维护。

对于Callback hell,我们可以做什么

Netty做一个优秀的基础组件就很好了。业务层面的问题就让我们用业务层面的API来解决。

Java API的适应性不佳

JDK7以前的异步代码难以组织

在JDK7以及之前,Java多线程的编程工具主要就是Thread、ExecutorService、Future以及相关的同步工具,实现出来的代码较为繁琐、且性能不高。

Thread

举个例子A,考虑一个场景有X、P、Q三个逻辑需要执行,其中X的执行需要在P、Q一起完成之后才启动执行。

如果使用Thread,那么代码会是这个样子:

/* 创建线程 */
Thread a = new Thread(new Runnable() {
    @Override
    public void run() {
        /* P逻辑 */
    }
});

Thread b = new Thread(new Runnable() {
    @Override
    public void run() {
        /* Q逻辑 */
    }
});

/* 启动线程 */
a.start();
b.start();

/* 等候a、b线程执行结束 */
try {
    a.join();
    b.join();
} catch (InterruptedException e) {
    e.printStackTrace();
}

/* 启动X逻辑的执行 */
Thread c = new Thread(new Runnable() {
    @Override
    public void run() {
        /* X逻辑 */
    }
});
c.start();

...

上面这个代码,先不论线程创建的开销,单从形式上看,线程内部的执行逻辑、线程本身的调度逻辑,还有必须捕获的InterruptedException的异常处理逻辑混杂在一起,整体很混乱。假想一下,当业务逻辑填充在其中的时候,代码更难维护。

ThreadPoolExecutor、Future

ThreadPoolExecutor和Future有助于实现线程复用,但对于代码逻辑的规范没什么帮助。

ExecutorService pool = Executors.newCachedThreadPool();
Future<?> a = pool.submit(new Runnable() {
    @Override
    public void run() {
        /* P逻辑 */
    }
});
Future<?> b = pool.submit(new Runnable() {
    @Override
    public void run() {
        /* Q逻辑 */
    }
});

/* 获取线程执行结果
 * 依然要捕获异常,处理逻辑
 */
try {
    a.get();
    b.get();
    Future<?> c = pool.submit(new Runnable() {
        @Override
        public void run() {
            /* X逻辑 */
        }
    });
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

JDK8代码可读性有了显著提高

JDK8借鉴了相当多的函数式编程的特点,提供了几样很称手的工具。

CompleteableFuture和ForkJoinPool

如果要用CompleteableFuture实现上一个例子,可以这样写。

CompletableFuture<?> a = CompletableFuture.runAsync(() -> {
    /* P逻辑 */
}).exceptionally(ex -> {
    /* 异常处理逻辑 */
    return ...;
});
CompletableFuture<?> b = CompletableFuture.runAsync(() -> {
    /* Q逻辑 */
});
CompletableFuture<?> c = CompletableFuture.allOf(a, b).thenRun(() -> {
    /* X逻辑 */
});

有了lambda表达式的加持,例中的代码整体以线程内部逻辑为主,调度逻辑通过allOf()、thenRun()等方法名直观地展示出来。特别是可选的异常捕获逻辑,更是使得代码可读性得到了极大的提高。

需要注意的是,CompletableFuture是可以使用指定ExecutorService来执行的。如果像上例那样没有指定ExecutorService对象,那么会默认使用ForkJoinPool里的静态对象commonPool来执行。而ForkJoinPool.commonPool作为一个JVM实例中唯一的对象,也是Stream并发流的执行器,因此应当尽量保证CompletableFuture里的逻辑不会阻塞线程。如果无法规避,可以使用ManagedBlocker来降低影响。

ForkJoinPool是JDK1.7提供的并发线程池,可以很好地应对计算密集型并发任务,特别适用于可以“分-治”的任务。传统的ThreadPoolExecutor需要指定线程池里的线程数量,而ForkJoinPool使用了一个相似但更有弹性的概念——“并发度”。并发度指的是池内的活跃线程数。对于可能的阻塞任务,ForkJoinPool设计了一个ManagedBlocker接口。当池内线程执行到ForkJoinPool.managedBlock(ForkJoinPool.ManagedBlocker blocker)方法时,线程池会新建一个线程去执行队列里的其他任务,并轮询该对象的isReleasable方法,决定是否恢复线程继续运行。JDK1.7里的Phaser类源码用到了这个方法。

关于CompleteableFuture的用法,推荐看看这篇博客:理解CompletableFuture,总结的很好。
而对于ForkJoinPool,可以看看这篇博客:Java 并发编程笔记:如何使用 ForkJoinPool 以及原理

Stream

Stream流也是JDK8引入的一个很好的编程工具。

Stream对象通常通过Iterator、Collection来构造。也可以用StreamSupport的stream静态方法来创建自定义行为的实例。

Stream流对象采用链式编程风格,可以制定一系列对流的定制行为,例如过滤、排序、转化、迭代,最后产生结果。看个例子。

List<Integer> intList = List.of(1, 2, 3);

List<String> strList = intList.stream()
        .filter(k -> k>1)
        .map(String::valueOf)
        .collect(Collectors.toList());

上面这段代码中,intList通过stream方法获取到流对象,然后筛选出大于1的元素,并通过String的valueOf静态方法生成String对象,最后将各个String对象收集为一个列表strList。就像CompletableFuture的方法名一样,Stream的方法名都是自描述的,使得代码可读性极佳。

除此之外,Stream流的计算还是惰性的。Stream流对象的方法大致分为两种:

  • 中间方法,例如filter、map等对流的改变
  • 终结方法,例如collect、forEach等可以结束流

只有在执行终结方法的时候,流的计算才会真正执行。之前的中间方法,都作为步骤记录下来,但没有实时地执行修改操作。

如果将例子里的stream方法修改为parallelStream,那么得到的流对象就是一个并发流,而且总在ForkJoinPool.commonPool中执行。

关于Stream,极力推荐Brian Goetz大神的系列文章Java Streams

还有一点问题

ForkJoinPool是一款强大的线程池组件,只要使用的得当,线程池总会保持一个合理的并发度,充分利用计算资源。

但是,CompleteableFuture也好,Stream也好,他们都存在一个相同的问题:无法通过后端线程池的负载变化,来调整前端的调用压力。打比方说,当后端的ForkJoinPool.commonPool在全力运算而且队列里有大量的任务排队时,新提交的任务很可能会有很高的响应延迟,但是前端的CompleteableFuture或者Stream没有途径去获取这样一个状态,来延缓任务的提交。这种情况就违背了“响应式系统”的“灵敏性”要求。

来自第三方API的福音

Reactive Streams

Reactive Streams是一套标准,定义了一个运行于JVM平台上的响应式编程框架实现所应该具备的行
为。

Reactive Streams规范衍生自“观察者模式”,将前后依赖的逻辑流,拆解为事件和订阅者。只有当事件发生变更时,感兴趣的观察者才随之执行随后的逻辑。Reactive Stream和JDK的Stream的理念有点接近,两者都是注重对数据流的控制。紧耦合的逻辑流拆分为“订阅-发布”方式其实是一大进步。代码变得维护性更强,而且很容易随着业务的需要按照消息驱动模式拆解。

Reactive Streams规范定义了四种接口:

  • Publisher,负责生产数据流,每一个订阅者都会调用subscribe方法来订阅消息。
  • Subscriber,就是订阅者。
  • Subscription,其实就是一个订单选项,相当于饭馆里的菜单,由发布者传递给订阅者。
  • Processor,处于数据流的中间位置,即是订阅者,也是新数据流的生产者。

当Subscriber调用Publisher.subscribe方法订阅消息时,Publisher就会调用Subscriber的onSubscribe方法,回传一个Subscription菜单。

Subscription菜单包含两个选择:

  1. 一个是request方法,对数据流的请求,参数为所请求的数据流的数量,最大为Long.MAX_VALUE;
  2. 另一个是cancel方法,对数据流订阅的取消,需要注意的是数据流或许会继续发送一段时间,以满足之前的请求调用。

一个Subscription对象只能由同一个Subscriber调用,所以不存在对象共享的问题。因此即便Subscription对象有状态,也不会危及逻辑链路的线程安全。

订阅者Subscriber还需要定义三种行为:

  1. onNext,接受到数据流之后的执行逻辑;
  2. onError,当发布出现错误的时候如何应对;
  3. onComplete,当订阅的数据流发送完毕之后的行为。

相比于Future、Thread那样将业务逻辑和异常处理逻辑混杂在一起,Subscriber将其分别定义在三个方法里,代码显得更为清晰。java.util.Observer(在JDK9中开始废弃)只定义了update方法,相当于这里的onNext方法,相比之下Subscriber增加了对流整体的管理和对异常的处理。异常如果随着调用链传递出去,调试定位会非常麻烦。因此要重视onError方法,尽可能在订阅者内部就处理这个异常。

尽管Reactive Streams规范和Stream都关注数据流,但两者有一个显著的区别。那就是Stream是基于生产一方的,生产者有多大能力,Stream就制造多少数据流。而Reactive Streams规范是基于消费者的。逻辑链下游可以通过对request方法参数的变更,通知上游调整生产数据流的速度。从而实现了“响应式系统”的“灵敏性”要求。这在响应式编程中,用术语“背压”来描述。

Reactive Streams规范仅仅是一个标准,其实现又依赖其他组织的成果。其意义在于各家实现能够通过这样一个统一的接口,相互调用,有助于响应式框架生态的良性发展。Reactive Streams规范虽然是Netflix、Pivatol、RedHat等第三方大厂合作推出的,但已经随着JDK9的发布收编为官方API,位于java.util.concurrent.Flow之内。JDK8也可以在项目中直接集成相应的模块调用。

顺便吐槽一下,JDK9官方文档给出的demo里的数据流居然从Subscription里生产出来,吓得我反复确认了一下Reactive Streams官方规范。

RxJava2

RxJava由Netfilx维护,实现了ReactiveX API规范。该规范有很多语言实现,生态很丰富。

Rx范式最先是微软在.NET平台上实现的。2014年11月,Netfilx将Rx移植到JVM平台,发布了1.0稳定版本。而Reactive Streams规范是在2015年首次发布,2017年才形成稳定版本。所以RxJava 1.x和Reactive Streams规范有很大出入。1.x版本迭代至2018年3月的1.3.8版本时,宣布停止维护。

Netflix在2016年11月发布2.0.1稳定版本,实现了和Reactive Streams规范的兼容。2.x如今是官方的推荐版本。

RxJava框架里主要有这些概念:

  • Observable与Observer。RxJava直接复用了“观察者模式”里的概念,有助于更快地被开发社区接受。Observeble和Publisher有一点差异:前者有“冷热”的区分,“冷”表示只有订阅的时候才发布消息流,“热”表示消息流的发布与时候有对象订阅无关。Publisher更像是“冷”的Observeble。
  • Operators,也就是操作符。RxJava和JDK Stream类似,但设计了更多的自描述的函数方法,并同样实现了链式编程。这些方法包括但不限于转换、过滤、结合等等。
  • Single,是一种特殊的Observable。一般的Observable能够产生数据流,而Single只能产生一个数据。所以Single不需要onNext、onComplete方法,而是用一个onSuccess取而代之。
  • Subject,注意这个不是事件,而是介于Observable与Observer之间的中介对象,类似于Reactive Streams规范里的Processor。
  • Scheduler,是一类线程池,用于处理并发任务。RxJava默认执行在主线程上,可以通过observeOn/subscribeOn方法来异步调用阻塞式任务。

RxJava 2.x在Zuul 2、Hystrix、Jersey等项目都有使用,在生产领域已经得到了应用。

Reactor3

Reactor3有Pivotal来开发维护,也就是Spring的同门师弟。

整体上,Reactor3框架里的概念和RxJava都是类似的。Mono和Flux都等同于RxJava的Single和Observable。Reactor3也使用自描述的操作符函数实现链式编程。

RxJava 2.x支持JVM 6+平台,对老旧项目很友好;而Reactor3要求必须是JVM8+。所以说,如果是新项目,使用Reactor3更好,因为它使用了很多新的API,支撑很多函数式接口,代码可读性维护性都更好。

背靠Spring大树,Reactor3的设计目标是服务器端的Java项目。Reactor社区针对服务器端,不断推出新产品,例如Reactor Netty、Reactor Kafka等等。但如果是Android项目,RxJava2更为合适(来自Reactor3官方文档的建议)。

老实讲,Reactor3的文档内容更丰富。

什么是响应式系统

响应式宣言里面说的很清楚,一个响应式系统应当是:

  • 灵敏的:能够及时响应
  • 有回复性的:即使遇到故障,也能够自行恢复、并产生回复
  • 可伸缩的:能够随着工作负载的变化,自行调用或释放计算资源;也能够随着计算资源的变化,相应的调整工作负载能力
  • 消息驱动的:显式的消息传递能够实现系统各组件解耦,各类组件自行管理资源调度。

构建响应式Web系统

Vert.X

Vert.X目前由Eclipse基金会维护,打造了一整套响应式Web系统开发环境,包括数据库管理、消息队列、微服务、权限认证、集群管理器、Devops等等,生态很丰富。

Vert.X Core框架基于Netty开发,是一种事件驱动框架:每当事件可行时都会调用其对应的handler。在Vert.X里,有专门的线程负责调用handler,被称作eventloop。每一个Vert.X实例都维护了多个eventloop。

Vert.X Core框架有两个重要的概念:Verticle和Event Bus。

Verticle

Verticle类似于Actor模型的Actor角色。

Actor是什么?

这里泛泛的说一下吧。

Actor模型主要针对于分布式计算系统。Actor是其中最基本的计算单元。每一个Actor都有一个私有的消息队列。Actor之间的通信依靠发送消息。每一个Actor都可以并发地做到:

  1. 向其他Actor发送消息
  2. 创建新的Actor
  3. 指定当接收到下一个消息时的行为

Verticle之间的消息传递依赖于下面要说的Event Bus。

Vert.X为Verticle的部署提供了高可用特性:在Vert.X集群中,如果一个节点的上运行的Veticle实例失效,其他节点就会重新部署一份新的Verticle实例。

Verticle只是Vert.X提供的一种方案,并非强制使用。

Event Bus

Event Bus是Vert.X框架的中枢系统,能够实现系统中各组件的消息传递和handler的注册与注销。其消息传递既支持“订阅-发布”模式,也支持“请求-响应”模式。

当多个Vert.X实例组成集群的时候,各系统的Event Bus能够组成一个统一的分布式Event Bus。各Event Bus节点相互之间通过TCP协议通信,没有依赖Cluster Manager。这是一种可以实现节点发现,提供了分布式基础组件(锁、计数器、map)等的组件。

Spring WebFlux

Spring5的亮点之一就是响应式框架Spring WebFlux,使用自家的Reactor3开发,但同样支持RxJava。

Spring WebFlux的默认服务端容器是Reactor Netty,也可以使用Undertow或者Tomcat、Jetty的实现了Servlet 3.1 非阻塞API接口的版本。Spring WebFlux分别为这些容器实现了与Reactive Streams规范实现框架交互的适配器(Adapter),没有向用户层暴露Servlet API。

Spring WebFlux的注解方式和Spring MVC很像。这有助于开发团队快速适应新框架。而且Spring WebFlux兼容Tomcat、Jetty,有助于项目运维工作的稳定性。

但如果是新的项目、新的团队,给我大概会选Vert.X,因为Event Bus确实很吸引人。

参考资料

延伸阅读

Site by Zhang,Xin using Hexo & Random

Hide