Producer
Sends messages to the queue.
Message Queues (MQ) are a form of asynchronous communication used to enable distributed systems to communicate and process tasks efficiently. Queues help decouple components, improve scalability, and enhance fault tolerance.
Producer
Sends messages to the queue.
Consumer
Receives and processes messages from the queue.
Broker
Manages the queue and ensures messages are delivered to consumers.
Message
The data sent from the producer to the consumer.
# Producerimport pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello, RabbitMQ!')print(" [x] Sent 'Hello, RabbitMQ!'")connection.close()
# Consumerimport pika
def callback(ch, method, properties, body): print(f" [x] Received {body}")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()// Producerimport org.apache.kafka.clients.producer.*;
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);producer.send(new ProducerRecord<>("test-topic", "key", "Hello, Kafka!"));producer.close();
// Consumerimport org.apache.kafka.clients.consumer.*;
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("test-topic"));
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }}// Producerusing Azure.Messaging.ServiceBus;
string connectionString = "Endpoint=sb://your-servicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-key";string queueName = "test-queue";
ServiceBusClient client = new ServiceBusClient(connectionString);ServiceBusSender sender = client.CreateSender(queueName);
ServiceBusMessage message = new ServiceBusMessage("Hello, Azure Service Bus!");await sender.SendMessageAsync(message);await sender.DisposeAsync();await client.DisposeAsync();
// Consumerusing Azure.Messaging.ServiceBus;
string connectionString = "Endpoint=sb://your-servicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-key";string queueName = "test-queue";
ServiceBusClient client = new ServiceBusClient(connectionString);ServiceBusProcessor processor = client.CreateProcessor(queueName);
processor.ProcessMessageAsync += MessageHandler;processor.ProcessErrorAsync += ErrorHandler;
async Task MessageHandler(ProcessMessageEventArgs args){ string body = args.Message.Body.ToString(); Console.WriteLine(\$"Received: {body}"); await args.CompleteMessageAsync(args.Message);}
Task ErrorHandler(ProcessErrorEventArgs args){ Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask;}
await processor.StartProcessingAsync();Console.WriteLine("Waiting for messages...");Console.ReadLine();await processor.DisposeAsync();await client.DisposeAsync();| Pros | Cons |
|---|---|
| Decouples components and improves scalability | Adds complexity to the system |
| Enhances fault tolerance and reliability | Requires additional infrastructure |
| Enables asynchronous processing | Can introduce latency |