Also, can you try sending messages back to Kafka (or to another distributed
system like GCS) instead of just printing them? Since multi-language
pipelines run SDK containers in Docker, I think you might not see the
prints in the original console.
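For example, something like the following sketch (untested; 'test-out' is a
hypothetical output topic, and the broker address matches the one in your
snippet) would echo the records to a second topic you can watch with
`kafkacat`:

```python
# Sketch: echo Kafka records to an output topic rather than printing,
# so results stay visible even when the SDK harness runs in Docker.

def kafka_configs(bootstrap='localhost:29092'):
    """Consumer/producer configs shared by the pipeline below."""
    consumer = {'bootstrap.servers': bootstrap,
                'auto.offset.reset': 'earliest'}  # read from the beginning
    producer = {'bootstrap.servers': bootstrap}
    return consumer, producer

def main():
    # Imported here so the module can be inspected without Beam installed;
    # actually running this needs a live broker plus the Kafka expansion
    # service that cross-language transforms use.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    consumer_config, producer_config = kafka_configs()
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        _ = (
            pipeline
            | 'Read from Kafka' >> ReadFromKafka(
                consumer_config=consumer_config,
                topics=['test'])
            | 'Write back to Kafka' >> WriteToKafka(
                producer_config=producer_config,
                topic='test-out'))  # hypothetical echo topic

if __name__ == '__main__':
    main()
```

You could then subscribe to 'test-out' with `kafkacat -C` to confirm that
records actually flow through the pipeline.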

Thanks,
Cham

On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> wrote:

> Hi Sumeet,
>
> It seems like your Kafka consumer uses the LATEST offset (which is the
> default setting) as the start offset to read, which is 29. Do you have more
> than 29 records to read at that point? If the pipeline is only for testing
> purposes, I would recommend reading from the earliest offset to see whether
> you get records. You can do so by constructing your ReadFromKafka like:
> ReadFromKafka(
>     consumer_config={'bootstrap.servers': 'localhost:29092',
>                      'auto.offset.reset': 'earliest'},
>     topics=['test'])
>
> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <[email protected]>
> wrote:
>
>> Hi All,
>>
>> I'm trying out a simple example of reading data off a Kafka topic into
>> Apache Beam. Here's the relevant snippet:
>>
>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>     _ = (
>>         pipeline
>>         | 'Read from Kafka' >> ReadFromKafka(
>>             consumer_config={'bootstrap.servers': 'localhost:29092'},
>>             topics=['test'])
>>         | 'Print' >> beam.Map(print))
>>
>> Using the above Beam pipeline snippet, I don't see any messages coming
>> in. Kafka is running locally in a Docker container, and I'm able to use
>> `kafkacat` from the host (outside the container) to publish and subscribe
>> to messages. So, I guess there are no issues on that front.
>>
>> It appears that Beam is able to connect to Kafka and get notified of new
>> messages, as I see the offset changes in the Beam logs as I publish data
>> from `kafkacat`:
>>
>> INFO:root:severity: INFO
>> timestamp {
>>   seconds: 1612886861
>>   nanos: 534000000
>> }
>> message: "[Consumer
>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset
>> of partition test-0"
>> log_location:
>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>> thread: "22"
>>
>> INFO:root:severity: INFO
>> timestamp {
>>   seconds: 1612886861
>>   nanos: 537000000
>> }
>> message: "[Consumer
>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for
>> partition test-0 to offset 29."
>> log_location:
>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>> thread: "22"
>>
>> This is how I'm publishing data using `kafkacat`:
>>
>> $ kafkacat -P -b localhost:29092 -t test -K:
>> 1:foo
>> 1:bar
>>
>> and I can confirm that it's being received, again using `kafkacat`:
>>
>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>> Key: 1 Value: foo
>> Key: 1 Value: bar
>>
>> But despite this, I don't see the actual messages being printed by Beam
>> as I expected. Any pointers to what's missing here are appreciated. I
>> suspect this could be a decoding issue on the Beam pipeline side, but I
>> could be wrong.
>>
>> Thanks in advance for any pointers!
>>
>> Cheers,
>> Sumeet
>>
>