Hi Cham,

Do you have any pointers on what might be going on, or something else I can
try? I posted the same question on StackOverflow [1]; it seems I'm not the
only one seeing this issue at the moment.
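In the meantime, here is the variant I'm planning to try next. It combines the
earlier suggestions (reading from the earliest offset, and writing back to
Kafka instead of printing) with an explicit decode step, since I suspected a
decoding issue earlier. This is only a rough sketch: I'm assuming that
ReadFromKafka with the default deserializers yields (key, value) tuples of raw
bytes, and the decoded value is used for logging only, so the bytes passed on
to WriteToKafka are unchanged.

~~~~~~
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions


def log_and_forward(record):
  # Assumption: each element from ReadFromKafka is a (key, value) tuple of
  # raw bytes (the key may be None if the producer didn't set one). Decode
  # only for logging; return the original bytes unchanged so WriteToKafka
  # still receives what it expects.
  key, value = record
  logging.info('Consumed key=%s value=%s', key,
               value.decode('utf-8', errors='replace'))
  return record


def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
  pipeline_options = PipelineOptions(
      pipeline_args, save_main_session=True, streaming=True)
  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': bootstrap_servers,
                'auto.offset.reset': 'earliest'},
            topics=[in_topic],
            # Uncomment to force a bounded read while debugging, as in the
            # earlier test that succeeded:
            # max_num_records=3,
            )
        | 'Log and decode' >> beam.Map(log_and_forward)
        | 'Write to kafka' >> WriteToKafka(
            producer_config={'bootstrap.servers': bootstrap_servers},
            topic=out_topic))
~~~~~~

If the bounded variant (max_num_records) keeps working while the unbounded one
still produces nothing, that would at least confirm the problem is with reading
unbounded Kafka streams rather than with decoding.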
Thanks,
Sumeet

[1] https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data

On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <[email protected]> wrote:

> Took me some time to set up the Java test (using Java after more than a
> decade!), but yes, a similar pipeline with KafkaIO and Flink seems to work
> fine.
>
> Here's the relevant Java code. The only difference from the Python version
> is that I had to extract the KV from the KafkaRecord object and construct a
> PCollection<KV> explicitly before writing to the output topic.
>
> ~~~~~~~~
> package org.apache.beam.kafka.test;
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.io.kafka.KafkaRecord;
> import org.apache.beam.sdk.options.Default;
> import org.apache.beam.sdk.options.Description;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.*;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class KafkaTest {
>
>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap Kafka servers
>   static final String INPUT_TOPIC = "in_topic"; // Default input Kafka topic name
>   static final String OUTPUT_TOPIC = "out_topic"; // Default output Kafka topic name
>
>   /** Specific pipeline options. */
>   public interface KafkaTestOptions extends PipelineOptions {
>     @Description("Kafka bootstrap servers")
>     @Default.String(BOOTSTRAP_SERVERS)
>     String getBootstrap();
>
>     void setBootstrap(String value);
>
>     @Description("Kafka input topic name")
>     @Default.String(INPUT_TOPIC)
>     String getInputTopic();
>
>     void setInputTopic(String value);
>
>     @Description("Kafka output topic name")
>     @Default.String(OUTPUT_TOPIC)
>     String getOutputTopic();
>
>     void setOutputTopic(String value);
>   }
>
>   public static final void main(String[] args) throws Exception {
>     final KafkaTestOptions options =
>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         .apply(
>             "ReadFromKafka",
>             KafkaIO.<String, String>read()
>                 .withBootstrapServers(options.getBootstrap())
>                 .withTopic(options.getInputTopic())
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class))
>         .apply(
>             "PrepareForWriting",
>             ParDo.of(
>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>                   @ProcessElement
>                   public void processElement(ProcessContext c) throws Exception {
>                     c.output(
>                         KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
>                   }
>                 }))
>         .apply(
>             "WriteToKafka",
>             KafkaIO.<String, String>write()
>                 .withBootstrapServers(options.getBootstrap())
>                 .withTopic(options.getOutputTopic())
>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>
>     pipeline.run();
>   }
> }
> ~~~~~~~~
>
> I'm firing the Java version as follows:
>
> $ mvn exec:java -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest -Pflink-runner -Dexec.args="--runner=FlinkRunner"
>
> And I can see, in real time, that as I publish records to in_topic,
> out_topic receives them on a continuous basis.
>
> I hope this helps narrow down the issue.
>
> Thanks,
> Sumeet
>
>
> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]>
> wrote:
>
>> Are you able to run a similar Java streaming pipeline using KafkaIO and
>> Flink? (without x-lang)
>>
>> Thanks,
>> Cham
>>
>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>> [email protected]> wrote:
>>
>>> Hi Cham!
>>>
>>> So finally I was able to get partial success. Since I had pre-populated
>>> the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see
>>> if it can read all existing records, as follows:
>>>
>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>   _ = (
>>>       pipeline
>>>       | 'Read from kafka' >> ReadFromKafka(
>>>           consumer_config={
>>>               'bootstrap.servers': bootstrap_servers,
>>>               'auto.offset.reset': 'earliest'},
>>>           topics=[in_topic],
>>>           max_num_records=3)
>>>       | 'Write to kafka' >> WriteToKafka(
>>>           producer_config={
>>>               'bootstrap.servers': bootstrap_servers},
>>>           topic=out_topic))
>>>
>>> I was able to see all 3 records being read and written successfully to
>>> out_topic as well. So it appears that there might be some issue with
>>> reading unbounded Kafka streams here? Or is there any setting that I
>>> might be missing?
>>>
>>> Thanks,
>>> Sumeet
>>>
>>>
>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>> [email protected]> wrote:
>>>
>>>> Hey Cham!
>>>>
>>>> Appreciate the response. I tried out your suggestions (details below),
>>>> but I still don't see any data being consumed or written back to Kafka
>>>> (as per your suggestion). I'm also providing additional details/context
>>>> that might help narrow down the issue. Apologies for being a bit verbose
>>>> from here on!
>>>>
>>>> First, here's what my pipeline code looks like now:
>>>>
>>>> ~~~~~~
>>>> import logging
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io.kafka import ReadFromKafka
>>>> from apache_beam.io.kafka import WriteToKafka
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>   pipeline_options = PipelineOptions(
>>>>       pipeline_args, save_main_session=True, streaming=True)
>>>>
>>>>   logging.info(
>>>>       'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>       str(bootstrap_servers), in_topic, out_topic)
>>>>
>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>     _ = (
>>>>         pipeline
>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>             consumer_config={
>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>                 'auto.offset.reset': 'earliest'
>>>>             },
>>>>             topics=[in_topic])
>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>             producer_config={
>>>>                 'bootstrap.servers': bootstrap_servers
>>>>             },
>>>>             topic=out_topic))
>>>>
>>>> if __name__ == '__main__':
>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>   import argparse
>>>>
>>>>   parser = argparse.ArgumentParser()
>>>>   parser.add_argument(
>>>>       '--bootstrap_servers',
>>>>       dest='bootstrap_servers',
>>>>       required=True,
>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>   parser.add_argument(
>>>>       '--in_topic',
>>>>       dest='in_topic',
>>>>       required=True,
>>>>       help='Kafka topic to read data from')
>>>>   parser.add_argument(
>>>>       '--out_topic',
>>>>       dest='out_topic',
>>>>       required=True,
>>>>       help='Kafka topic to write data to')
>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>
>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>       known_args.out_topic, pipeline_args)
>>>> ~~~~~~
>>>>
>>>> I'm firing this pipeline as follows:
>>>>
>>>> python ./pipeline.py --bootstrap_servers=localhost:29092 --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>
>>>> I have pre-populated the Kafka topic with 3 records:
>>>>
>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>> v1
>>>> v2
>>>> v3
>>>>
>>>> Now, when I execute the pipeline, I see that it starts to read records
>>>> from offset 0, but then seeks to the latest offset 3 without processing
>>>> the records. I don't see any data written to out_topic. I filtered the
>>>> logs a bit, and this is what I'm seeing:
>>>>
>>>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 in_topic=in_topic out_topic=out_topic
>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to split 0 (total 1): in_topic-0'
>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition in_topic-0 to offset 0.'
>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from in_topic-0 starting at offset 0'
>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to partition(s): in_topic-0'
>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset of partition in_topic-0'
>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for partition in_topic-0 to offset 3.'
>>>>
>>>> Additionally, the logs also emit the complete consumer and producer
>>>> configs.
I'm dumping them here, in case that helps: >>>> >>>> Consumer Config: >>>> >>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig values:' >>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics = >>>> true' >>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms = >>>> 5000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = >>>> earliest' >>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>> [localhost:29092]' >>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true' >>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>> default' >>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack =' >>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms = >>>> 540000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms = >>>> 60000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = false' >>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics = >>>> true' >>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = 52428800' >>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = 500' >>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1' >>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = >>>> Reader-0_offset_consumer_1947524890_none' >>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms = >>>> 3000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []' >>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close >>>> = true' >>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = >>>> read_uncommitted' >>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = class >>>> org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes >>>> = 1048576' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms = >>>> 300000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = >>>> 300000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = >>>> INFO' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms >>>> = 30000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy >>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]' >>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = >>>> 65536' >>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms >>>> = 1000' >>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50' >>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>> 30000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>> = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = >>>> /usr/bin/kinit' >>>> 
INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>> = 60000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name >>>> = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>> = 0.05' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>> = 0.8' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>> = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>> = 300' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>> = 60' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>> = 0.8' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>> = 0.05' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>> PLAINTEXT' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = >>>> 10000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = >>>> [TLSv1.2, TLSv1.1, TLSv1]' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>> = https' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm = >>>> SunX509' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = >>>> null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = >>>> null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>> = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>> = PKIX' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = >>>> null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = >>>> null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS' >>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = class >>>> org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>> >>>> Producer Config: >>>> >>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig values:' >>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1' >>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384' >>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>> [localhost:29092]' >>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = 33554432' >>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>> default' >>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none' >>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms = >>>> 540000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms = 
>>>> 120000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = false' >>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []' >>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class >>>> org.apache.kafka.common.serialization.ByteArraySerializer' >>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection >>>> = 5' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = 1048576' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = >>>> 300000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = >>>> INFO' >>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms >>>> = 30000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = class >>>> org.apache.kafka.clients.producer.internals.DefaultPartitioner' >>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = >>>> 32768' >>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms >>>> = 1000' >>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50' >>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>> 30000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3' >>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>> = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = >>>> /usr/bin/kinit' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>> = 60000' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name >>>> = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>> = 0.05' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>> = 0.8' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>> = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>> = 300' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>> = 60' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>> = 0.8' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>> = 0.05' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>> PLAINTEXT' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = >>>> [TLSv1.2, TLSv1.1, TLSv1]' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>> = https' >>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null' >>>> 
INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm = SunX509'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation = null'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm = PKIX'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = null'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = null'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms = 60000'
>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null'
>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>
>>>>
>>>> Apologies again for dumping almost everything here :-) Any pointers on
>>>> what might be the issue are appreciated.
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>>
>>>>
>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>>
>>>>> Also can you try sending messages back to Kafka (or another
>>>>> distributed system like GCS) instead of just printing them? (Given that
>>>>> multi-language pipelines run SDK containers in Docker, you might not
>>>>> see prints in the original console, I think.)
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Sumeet,
>>>>>>
>>>>>> It seems like your Kafka consumer uses the LATEST offset (which is the
>>>>>> default setting) as the start offset to read, which is 29. Do you have
>>>>>> more than 29 records to read at that point? If the pipeline is only
>>>>>> for testing purposes, I would recommend reading from the earliest
>>>>>> offset to see whether you get records. You can do so by constructing
>>>>>> your ReadFromKafka like:
>>>>>>
>>>>>> ReadFromKafka(
>>>>>>     consumer_config={'bootstrap.servers': 'localhost:29092',
>>>>>>                      'auto.offset.reset': 'earliest'},
>>>>>>     topics=['test'])
>>>>>>
>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I'm trying out a simple example of reading data off a Kafka topic
>>>>>>> into Apache Beam. Here's the relevant snippet:
>>>>>>>
>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>   _ = (
>>>>>>>       pipeline
>>>>>>>       | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>           consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>           topics=['test'])
>>>>>>>       | 'Print' >> beam.Map(print))
>>>>>>>
>>>>>>> Using the above Beam pipeline snippet, I don't see any messages
>>>>>>> coming in. Kafka is running locally in a Docker container, and I'm
>>>>>>> able to use `kafkacat` from the host (outside the container) to
>>>>>>> publish and subscribe to messages. So I guess there are no issues on
>>>>>>> that front.
>>>>>>>
>>>>>>> It appears that Beam is able to connect to Kafka and get notified of
>>>>>>> new messages, as I see the offset changes in the Beam logs as I
>>>>>>> publish data from `kafkacat`:
>>>>>>>
>>>>>>> INFO:root:severity: INFO
>>>>>>> timestamp {
>>>>>>>   seconds: 1612886861
>>>>>>>   nanos: 534000000
>>>>>>> }
>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>> thread: "22"
>>>>>>>
>>>>>>> INFO:root:severity: INFO
>>>>>>> timestamp {
>>>>>>>   seconds: 1612886861
>>>>>>>   nanos: 537000000
>>>>>>> }
>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>> thread: "22"
>>>>>>>
>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>
>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>> 1:foo
>>>>>>> 1:bar
>>>>>>>
>>>>>>> and I can confirm that it's being received, again using `kafkacat`:
>>>>>>>
>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>>>> Key: 1 Value: foo
>>>>>>> Key: 1 Value: bar
>>>>>>>
>>>>>>> But despite this, I don't see the actual message being printed by
>>>>>>> Beam as I expected. Any pointers to what's missing here are
>>>>>>> appreciated. I'm suspecting this could be a decoding issue on the
>>>>>>> Beam pipeline side, but I could be incorrect.
>>>>>>>
>>>>>>> Thanks in advance for any pointers!
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Sumeet
>>>>>>>
>>>>>>
