Hey Cham!

Appreciate the response. I tried out your suggestions (details below), but
I still don't see any data being consumed or written back to Kafka. I'm
also providing additional details/context that might help narrow down the
issue. Apologies for being a bit verbose from here on!

First, here's what my pipeline code looks like now:

~~~~~~
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
  pipeline_options = PipelineOptions(
      pipeline_args, save_main_session=True, streaming=True)

  logging.info(
      'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
      str(bootstrap_servers), in_topic, out_topic)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': bootstrap_servers,
                'auto.offset.reset': 'earliest'
            },
            topics=[in_topic])
        | 'Write to kafka' >> WriteToKafka(
            producer_config={
                'bootstrap.servers': bootstrap_servers
            },
            topic=out_topic))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  import argparse

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--bootstrap_servers',
      dest='bootstrap_servers',
      required=True,
      help='Bootstrap servers for the Kafka cluster')
  parser.add_argument(
      '--in_topic',
      dest='in_topic',
      required=True,
      help='Kafka topic to read data from')
  parser.add_argument(
      '--out_topic',
      dest='out_topic',
      required=True,
      help='Kafka topic to write data to')
  known_args, pipeline_args = parser.parse_known_args()

  run(known_args.bootstrap_servers, known_args.in_topic,
      known_args.out_topic, pipeline_args)
~~~~~~

I'm firing this pipeline as follows:

python ./pipeline.py --bootstrap_servers=localhost:29092 \
    --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
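
As a side note, here is a minimal standalone sketch (not part of the pipeline itself) of how `parse_known_args` splits that command line: the three Kafka flags end up in `known_args`, and anything the parser doesn't recognize, such as `--runner=FlinkRunner`, is left over for `PipelineOptions`. The `split_args` helper name is made up purely for illustration.

```python
import argparse


def split_args(argv):
    """Split argv into the script's own flags and the leftover Beam flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers', required=True)
    parser.add_argument('--in_topic', required=True)
    parser.add_argument('--out_topic', required=True)
    # parse_known_args returns (namespace, list_of_unrecognized_args).
    return parser.parse_known_args(argv)


known_args, pipeline_args = split_args([
    '--bootstrap_servers=localhost:29092',
    '--in_topic=in_topic',
    '--out_topic=out_topic',
    '--runner=FlinkRunner',
])
print(known_args.bootstrap_servers, known_args.in_topic, known_args.out_topic)
print(pipeline_args)
```

So the runner selection reaches Beam only through the leftover `pipeline_args` list.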

I have pre-populated the Kafka topic with 3 records:

$ kafkacat -C -b localhost:29092 -t in_topic
v1
v2
v3

Now, when I execute the pipeline, I see that it starts to read records from
offset 0, but then seeks to the latest offset 3 without processing the
records. I don't see any data written to out_topic. I filtered the logs
a bit, and this is what I'm seeing:

INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 in_topic=in_topic out_topic=out_topic
INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to split 0 (total 1): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition in_topic-0 to offset 0.'
INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from in_topic-0 starting at offset 0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to partition(s): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset of partition in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for partition in_topic-0 to offset 3.'
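
Note that the two offset resets in the logs above are reported under two different consumer clientIds. A rough sketch for pulling those events out of the raw output, assuming the wrapped log lines have been joined back into single lines (the `offset_resets` helper and the regex are purely illustrative, not something Beam provides):

```python
import re
from collections import defaultdict

# Log lines copied from the output above, with the email line wrapping undone.
LOG_LINES = [
    "INFO: [Consumer clientId=consumer-2, groupId=null] "
    "Resetting offset for partition in_topic-0 to offset 0.",
    "INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, "
    "groupId=Reader-0_offset_consumer_1947524890_none] "
    "Seeking to LATEST offset of partition in_topic-0",
    "INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, "
    "groupId=Reader-0_offset_consumer_1947524890_none] "
    "Resetting offset for partition in_topic-0 to offset 3.",
]

RESET_RE = re.compile(
    r"clientId=(?P<client>[^,]+), groupId=[^\]]+\] "
    r"Resetting offset for partition (?P<partition>\S+) "
    r"to offset (?P<offset>\d+)\.")


def offset_resets(lines):
    """Map each consumer clientId to the (partition, offset) resets it logged."""
    resets = defaultdict(list)
    for line in lines:
        match = RESET_RE.search(line)
        if match:
            resets[match.group('client')].append(
                (match.group('partition'), int(match.group('offset'))))
    return dict(resets)


resets = offset_resets(LOG_LINES)
for client, events in resets.items():
    print(client, events)
```

Grouping by clientId makes it easier to see which consumer logged the reset to offset 0 and which one logged the reset to offset 3.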

The logs also include the complete consumer and producer configs. I'm
dumping them here, in case that helps:

Consumer Config:

INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig values:'
INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics =
true'
INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms = 5000'
INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = earliest'
INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
[localhost:29092]'
INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms =
540000'
INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms = 60000'
INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = false'
INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics = true'
INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = 52428800'
INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = 500'
INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
Reader-0_offset_consumer_1947524890_none'
INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = null'
INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms = 3000'
INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
= true'
INFO:apache_beam.utils.subprocess_server:b'\tisolation.level =
read_uncommitted'
INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer'
INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes =
1048576'
INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms = 300000'
INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500'
INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = 300000'
INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = INFO'
INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms =
30000'
INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
= [class org.apache.kafka.clients.consumer.RangeAssignor]'
INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 65536'
INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms =
1000'
INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
= null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd =
/usr/bin/kinit'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
= 60000'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name =
null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
= 0.05'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
= 0.8'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
= null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
= 300'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
= 60'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
= 0.8'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
= 0.05'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = PLAINTEXT'
INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = 10000'
INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
[TLSv1.2, TLSv1.1, TLSv1]'
INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
= https'
INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm =
SunX509'
INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
= null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm =
PKIX'
INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer'

Producer Config:

INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig values:'
INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
[localhost:29092]'
INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = 33554432'
INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none'
INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms =
540000'
INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms = 120000'
INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = false'
INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer'
INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
= 5'
INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = 1048576'
INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = 300000'
INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = INFO'
INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms =
30000'
INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner'
INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 32768'
INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms =
1000'
INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
= null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd =
/usr/bin/kinit'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
= 60000'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name =
null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
= 0.05'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
= 0.8'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
= null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
= 300'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
= 60'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
= 0.8'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
= 0.05'
INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = PLAINTEXT'
INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
[TLSv1.2, TLSv1.1, TLSv1]'
INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
= https'
INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm =
SunX509'
INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
= null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm =
PKIX'
INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = null'
INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms = 60000'
INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null'
INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer'


Apologies again for dumping almost everything here :-) Any pointers on what
might be the issue are appreciated.

Thanks,
Sumeet



On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <[email protected]>
wrote:

> Also can you try sending messages back to Kafka (or another distributed
> system like GCS) instead of just printing them? (Given that multi-language
> pipelines run SDK containers in Docker, you might not see prints in the
> original console, I think.)
>
> Thanks,
> Cham
>
> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Sumeet,
>>
>> It seems like your Kafka consumer uses the LATEST offset (which is the
>> default setting) as the start offset to read, which is 29. Do you have more
>> than 29 records to read at that point? If the pipeline is only for testing
>> purposes, I would recommend reading from the earliest offset to see whether
>> you get records. You can do so by constructing your ReadFromKafka like:
>> ReadFromKafka(
>>     consumer_config={
>>         'bootstrap.servers': 'localhost:29092',
>>         'auto.offset.reset': 'earliest'},
>>     topics=['test'])
>>
>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>> [email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I'm trying out a simple example of reading data off a Kafka topic into
>>> Apache Beam. Here's the relevant snippet:
>>>
>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>     _ = (
>>>         pipeline
>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>             consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>             topics=['test'])
>>>         | 'Print' >> beam.Map(print))
>>>
>>> Using the above Beam pipeline snippet, I don't see any messages coming
>>> in. Kafka is running locally in a docker container, and I'm able to use
>>> `kafkacat` from the host (outside the container) to publish and subscribe
>>> to messages. So, I guess there are no issues on that front.
>>>
>>> It appears that Beam is able to connect to Kafka and get notified of new
>>> messages, as I see the offset changes in the Beam logs as I publish data
>>> from `kafkacat`:
>>>
>>> INFO:root:severity: INFO
>>> timestamp {
>>>   seconds: 1612886861
>>>   nanos: 534000000
>>> }
>>> message: "[Consumer
>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset
>>> of partition test-0"
>>> log_location:
>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>> thread: "22"
>>>
>>> INFO:root:severity: INFO
>>> timestamp {
>>>   seconds: 1612886861
>>>   nanos: 537000000
>>> }
>>> message: "[Consumer
>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for
>>> partition test-0 to offset 29."
>>> log_location:
>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>> thread: "22"
>>>
>>> This is how I'm publishing data using `kafkacat`:
>>>
>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>> 1:foo
>>> 1:bar
>>>
>>> and I can confirm that it's being received, again using `kafkacat`:
>>>
>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>> Key: 1 Value: foo
>>> Key: 1 Value: bar
>>>
>>> But despite this, I don't see the actual messages being printed by Beam
>>> as I expected. Any pointers to what's missing here are appreciated. I
>>> suspect this could be a decoding issue on the Beam pipeline side, but
>>> I could be wrong.
>>>
>>> Thanks in advance for any pointers!
>>>
>>> Cheers,
>>> Sumeet
>>>
>>
