本文共 15929 字,大约阅读时间需要 53 分钟。
-Author:dongdong
-Email: ldyldy828@163.com
在通常情况下你在使用消息中间件的时候,都是未经设计的使用,你没有把应用架构和系统架构边界搞清楚。消息中间件只是一个纯粹的技术工具,当你引入的时候是站在应用架构的角度引入的。这是架构的角度,也是架构的上帝视角,这样你就不会用到最后发现越来越混乱,而且也无法结合软件模式、方法论、最佳实践来综合提升系统的架构能力。
EDA(Event Driven Architecture,EDA) 事件驱动架构,它是一种用来在SOA或者Micro service中进行的架构模式。它的好处有几个,柔性具有很高的伸缩性。
既然要EDA就要规划好你当前的系统边界之内有多少业务实体,这些实体是围绕着领域模型而得来。所以这里不要很主观的就定义一些你认为的事件,这些事件要根据业务实体中的对象来设计。业务实体起码是有唯一Identity的。比如,订单、商品,围绕着这些实体展开,订单可能有几个状态是比较常用的,创建、支付、配送、取消。商品可能有价格、关键属性修改等等。这些实体的抽象和提炼取决于你当前的业务。
(有关这方面内容可以参考:《领域驱动设计》、《探索CQRS和事件源》)
这些是相对理论的指导思想,有了这些之后你可以落地你的Rabbitmq,这样你就不会跑偏了。比如,你的消息名称不会是看起来没结构和层次的,deliveryMssage(配送消息)。而是应该,order.delivery.ondeliveryEvented(订单.配送.配送完成事件)这样的结构。
当你的层次结构不满足业务需求的时候,你可能还需要进一步明确事件范围,order.viporder.delivery.ondeliveryEvented(订单.VIP订单.配送.配送完成事件)。
![202205-20161210194527538-1135172120](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194527538-1135172120.png)
上图是一个事件驱动的基本场景,它最瞩目的几个特性就是这几个,首先是异步化的,可以大大提高系统的抗峰值能力。然后就是解耦,这不用说了,设计模式里的观察者模式没有人不知道它的好处。伸缩性,可以按需scaleout,比如rabbitmq的node可以很方便的加入。最终一致性解决了分布式系统的CAP定理的问题。
AMQP协议中约定了routing key的设计和交互。为了实现订阅发布功能,我们需要某种方式能够订阅自己所感兴趣的事件。所以在AMQP中的Binding中,可以根据routing key来进行模式匹配。所以,这里可以结合amqp routingkey与领域事件,发出来的事件就相当于amqp中的routingkey,这样可以完美的结合起来。
你的事件肯定是随着业务发展逐渐增加的,而这个事件集合也没办法在一开始就定义清楚,所以这里有一个需要注意的就是,绑定的时候千万不要写死具体的routing key。比如,order.delivery.OnDeliveryEvented,这是订单配送,此时你Binding的时候routingkey就写成了”order.delivery.OnDeliveryEvented”。未来订单事件一扩展,就会很麻烦,不相关的事件都被订阅到,无法细化或者事件你无法获取到,因为routingkey改变了。所以在绑定的时候记住具体点绑定,也就是借助字符串的模式匹配绑定,比如,.delivery.,*.onDeliveryEvented”这样。将来越来越多的routingkey和event出来都不会影响你的绑定。你只需要根据自己的关心程度,绑定在事件的不同层级上即可。
![202205-20161210194528319-754960138](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194528319-754960138.png)
上图中,orderBinding绑定了order事件,它订阅了顶级事件,也就是说未来任何类型的订单都可以被订阅到,比如,order.normalorder.delivery.onDeliveryEvent也可以被订阅到。而viporderBinding订阅了viporder事件,如果发送了一个order.normalorder.delivery.onDeliveryEvent就跟它没关系了。
搞清楚了应用架构的事情,我们开始着手搭建RabbitMQ cluster。rabbitmq这款AMQP产品是用erlang开发的,那么我们稍微介绍下erlang。
我第一次正式接触erlang就是从rabbitmq开始的,一开始并没有太多感觉到特别的地方,后来才明白越明白越发现挺喜欢这门语言的。喜欢的理由就是,它是天然的分布式语言。这句话说起来好像挺平常的,但是当你明白了.erlang.cookie机制之后才恍然大悟。瞬间顿悟了,为什么要用erlang来搞rabbitmq,而是它真的很适合信息交换之类的软件。erlang是爱立信公司开发的专门用来开发高性能信息交换机的,想想也会觉得那些软件的性能和稳定性要求是极高的。RabbitMQ的节点发现和互连真的很方便,这在erlang的虚拟机中就集成了,而且具有高度容错能力。反正我对它很有好感。
还有一点值得骄傲的是RabbitMQ是伟大的pivotal公司的,你应该知道pivotal公司是干什么的,如果你还不清楚建议你立刻google下。(RabbtiMQ 官网:<;)
要想安装RabbitMQ,首先需要安装和配置好它的宿主环境erlang。去erlang官网下载好erlang otp_src源码包,然后在本地执行源码安装。(erlang官网:<;)
1.安装依赖环境:
1)安装GCC GCC-C++ Openssl等模块,安装过就不需要安装了
[root@mq]# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
2)安装ncurses
[root@mq]# yum -y install ncurses-devel
2.下载erlang:
官网下载:
或者直接下载
[root@mq]# wget http://erlang.org/download/otp_src_21.2.tar.gz
[root@mq]# tar xzvf otp_src_21.2.tar.gz [root@mq]# cd otp_src_21.2
执行
[root@mq]# ./configure --prefix /usr/local/soft
进行环境的检查和安装路径的选择
[root@mq otp_src_21.2]# make && make install
查看安装位置:
[root@mq otp_src_21.2]# find / -name erlang
/root/otp_src_21.2/lib/jinterface/java_src/com/ericsson/otp/erlang/root/otp_src_21.2/bootstrap/lib/jinterface/priv/com/ericsson/otp/erlang/usr/local/soft/lib/erlang
配置环境变量:
在/etc/profile文件最后加上
[root@mq ~]# vim /etc/profile
export PATH=$PATH:/usr/local/soft/lib/erlang/bin
[root@mq ~]# source /etc/profile
查看是否安装成功:
[root@mq otp_src_21.2]# erl
Erlang/OTP 21 [erts-10.2][source] [64-bit][smp:1:1] [ds:1:1:10][async-threads:1] [hipe]Eshell V10.2 (abort with ^G)1>
接下来安装RabbitMQ
RabbitMQ官网提供了新版的rpm包(),但是安装的时候会提示需要erlang版本>=19.3,然而默认yum仓库中的版本较低。
其实RabbitMQ在github上有提供新的erlang包()也可以直接加到yum源中
[root@mq ~]# vim /etc/yum.repos.d/rabbitmq-erlang.repo
[rabbitmq-erlang]name=rabbitmq-erlangbaseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/20/el/7gpgcheck=1gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.ascrepo_gpgcheck=0enabled=1
[root@mq ~]# yum clean all
[root@mq ~]# yum makecache
然后下载RabbitMQ的RPM包()
这里是centos7的版本 [root@mq ~]# wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.4/rabbitmq-server-3.7.4-1.el7.noarch.rpm [root@mq ~]# yum -y install rabbitmq-server-3.7.4-1.el7.noarch.rpm yum会自动去源里安装依赖包
安装到这里就完成了,下面进行简单的配置
启动RabbitMQ服务[root@mq ~]# service rabbitmq-server startRedirecting to /bin/systemctl start rabbitmq-server.service 状态查看[root@mq ~]# rabbitmqctl statusStatus of node rabbit@mq2 ...[{pid,52995}, {running_applications, [{rabbit,"RabbitMQ","3.7.4"}, {mnesia,"MNESIA CXC 138 12","4.15.3.2"}, {rabbit_common, "Modules shared by rabbitmq-server and rabbitmq-erlang-client", "3.7.4"}, {ranch_proxy_protocol,"Ranch Proxy Protocol Transport","1.4.4"}, {ranch,"Socket acceptor pool for TCP protocols.","1.4.0"}, {ssl,"Erlang/OTP SSL application","8.2.6.4"}, {public_key,"Public key infrastructure","1.5.2"}, {crypto,"CRYPTO","4.2.2.2"}, {asn1,"The Erlang ASN1 compiler version 5.0.5.1","5.0.5.1"}, {xmerl,"XML parser","1.3.16"}, {inets,"INETS CXC 138 49","6.5.2.4"}, {os_mon,"CPO CXC 138 46","2.4.4"}, {jsx,"a streaming, evented json parsing toolkit","2.8.2"}, {recon,"Diagnostic tools for production use","2.3.2"}, {lager,"Erlang logging framework","3.5.1"}, {goldrush,"Erlang event stream processor","0.1.9"}, {compiler,"ERTS CXC 138 10","7.1.5.2"}, {syntax_tools,"Syntax tools","2.1.4.1"}, {sasl,"SASL CXC 138 11","3.1.2"}, {stdlib,"ERTS CXC 138 10","3.4.5.1"}, {kernel,"ERTS CXC 138 10","5.4.3.2"}]}, {os,{unix,linux}}, {erlang_version, "Erlang/OTP 20 [erts-9.3.3.6] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:64] [hipe] [kernel-poll:true]\n"}, {memory, [{connection_readers,0}, {connection_writers,0}, {connection_channels,0}, {connection_other,0}, {queue_procs,0}, {queue_slave_procs,0}, {plugins,5864}, {other_proc,19366464}, {metrics,184432}, {mgmt_db,0}, {mnesia,72816}, {other_ets,1864576}, {binary,56152}, {msg_index,28528}, {code,24980384}, {atom,1041593}, {other_system,9156871}, {allocated_unused,12563024}, {reserved_unallocated,667648}, {strategy,rss}, {total,[{erlang,56757680},{rss,69988352},{allocated,69320704}]}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}, {vm_memory_calculation_strategy,rss}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,408762777}, {disk_free_limit,50000000}, {disk_free,14542917632}, {file_descriptors, [{total_limit,924},{total_used,2},{sockets_limit,829},{sockets_used,0}]}, {processes,[{limit,1048576},{used,199}]}, {run_queue,0}, {uptime,41}, {kernel,{net_ticktime,60}}] 启用插件[root@mq ~]# rabbitmq-plugins enable rabbitmq_managementThe following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatchApplying plugin configuration to rabbit@mq2...The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatchstarted 3 plugins. 重启服务[root@mq ~]# service rabbitmq-server restartRedirecting to /bin/systemctl restart rabbitmq-server.service 添加帐号:name 密码:passwd[root@mq ~]# rabbitmqctl add_user name passwdAdding user "name" ... 赋予其administrator角色[root@mq ~]# rabbitmqctl set_user_tags name administratorSetting tags for user "name" to [administrator] ... 设置权限[root@mq ~]# rabbitmqctl set_permissions -p / name ".*" ".*" ".*"Setting permissions for user "name" in vhost "/" ...
然后就能够访问进入web管理页面了(外部访问别忘记修改防火墙)。
注:以上步骤本人亲自试过。如有其他错误请访问http://www.baidu.com
保证两个节点都是可以正常工作的。下面我们就将这两个节点连接起来形成高可用的cluster,这样我们就可以让我们的exchange、queue在这两个节点之间复制,形成高可用的queue。
cd 到你的home目录下,里面有一个隐藏的.erlang.cookie文件,这就是我在前面介绍erlang时候提到的,这个文件是erlang用来发现和互连的基础。我们需要做的很简单,将两个节点中的.erlang.cookie设置成一样的。这是erlang的约定,一样的cookie hash key他认为是合法和正确的连接。
找不到的话就选择用 find / -name .erlang.cookie 查找
root@mq2 ]# find / -name .erlang.cookie
/var/lib/rabbitmq/.erlang.cookie
[root@mq2 otp_src_21.2]# cd /var/lib/rabbitmq/
[root@mq2 rabbitmq]# ls -a
. .. config .erlang.cookie mnesia schema
.erlang.cookie默认是只读的,你需要修改下写入权限,然后复制粘贴下cookie 字符串即可。
[root@mq2 rabbitmq]# chmod u+w .erlang.cookie
配置好了之后接下来配置hosts文件,erlang会使用hosts文件里的配置去发现节点。
vim /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4::1 localhost localhost.localdomain localhost6 localhost6.localdomain6192.168.2.182 rabbitmq_node2192.168.2.152 rabbitmq_node1
保证同样的配置在所有的节点上都是相同的。验证你配置的正确不正确你只需要在你的机器上ping rabbitmq_node1,试下请求的ip是不是你配置的即可。按照DNS的请求原理,hosts是最高优先权,除非浏览器有缓存,你直接用ping就不会有问题的。
选择一个节点stop,然后连接到另外节点。
[root@mqrabbitmq]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@mq2 ...
[root@rabbitmq_node1 ~]# rabbitmqctl join_cluster rabbit@rabbitmq_node2
Clustering node rabbit@rabbitmq_node1 with rabbit@rabbitmq_node2
如若报错:
Clustering node rabbit@rabbitmq_node1 with rabbit@rabbitmq_node2Error: unable to perform an operation on node 'rabbit@rabbitmq_node2'. Please see diagnostics information and suggestions below.Most common reasons for this are:- Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)- CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)- Target node is not runningIn addition to the diagnostics info below:- See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more- Consult server logs on node rabbit@rabbitmq_node2DIAGNOSTICSattempted to contact: [rabbit@rabbitmq_node2]rabbit@rabbitmq_node2:- connected to epmd (port 4369) on rabbitmq_node2- epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic - TCP connection succeeded but Erlang distribution failed - Authentication failed (rejected by the remote node), please check the Erlang cookieCurrent node details:- node name: rabbitmqcli70@rabbitmq_node1- effective user's home directory: /var/lib/rabbitmq- Erlang cookie hash: D5jFGsQ9svZgg1Q+H9fLgA==
一、查看/etc/hosts是否正确
二、查看cat /var/lib/rabbitmq/.erlang.cookie里面的数据是否一致,如若不一致请修改一致。
节点已经连接成功。
如若是第一个创建的用户登录不进去请重新建立一个administrator账户,进行进去。
默认情况下节点占用的memory是总内存的40%,可以根据自己的用途仔细研究rabbitmq的配置项。为了提高性能,不需要两个节点都是disc的节点,所以我们需要启动一个节点为RAM模式。
[root@rabbitmq_node2 ~]# rabbitmqctl stop_appStopping rabbit application on node rabbit@rabbitmq_node2 ...[root@rabbitmq_node2 ~]# rabbitmqctl change_cluster_node_type ramTurning rabbit@rabbitmq_node2 into a ram node[root@rabbitmq_node2 ~]# rabbitmqctl start_appStarting node rabbit@rabbitmq_node2 ... completed with 3 plugins.
节点是准备好了,接下来我们需要设置exchange、queue 高可用策略,这样才能真的做到高可用。现在是物理上的机器或者说虚拟机节点是高可用的,但是里面的对象需要我们进行配置策略。
RabbitMQ支持很好的策略模式,需要管理员才能操作。
首先我们需要创建一个属于自己业务范围内的vhost,标示一个逻辑上的独立空间,所有的账号、策略、队列都是强制在某个虚拟机里的。我创建了一个common vhost。
开始添加policie。![](C:\Users\ldyld\Pictures\Camera Roll\1545882840(1).png)
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194532476-1243228310.png)
最主要是Apply to ,可以作用在exchange或者queues上,当然也可以包含这两个。策略选择还是比较丰富的,最常用的是HAmode,还有MessageTTL(消息的过期时间)。这些策略按照几个维度分组了,有跟高可用相关的,有Federation(集群之间同步消息)相关的 ,有Queue相关的,还有Exchange相关的。可以根据的业务场景进行调整。
![](C:\Users\ldyld\Pictures\Camera Roll\1545889849(1).png)
我们定义了策略的匹配模式.order.,这样可以避免将所有的exchange、queue都镜像了
![](C:\Users\ldyld\Pictures\Camera Roll\1545890950(1).png)
我们定义了策略的匹配模式.order.,这样可以避免将所有的exchange、queue都镜像了。
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194534788-83779070.png)
我们新建了一个ex.order.topic exchange,它的features中应用了exchange_queue_ha策略。(相同的策略是无法叠加使用的。)其他的exchange并没有应用这个策略,是因为我们的pattern限定了只匹配.order.的名称。
![](C:\Users\ldyld\Pictures\Camera Roll\1545891585(1).png)
创建一个qu.order.crm queue,注意看它的node属性里有一个”Synchronised mirrors:rabbit@rabbitmq_node2“镜像复制。features里也应用了exchange_queue_ha策略。这个时候,队列其实在两个节点里都是有的,虽然我们创建的时候是在里的,但是它会复制到集群里的其他节点。在创建HAmode的时候可以提供HA params参数,来限定复制节点的个数,这通常用来提高性能和HA之间的平衡。
在rabbitmq-plugins中有两个plugin还是可以试着研究研究的。rabbitmq-plugins list。
[root@rabbitmq_node2 ~]# rabbitmq-plugins list
Configured: E = explicitly enabled; e = implicitly enabled | Status: * = running on rabbit@rabbitmq_node2 |/[ ] rabbitmq_amqp1_0 3.7.4[ ] rabbitmq_auth_backend_cache 3.7.4[ ] rabbitmq_auth_backend_http 3.7.4[ ] rabbitmq_auth_backend_ldap 3.7.4[ ] rabbitmq_auth_mechanism_ssl 3.7.4[ ] rabbitmq_consistent_hash_exchange 3.7.4[ ] rabbitmq_event_exchange 3.7.4[ ] rabbitmq_federation 3.7.4[ ] rabbitmq_federation_management 3.7.4[ ] rabbitmq_jms_topic_exchange 3.7.4[E*] rabbitmq_management 3.7.4[e*] rabbitmq_management_agent 3.7.4[ ] rabbitmq_mqtt 3.7.4[ ] rabbitmq_peer_discovery_aws 3.7.4[ ] rabbitmq_peer_discovery_common 3.7.4[ ] rabbitmq_peer_discovery_consul 3.7.4[ ] rabbitmq_peer_discovery_etcd 3.7.4[ ] rabbitmq_peer_discovery_k8s 3.7.4[ ] rabbitmq_random_exchange 3.7.4[ ] rabbitmq_recent_history_exchange 3.7.4[ ] rabbitmq_sharding 3.7.4[ ] rabbitmq_shovel 3.7.4[ ] rabbitmq_shovel_management 3.7.4[ ] rabbitmq_stomp 3.7.4[ ] rabbitmq_top 3.7.4[ ] rabbitmq_tracing 3.7.4[ ] rabbitmq_trust_store 3.7.4[e*] rabbitmq_web_dispatch 3.7.4[ ] rabbitmq_web_mqtt 3.7.4[ ] rabbitmq_web_mqtt_examples 3.7.4[ ] rabbitmq_web_stomp 3.7.4[ ] rabbitmq_web_stomp_examples 3.7.4
rabbitmq_sharding、rabbitmq_federation,rabbitmq_sharding的版本有点低了,github地址:<;
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194535085-1405490880.png)
Rederation 可以用来进行跨cluster或者node之间同步消息。<;
这个用来在不同的domain之间传递消息还是个不错的解决方案,跨机房或者跨网络区域,订阅别人的rabbitmq消息始终不太稳定,可以用这种方式来传递消息。
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194535788-1801747645.png)
有时候可能由于各种原因导致queue mirror失败,这个时候可以手动进行同步,而不是像其他分布式系统来重启节点或者重建数据。
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194537194-581264582.png)
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194538960-1663001934.png)
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194539741-1598509320.png)
由于RabbitMQ是AMQP协议的实现,所以在进行远程连接的时候尽量采用amqp协议的方式连接。
var amqpList = new List{ new AmqpTcpEndpoint(new Uri("amqp://192.168.0.105:5672")), new AmqpTcpEndpoint(new Uri("amqp://192.168.0.107:5672")) };
关于集群的vip方案其实也是需要综合考虑的,如果是统一的地址会面临三个问题,DNS、LoadBalance、VIP,这三个点都有可能导致集群连接不上。现在越来越多的方案倾向于在客户端做负载和故障转移,这有很多好处,消除了中间节点带来的故障概率。如果这三个点加在一起出现的可用性指标肯定是比直接在客户端连接的低的多。
我们碰到最多就是VIP的问题,这类系统的VIP不同于数据库,数据库的master\slave大多都是要人工check后才切换,不会随便自动的切换主从库。而非数据库的VIP大多都是Keepalived自动检测切换,这带来一些列问题,包括连接重试、心跳保持。这只是VIP的出错场景之一。还有LoadBalance带来的问题,DNS出错的可能性也是很大。所以我倾向于使用客户端来做这些。
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194540788-1994426582.png)
有几个地方很重要,第一个就是消息的Persistent持久化状态要带上,第二个就是ContentType,这个属性很实用,方便你查看消息的正文。
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194541601-158848831.png)
如果没设置,默认是null。
第三个就是AutomaticRecoveryEnabled,自动连接重试,这致命重要。当上面的VIP切换之后这个可以保命。第四个就是TopologyRecoveryEnabled,重新恢复Exchange、queue、binding。在出现网络断开之后,一旦恢复连接就会恢复这些设置以保证是最新的设置。
不管rabbitmq保证的多么强壮,多么高可用,记住一定要有备用方案。
在之前我写了一篇文章,
说了就是消息的持久化和补偿。
![](C:\Users\ldyld\Pictures\Camera Roll\202205-20161210194542304-11838963.png)一旦将发送和接受的消息持久化之后我们能做到事情就比较多了。消息补偿是可以做的,异常也不用担心。但是在发送消息的时候一定要注意,是先持久化消息在业务逻辑处理。为了应对特殊活动的监控,还可以开发一定的业务来监控消息的接受和处理的数量,然后自动补偿。
在开发补偿程序的时候有一个逻辑挺饶人的,当你对某一个消息进行补偿的时候会多出发送消息,而接受的消息肯定是比你发送的少。所以你在统计的时候记得DISTINCT下。
转载于:https://blog.51cto.com/14142911/2344066