Hi All,

I'm trying out a simple example of reading data off a Kafka topic into Apache 
Beam. Here's the relevant snippet:

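  import apache_beam as beam
  from apache_beam.io.kafka import ReadFromKafka

  # pipeline_options is defined elsewhere (not shown here).
  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092'},
            topics=['test'])
        | 'Print' >> beam.Map(print))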

Using the above Beam pipeline snippet, I don't see any messages coming in. 
Kafka is running locally in a docker container, and I'm able to use `kafkacat` 
from the host (outside the container) to publish and subscribe to messages. So, 
I guess there are no issues on that front.

It appears that Beam is able to connect to Kafka and get notified of new 
messages, as I see the offset changes in the Beam logs as I publish data from 
`kafkacat`:

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
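
Since the log above shows the consumer seeking to the LATEST offset, one thing I could try is asking the consumer to start from the earliest offset instead. Below is a minimal sketch of that idea, not something I have verified fixes the problem. It assumes the standard Kafka consumer property `auto.offset.reset` is passed through `consumer_config` unchanged; `build_consumer_config` is a hypothetical helper name, not part of Beam.

```python
def build_consumer_config(bootstrap_servers, from_beginning=False):
    """Build a dict intended for ReadFromKafka(consumer_config=...).

    build_consumer_config is a hypothetical helper, not a Beam API.
    """
    config = {'bootstrap.servers': bootstrap_servers}
    if from_beginning:
        # Standard Kafka consumer property; assumed to be forwarded as-is.
        config['auto.offset.reset'] = 'earliest'
    return config


# Values are strings, matching the consumer_config in my snippet.
consumer_config = build_consumer_config('localhost:29092', from_beginning=True)
```

The resulting dict would replace the inline `consumer_config` in the snippet at the top.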

This is how I'm publishing data using `kafkacat`:

$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar

and I can confirm that it's being received, again using `kafkacat`:

$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar

But despite this, I don't see the actual messages being printed by Beam as I
expected. Any pointers to what's missing here are appreciated. I suspect
this could be a decoding issue on the Beam pipeline side, but I could be
wrong.
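
To make the decoding suspicion concrete, here is a minimal sketch of the step I have in mind. It assumes `ReadFromKafka` emits each record as a `(key, value)` tuple of raw bytes, which is my understanding of the default byte-array deserializers. `decode_record` is a hypothetical helper, not part of Beam.

```python
def decode_record(record, encoding='utf-8'):
    """Decode a (key, value) pair of raw bytes into strings.

    Assumes each element is a 2-tuple of bytes; a missing key or value
    (None) is passed through unchanged.
    """
    key, value = record
    decoded_key = key.decode(encoding) if key is not None else None
    decoded_value = value.decode(encoding) if value is not None else None
    return decoded_key, decoded_value


# Hypothetical placement in the pipeline, between the read and print steps:
#   | 'Decode' >> beam.Map(decode_record)
#   | 'Print' >> beam.Map(print)
```

If the elements really are byte tuples, I would still expect `print` to show something like a tuple of bytes even without this step, so decoding may not be the whole story.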

Thanks in advance for any pointers!

Cheers,
Sumeet
