Hi Guys,
Good day!
I have a question about how to make a consumer group consume only the
specific messages that are meant for it.
Here's the scenario:
Publish message "A" to topic "X"
Consumer Group A consumes message "A" from topic "X"
Publish message "B" to topic "X"
Consumer Group B consumes message "B" from topic "X"
How can I achieve this in Scala? At the moment both consumers consume every
message, even though each consumer is assigned a groupId.
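To make the expected behaviour concrete, here is a minimal sketch of what I see versus what I want. It does not use Kafka at all; `Message`, `targetGroup`, `observed` and `wanted` are hypothetical names made up purely for illustration, and the idea of tagging each message with its intended group is an assumption, not something my real code does yet.

```scala
// Hypothetical model of the scenario; no Kafka API is used here.
final case class Message(payload: String, targetGroup: String)

object GroupRoutingSketch {
  val groups: Seq[String] = Seq("Consumer_Group_A", "Consumer_Group_B")

  // What I observe: every group receives every message published to the topic.
  def observed(topic: Seq[Message]): Map[String, Seq[String]] =
    groups.map(g => g -> topic.map(_.payload)).toMap

  // What I want: each group receives only the messages addressed to it.
  def wanted(topic: Seq[Message]): Map[String, Seq[String]] =
    groups.map(g => g -> topic.filter(_.targetGroup == g).map(_.payload)).toMap

  def main(args: Array[String]): Unit = {
    val topicX = Seq(
      Message("A", "Consumer_Group_A"),
      Message("B", "Consumer_Group_B")
    )
    println(s"observed: ${observed(topicX)}")
    println(s"wanted:   ${wanted(topicX)}")
  }
}
```

In other words, I am looking for the Kafka equivalent of `wanted`, whether that is done on the producer side, the consumer side, or through the topic layout.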
Here is my producer code:
import java.util.UUID
import org.apache.kafka.clients.producer.KafkaProducer

// ProducerService, ProducerPayload and logger are application helpers (not shown).
def main(args: Array[String]): Unit = {
  val zookeeperUrl: String = "localhost:2182"
  val kafkaServerUrl: String = "localhost:9092,localhost:9093"
  val topic: String = "Topic_X"
  val groupId: String = "Consumer_Group_A"
  val eventType: String = "delete"
  val deleteRetention: Int = 5240000

  val producerConfig = ProducerService.createProducerConfig(
    zookeeperUrl, kafkaServerUrl, topic, groupId, deleteRetention.toString)
  val producer = new KafkaProducer[String, String](producerConfig)

  // JSON payload with random batch and document ids
  val producerPayload = ProducerPayload(
    "{\"batch_id\":\"" + UUID.randomUUID().toString()
      + "\", \"document_id\":\"" + UUID.randomUUID().toString()
      + "\", \"type\":\"" + eventType
      + "\"}", "", topic, groupId, deleteRetention)

  ProducerService.sendMessage(producer, zookeeperUrl, kafkaServerUrl,
    producerPayload)
  logger.info("Done.")
}
Here is my consumer code:
// ConsumerService and logger are application helpers (not shown).
def consumeMessage = Action { implicit rs =>
  val zookeeperUrl: String = "localhost:2182"
  val kafkaServerUrl: String = "localhost:9092,localhost:9093"
  val topic: String = "Topic_X"
  val groupId: String = "Consumer_Group_A"

  val config = ConsumerService.createConsumerConfig(zookeeperUrl,
    kafkaServerUrl, groupId)
  val consumer = kafka.consumer.Consumer.create(config)

  // One stream (thread) for the topic
  val consumerMap = consumer.createMessageStreams(Map(topic -> 1))
  val streams = consumerMap(topic)
  val it = streams.head.iterator()

  // Note: hasNext() blocks waiting for messages unless consumer.timeout.ms
  // is set in the consumer config.
  while (it.hasNext()) {
    val msg = new String(it.next().message())
    logger.info(s"Message successfully consumed from topic ${topic} => " + msg)
  }

  consumer.shutdown()
  logger.info("Done.")
  Ok
}
Your help is much appreciated. Thank you.

Sincerely yours,
Rico Nodalo Lugod
Senior Java / J2EE / SOA - Developer
Cebu City, Philippines 6000
Email: [email protected]