Producteur
Envoie des messages à la file d’attente.
Les files de messages (MQ) sont une forme de communication asynchrone utilisée pour permettre aux systèmes distribués de communiquer et de traiter les tâches efficacement. Les files aident à découpler les composants, améliorent la scalabilité et renforcent la tolérance aux pannes.
Producteur
Envoie des messages à la file d’attente.
Consommateur
Reçoit et traite les messages de la file d’attente.
Broker
Gère la file d’attente et assure la livraison des messages aux consommateurs.
Message
Les données envoyées du producteur au consommateur.
# Producteurimport pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Bonjour, RabbitMQ !')print(" [x] Envoyé 'Bonjour, RabbitMQ!'")connection.close()
# Consommateurimport pika
def callback(ch, method, properties, body):print(f" [x] Reçu {body}")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] En attente de messages. Pour quitter, appuyez sur CTRL+C')channel.start_consuming()// Producteurimport org.apache.kafka.clients.producer.*;
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);producer.send(new ProducerRecord<>("test-topic", "key", "Bonjour, Kafka !")); producer.close();
// Consommateur import org.apache.kafka.clients.consumer.*;
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic"));
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }// Producteurusing Azure.Messaging.ServiceBus;
string connectionString = "Endpoint=sb://votre-servicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=votre-clé";string queueName = "test-queue";
ServiceBusClient client = new ServiceBusClient(connectionString);ServiceBusSender sender = client.CreateSender(queueName);
ServiceBusMessage message = new ServiceBusMessage("Bonjour, Azure Service Bus !");await sender.SendMessageAsync(message);await sender.DisposeAsync();await client.DisposeAsync();
// Consommateurusing Azure.Messaging.ServiceBus;
string connectionString = "Endpoint=sb://votre-servicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=votre-clé";string queueName = "test-queue";
ServiceBusClient client = new ServiceBusClient(connectionString);ServiceBusProcessor processor = client.CreateProcessor(queueName);
processor.ProcessMessageAsync += MessageHandler;processor.ProcessErrorAsync += ErrorHandler;
async Task MessageHandler(ProcessMessageEventArgs args){ string body = args.Message.Body.ToString(); Console.WriteLine(\$"Reçu : {body}"); await args.CompleteMessageAsync(args.Message);}
Task ErrorHandler(ProcessErrorEventArgs args){ Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask;}
await processor.StartProcessingAsync();Console.WriteLine("En attente de messages...");Console.ReadLine();await processor.DisposeAsync();await client.DisposeAsync();| Avantages | Inconvénients |
|---|---|
| Découple les composants et améliore la scalabilité | Ajoute de la complexité au système |
| Renforce la tolérance aux pannes et la fiabilité | Nécessite une infrastructure supplémentaire |
| Permet un traitement asynchrone | Peut introduire de la latence |