Also, can you try sending messages back to Kafka (or another distributed system such as GCS) instead of just printing them? Since multi-language pipelines run SDK containers in Docker, I think you might not see prints in the original console.
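[Editor's note: a minimal sketch of the suggestion above, not code from the thread. It assumes the `apache_beam` package with its Kafka IO (`ReadFromKafka`/`WriteToKafka` from `apache_beam.io.kafka`); the output topic name `test-out` is made up for illustration. The Beam imports are kept inside the function so the sketch can be defined even where Beam is not installed.]

```python
def as_kv_bytes(record):
    """Normalize a (key, value) record to (bytes, bytes).

    ReadFromKafka emits keys/values as raw bytes by default, and the
    default WriteToKafka serializers expect byte strings; a record with
    no key (None) gets an empty byte string instead.
    """
    key, value = record
    return (key if key is not None else b"", value)


def build_roundtrip_pipeline(pipeline_options):
    # Local imports so this sketch is importable without apache_beam.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka, WriteToKafka

    pipeline = beam.Pipeline(options=pipeline_options)
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092',
                             'auto.offset.reset': 'earliest'},
            topics=['test'])
        | 'Normalize' >> beam.Map(as_kv_bytes)
        | 'Write back' >> WriteToKafka(
            producer_config={'bootstrap.servers': 'localhost:29092'},
            topic='test-out'))  # hypothetical output topic
    return pipeline
```

Writing to a second topic lets you verify delivery with `kafkacat -C -b localhost:29092 -t test-out` from the host, independently of where the SDK container sends its stdout.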
Thanks,
Cham

On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> wrote:

> Hi Sumeet,
>
> It seems like your Kafka consumer uses the LATEST offset (which is the
> default setting) as the start offset to read, which is 29. Do you have
> more than 29 records to read at that point? If the pipeline is only for
> testing purposes, I would recommend reading from the earliest offset to
> see whether you get records. You can do so by constructing your
> ReadFromKafka like:
>
>     ReadFromKafka(
>         consumer_config={'bootstrap.servers': 'localhost:29092',
>                          'auto.offset.reset': 'earliest'},
>         topics=['test'])
>
> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <[email protected]> wrote:
>
>> Hi All,
>>
>> I'm trying out a simple example of reading data off a Kafka topic into
>> Apache Beam. Here's the relevant snippet:
>>
>>     with beam.Pipeline(options=pipeline_options) as pipeline:
>>         _ = (
>>             pipeline
>>             | 'Read from Kafka' >> ReadFromKafka(
>>                 consumer_config={'bootstrap.servers': 'localhost:29092'},
>>                 topics=['test'])
>>             | 'Print' >> beam.Map(print))
>>
>> Using the above Beam pipeline snippet, I don't see any messages coming
>> in. Kafka is running locally in a Docker container, and I'm able to use
>> `kafkacat` from the host (outside the container) to publish and
>> subscribe to messages. So, I guess there are no issues on that front.
>> It appears that Beam is able to connect to Kafka and get notified of
>> new messages, as I see the offset changes in the Beam logs when I
>> publish data from `kafkacat`:
>>
>>     INFO:root:severity: INFO
>>     timestamp {
>>       seconds: 1612886861
>>       nanos: 534000000
>>     }
>>     message: "[Consumer
>>       clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>       groupId=Reader-0_offset_consumer_1692125327_none] Seeking to
>>       LATEST offset of partition test-0"
>>     log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>     thread: "22"
>>
>>     INFO:root:severity: INFO
>>     timestamp {
>>       seconds: 1612886861
>>       nanos: 537000000
>>     }
>>     message: "[Consumer
>>       clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>       groupId=Reader-0_offset_consumer_1692125327_none] Resetting
>>       offset for partition test-0 to offset 29."
>>     log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>     thread: "22"
>>
>> This is how I'm publishing data using `kafkacat`:
>>
>>     $ kafkacat -P -b localhost:29092 -t test -K:
>>     1:foo
>>     1:bar
>>
>> and I can confirm that it's being received, again using `kafkacat`:
>>
>>     $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>     Key: 1 Value: foo
>>     Key: 1 Value: bar
>>
>> But despite this, I don't see the actual messages being printed by Beam
>> as I expected. Any pointers to what's missing here would be
>> appreciated. I suspect this could be a decoding issue on the Beam
>> pipeline side, but I could be wrong.
>>
>> Thanks in advance for any pointers!
>>
>> Cheers,
>> Sumeet
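[Editor's note: on the suspected decoding issue above, a small helper like the following (a sketch, not code from the thread) can rule decoding in or out. It assumes the default `ReadFromKafka` deserializers, under which each element is a (key, value) pair of raw bytes (with the key possibly None); decoding to strings before `print` makes the output readable either way.]

```python
def decode_record(record):
    """Decode a (key, value) pair of UTF-8 bytes, as emitted by
    ReadFromKafka with its default byte-array deserializers, into a
    (str, str) pair for printing. A missing key stays None."""
    key, value = record
    return (key.decode("utf-8") if key is not None else None,
            value.decode("utf-8"))
```

In the snippet from the thread, this would slot in as an extra step, e.g. `| 'Decode' >> beam.Map(decode_record)` before `| 'Print' >> beam.Map(print)`. Note that `print` on raw bytes does not fail, it just shows `(b'1', b'foo')`, so if nothing at all is printed, the problem is more likely the start offset (as Boyuan suggested) or stdout staying inside the SDK container (as Cham suggested) than decoding.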
