本文是我在学习Vert.x过程中的一些笔记,作为记录。
因为是初学,对Vert.x的理解还不够透彻,如有错误之处我们可以在评论中一起讨论呦。
概述
The event bus is the nervous system of Vert.x
EnventBus是Vert.x的神经系统,EventBus为Verticle之间提供通讯和信息传递的基础。这种方式提供了一个简单但有效的解耦。
如上图,EventBus为多个Verticle实例传递消息,而在Vertx中每一个Verticle都对应着一个或者多个处理器(handler),我们将部署两个Verticle(H1和H2)来处理HTTP请求,一个Verticle(D1)封装数据库持久化。由此产生的Verticle将没有相互的直接引用,它们将只商定事件总线中的目的地名称以及消息格式。假设H1接收到查询请求,H1会将查询的消息发送到EventBus上,此时注册在该地址上的D1接收到了消息,执行查询任务将返回结果以JSON形式原路返回,这整个过程都是异步进行。
发送到事件总线的消息将解码为JSON。虽然Vert.x的事件总线支持灵活的串行化方案用于高要求或者高度定制的上下文,但是使用JSON数据通常是明智的选择。使用JSON的另一个优势是它是一种语言无关的格式。由于Vert.x是支持多语言的,对于使用不同语言编写的Verticle之间的通讯,JSON是非常理想的。
地址
Verticle之间的消息被EventBus发送到一个约定的地址(Address),消息的提供者和消费者通过地址来实现消息的生产和消费。
处理器
这里的消费者自然就是不同的处理器Handler,处理器之间可以根据不通的消息通信方式实现不同的功能。
EventBus事件总线中支持的消息通信方式有如下三种:、
- 点对点 : 消息指发送给一个监听这个地址上的 消费者(consumer) 。
- 发布/订阅 : 消息会被所有监听在这个地址上的所有 消费者(consumer) 收到。
- 请求/应答 : 消息回发送给一个 消费者(consumer) , 它 应答 这个消息并且把另外一个 消息 发送回初始的发送者。
消息类型
通常的消息格式无非字符串、整数、Json等,但因为Vert.x多语言的特点,JSON则是他最常用的消息类型,JSON在Vertx支持的所有语言都是非常容易创建、读取和解析的,因此它已经成为了Vert.x中的通用语。这就给程序员提供很大的发挥空间,你可以自定义一个专属的消息传递对象,通过JSON形式进行传递。
EventBus的使用
1.获取EventBus对象
1
| EventBus eventBus = vertx.eventBus();
|
2.EventBus对外提供的api
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| @Fluent EventBus send(String var1, Object var2);
@Fluent <T> EventBus send(String var1, Object var2, Handler<AsyncResult<Message<T>>> var3);
@Fluent EventBus send(String var1, Object var2, DeliveryOptions var3);
@Fluent <T> EventBus send(String var1, Object var2, DeliveryOptions var3, Handler<AsyncResult<Message<T>>> var4);
@Fluent EventBus publish(String var1, Object var2);
@Fluent EventBus publish(String var1, Object var2, DeliveryOptions var3);
<T> MessageConsumer<T> consumer(String var1);
<T> MessageConsumer<T> consumer(String var1, Handler<Message<T>> var2);
<T> MessageConsumer<T> localConsumer(String var1);
<T> MessageConsumer<T> localConsumer(String var1, Handler<Message<T>> var2);
<T> MessageProducer<T> sender(String var1);
<T> MessageProducer<T> sender(String var1, DeliveryOptions var2);
<T> MessageProducer<T> publisher(String var1);
<T> MessageProducer<T> publisher(String var1, DeliveryOptions var2);
@GenIgnore({"permitted-type"}) EventBus registerCodec(MessageCodec var1);
@GenIgnore({"permitted-type"}) EventBus unregisterCodec(String var1);
@GenIgnore <T> EventBus registerDefaultCodec(Class<T> var1, MessageCodec<T, ?> var2);
@GenIgnore EventBus unregisterDefaultCodec(Class var1);
@GenIgnore void start(Handler<AsyncResult<Void>> var1);
@GenIgnore void close(Handler<AsyncResult<Void>> var1);
@Fluent <T> EventBus addOutboundInterceptor(Handler<DeliveryContext<T>> var1);
@Fluent <T> EventBus removeOutboundInterceptor(Handler<DeliveryContext<T>> var1);
@Fluent <T> EventBus addInboundInterceptor(Handler<DeliveryContext<T>> var1);
@Fluent <T> EventBus removeInboundInterceptor(Handler<DeliveryContext<T>> var1);
|
可以看到提供了很多的接口,我们大致对他们的功能进行分类如下:
发布消息publish
发布消息到指定地址可以使用publish
方法
1 2 3 4
| @Fluent EventBus publish(String var1, Object var2); @Fluent EventBus publish(String var1, Object var2, DeliveryOptions var3);
|
eventBus.publish("hello.world", "发布一条消息....");
通过publish发布的消息将会传递给所有在地址 hello.world 上注册过的处理器。这就涉及到EventBus的发布订阅
在发布的时候我们还有一个参数DeliveryOptions
,其实可以理解为获取Vertx对象时的Vertxoptions
一样,可以额外添加一些配置,进行消息的发布。
发送消息send
发送消息到指定地址可以使用send
方法,他与发布消息最大的区别就是,send方法只会发送消息到指定地址上的一个处理器,正好对应点对点的信息通信方式
1 2 3 4 5 6 7 8
| @Fluent EventBus send(String var1, Object var2); @Fluent <T> EventBus send(String var1, Object var2, Handler<AsyncResult<Message<T>>> var3); @Fluent EventBus send(String var1, Object var2, DeliveryOptions var3); @Fluent <T> EventBus send(String var1, Object var2, DeliveryOptions var3, Handler<AsyncResult<Message<T>>> var4);
|
eventBus.send("hello.world", "发送一条消息....");
除了单纯的发送消息外,EventBus还提供了用户自定义属性配置(DeliveryOptions),进行消息发送的接口
1 2 3
| DeliveryOptions options = new DeliveryOptions(); options.addHeader("token", "aaaaaa"); eventBus.send("hello.world", "发送一条消息....", options);
|
当我们发送一条消息到某一地址后,消息被某一处理器接收并处理,这时我们需要知道消息是何时被消费的,我们可以通过send方法的另一种方式配合consumer方法来实现请求应答的消息通信方式。
1 2 3 4 5 6 7 8 9 10 11
| eventBus.send("hello.world", "发送一条消息....", ar -> { if (ar.succeeded()) { System.out.println("结果: " + ar.result().body()); } });
MessageConsumer<String> consumer = eventBus.consumer("hello.world"); consumer.handler(message -> { System.out.println("处理器收到一条消息: " + message.body()); message.reply("消费成功!"); });
|
注册处理器consumer
1 2
| <T> MessageConsumer<T> consumer(String var1); <T> MessageConsumer<T> consumer(String var1, Handler<Message<T>> var2);
|
consumer方法可以将处理器注册到指定的地址,第一个参数String var1
就是address,EventBus提供了两个注册处理器的方法,一种是直接在参数中指定,另一种是通过comsumer方法返回的MessageConsumer
对象进行注册设置。
1 2 3 4 5 6 7 8 9 10
| EventBus eb = vertx.eventBus();
eb.consumer("hello.world", message -> { System.out.println("处理器收到一条消息: " + message.body()); });
MessageConsumer<String> msgcomsumer = eb.consumer("hello.world"); msgcomsumer.handler(message -> { System.out.println("处理器收到一条消息: " + message.body()); });
|
如果你想知道他什么时候注册成功,那么可以为msgcomsumer绑定一个完成时的处理器
1 2 3 4 5 6 7
| msgcomsumer.completionHandler(res -> { if (res.succeeded()) { System.out.println("处理器注册成功"); } else { System.out.println("处理器注册失败"); } });
|