分布机制
使用工作队列实现任务分发的功能,一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们就需要更多的工作者来处理,这里就要采用分布机制了。
例:分布机制
- 定义交换机
- 多个消费者同时订阅一个队列
- 模式采用手动应答
生产者:
package com.example.rabbbitmqDemo.mq;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;/** * @author Aaron 生产者 */public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置MQ相关信息 factory.setHost("localhost"); factory.setPort(AMQP.PROTOCOL.PORT); // 5672 factory.setUsername("guest"); factory.setPassword("guest"); // 创建一个新的连接 Connection connection = factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); String QUEUE_NAME = "queue.work"; String ROUTING_KEY = "task"; String EXCHANGE_NAME = "amqexc.rabbitmq.work"; // 声明一个队列 // 注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 声明交换机:指定交换机的名称和类型(direct) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 循环发布多条消息 for (int i = 0; i < 10; i++) { String message = "Hello RabbitMQ " + i; // 注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes("UTF-8")); System.out.println("Producer Send +'" + message + "'"); } // 关闭通道和连接 channel.close(); connection.close(); }}
消费者:
package com.example.rabbbitmqDemo.mq;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * @author Aaron 消费者 */public class Consumer01 { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建连接 Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); // 设置每次从队列获取消息的数量 channel.basicQos(1); // 设置MQ相关信息 factory.setHost("localhost"); factory.setPort(AMQP.PROTOCOL.PORT); // 5672 factory.setUsername("guest"); factory.setPassword("guest"); String QUEUE_NAME = "queue.work"; // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("Customer Waiting Received messages"); // DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery // 其中envelope主要存放生产者相关信息(比如交换机、路由key等)body是消息实体。 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { doWork(message); // 手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }; // 订阅消息, false: 表示手动应答,需要手动调用basicAck()来应答 channel.basicConsume(QUEUE_NAME, false, consumer); // 睡眠是为了不让程序立即结束,这样还有机会获取第二条消息 Thread.sleep(1000000); } private static void doWork(String message) throws Exception { System.out.println(" [C] Customer1 received '" + message + "', 处理业务中..."); // 模仿消费者处理业务的时间,也让其他消费者有机会获取到消息,实际开发中不需要,这里只是模拟 Thread.sleep(1000); }}
package com.example.rabbbitmqDemo.mq;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * @author Aaron 消费者 */public class Consumer02 { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 创建连接 Connection connection = factory.newConnection(); // 创建通道 Channel channel = connection.createChannel(); // 设置每次从队列获取消息的数量 channel.basicQos(1); // 设置MQ相关信息 factory.setHost("localhost"); factory.setPort(AMQP.PROTOCOL.PORT); // 5672 factory.setUsername("guest"); factory.setPassword("guest"); String QUEUE_NAME = "queue.work"; // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("Customer Waiting Received messages"); // DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery // 其中envelope主要存放生产者相关信息(比如交换机、路由key等)body是消息实体。 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { doWork(message); // 手动应答 channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }; // 订阅消息, false: 表示手动应答,需要手动调用basicAck()来应答 channel.basicConsume(QUEUE_NAME, false, consumer); // 睡眠是为了不让程序立即结束,这样还有机会获取第二条消息 Thread.sleep(1000000); } private static void doWork(String message) throws Exception { System.out.println(" [C] Customer2 received '" + message + "', 处理业务中..."); // 模仿消费者处理业务的时间,也让其他消费者有机会获取到消息,实际开发中不需要,这里只是模拟 Thread.sleep(1000); }}
结果:
Customer Waiting Received messages [C] Customer1 received 'Hello RabbitMQ 4', 处理业务中... [C] Customer1 received 'Hello RabbitMQ 6', 处理业务中... [C] Customer1 received 'Hello RabbitMQ 8', 处理业务中...
Customer Waiting Received messages [C] Customer2 received 'Hello RabbitMQ 0', 处理业务中... [C] Customer2 received 'Hello RabbitMQ 1', 处理业务中... [C] Customer2 received 'Hello RabbitMQ 2', 处理业务中... [C] Customer2 received 'Hello RabbitMQ 3', 处理业务中... [C] Customer2 received 'Hello RabbitMQ 5', 处理业务中... [C] Customer2 received 'Hello RabbitMQ 7', 处理业务中... [C] Customer2 received 'Hello RabbitMQ 9', 处理业务中...
RabbitMQ负载均衡机制
对于RabbitMQ而言可以在客户端连接时简单的使用负载均衡算法来实现负载均衡。负载均衡算法有很多种,主流的有:
1.轮询调度算法 (Round-Robin)
在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个的分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
当消息进入队列,RabbitMQ就会分派消息。它不看消费者的应答的数目,也不关心消费者处理消息的能力,只是盲目的暴力的将第n条消息发给第n个消费者。2.公平转发(Fair dispatch)
当工作队列中有两个消费者中,可以看到消费者1收到的消息都是偶数条,消费者1都是奇数条,假如偶数条的消息处理比较耗时,奇数条的消息处理很快耗时短,当有多条消息在队列中,队列一下子就把所有奇数条消息推送给消费者1,把所有偶数条消息推送给消费者2,由于消费者1处理的消息不叫耗时,消费者2处理比较快,很可能出现当消费者1才处理几条的时,消费者2就已经完全处理了,这样消费者2就处理空闲状态,而消费者1却忙的跟狗似的。为了解决这种现象,让干的快的干完了帮助干的慢的分担点任务,RabbitMQ采用限制消费者一次从队列中获取消息的条数,而不是一下子把满足条件的消息都推送个某个消费者,通过使用channel.basicQos(1)告诉消费者一次只能从队列中预先获取一条(预提取数量prefetchCount),处理完了再获取另一条,这样其他消息仍然在队列中,还没有被分发出去,这样就会造成处理消息慢的继续处理当前消息,处理消息的快的由于一次只能从队列中获取一条,处理完继续从队列中获取,这样就会出现能者多劳,大家谁都不会闲着。使用公平转发这种方式支持动态添加消费者,比如队列中的消息很多,两个消费者处理不过来,需要再增加消费者来处理,由于消息还在队列中,还没有被分发出去,这样再增加消费者,消费者就能马上从队列中获取消息,立即投入进来工作。将上面示例的代码消费者2中doWork的睡眠时间改为500毫秒,Thread.sleep(500);这样消费者1处理消息的时间是1000毫秒,比消费者2慢了一半,消费者1就处理的条数就比消费者2的就少。3.随机法
通过随机算法,根据后端服务器的列表大小值来随机选取其中的一台服务器进行访问。由概率统计理论可以得知,随着客户端调用服务端的次数增多,其实际效果越来越接近于平均分配调用量到后端的每一台服务器,也就是轮询的结果。对应的示例代码如下:public class RandomAccess { private static Listlist = new ArrayList () { { add("192.168.0.2"); add("192.168.0.3"); add("192.168.0.4"); } }; public static String getConnectionAddress() { Random random = new Random(); int pos = random.nextInt(list.size()); return list.get(pos); }}
4.源地址哈希法
源地址哈希的思想是根据获取的客户端IP地址,通过哈希函数计算得到的一个数值,用该数值对服务器列表的大小进行取模运算,得到的结果便是客户端要访问服务器的序号。采用源地址哈希法进行负载均衡,同一IP地址的客户端,当后端服务器列表不变时,它每次都会映射到同一台后端服务器进行访问。
public class IpHash { private static Listlist = new ArrayList () { { add("192.168.0.2"); add("192.168.0.3"); add("192.168.0.4"); } }; public static String getConnectionAddress() throws UnknownHostException { int ipHashCode = InetAddress.getLocalHost().getHostAddress().hashCode(); int pos = ipHashCode % list.size(); return list.get(pos); }}
5.加权轮询法
不同的后端服务器可能机器的配置和当前系统的负载并不相同,因此它们的抗压能力也不相同。给配置高、负载低的机器配置更高的权重,让其处理更多的请求;而配置低、负载高的集群,给其分配较低的权重,降低其系统负载,加权轮询能很好地处理这一问题,并将请求顺序且按照权重分配到后端。6.加权随机法
与加权轮询法一样,加权随机法也根据后端机器的配置、系统的负载分配不同权重。不同的是,它是按照权重随机请求后端服务器,而非顺序。7.最小连接数法
最小连接数算法比较灵活和智能,由于后端服务器的配置不尽相同,对于请求的处理有块有慢,它是根据后端服务器当前的连接情况,动态地选取其中当前积压连接数最少的一台服务器来处理当前的请求,尽可能地提高后端服务的利用效率,将负载合理地分流到每一台服务器。
临时队列(Temporary queues)
Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。
String queueName = channel.queueDeclare().getQueue(); 一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似。@Override public AMQP.Queue.DeclareOk queueDeclare() throws IOException { // 底层会随机产生一个队列名称 return queueDeclare("", false, true, true, null);}
消息持久化(Message durability)
默认队列和消息都是放在内存中的,当RabbitMQ退出或者崩溃,将会丢失队列和消息。为了保证即使RabbitMQ崩溃也不会丢失消息,我们必须把“队列”和“消息”设为持久化,当队列和消息持久化以后即使RabbitMQ崩溃,消息还存在数据库中,当RabbitMQ再次启动的时候,队列和消息仍然还在。
// 队列持久化boolean durable = true; channel.queueDeclare("hello", durable, false, false, null); // 消息持久化 方式一channel.basicPublish("", "key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));// 消息持久化 方式二AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();properties.deliveryMode(2); // 设置消息是否持久化,1: 非持久化 2:持久化channel.basicPublish("", "key", properties.build(), message.getBytes("UTF-8"));
原文:
https://blog.csdn.net/u013256816/article/details/77131753
https://blog.csdn.net/vbirdbest/article/category/7296570