I'm not too familiar with Flink, but it seems like, for streaming pipelines, messages from the Kafka/SDF read do not get pushed to subsequent steps for some reason. * X-lang bounded read with Flink seems to be fine. * X-lang Kafka sink with Flink seems to be fine.
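To make the distinction concrete, here is a minimal sketch of the two x-lang read variants being compared (server and topic names are the ones used in the thread below; the bounded variant with max_num_records is the one Sumeet later confirmed works, the plain unbounded variant is the one that does not emit downstream):

~~~~~~
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# Bounded variant: max_num_records makes the x-lang Kafka read finite.
# Per the thread, this case works on Flink (all 3 records were read
# and written to out_topic).
bounded_read = ReadFromKafka(
    consumer_config={
        'bootstrap.servers': 'localhost:29092',
        'auto.offset.reset': 'earliest'},
    topics=['in_topic'],
    max_num_records=3)

# Unbounded variant: the same read without max_num_records.
# This is the streaming case where records are consumed but never
# reach the subsequent steps on Flink.
unbounded_read = ReadFromKafka(
    consumer_config={
        'bootstrap.servers': 'localhost:29092',
        'auto.offset.reset': 'earliest'},
    topics=['in_topic'])
~~~~~~
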
Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking. Thanks, Cham On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <[email protected]> wrote: > Hi Cham, > > Do you have pointers on what might be going on? Or something else I can > try? I had posted the same on StackOverflow [1], it seems that I'm not the > only one seeing this issue at the moment. > > Thanks, > Sumeet > > [1] > https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data > > > On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra < > [email protected]> wrote: > >> Took me some time to setup the Java test (using Java after more than a >> decade!), but yes a similar pipeline with KafkaIO and Flink seems to work >> fine. >> >> Here's the relevant Java code. The only difference from the Python >> version is that I had to extract the KV from the KafkaRecord object and >> construct a PCollection<KV> explicitly before writing to the output topic. >> >> ~~~~~~~~ >> package org.apache.beam.kafka.test; >> >> import org.apache.beam.sdk.Pipeline; >> import org.apache.beam.sdk.io.kafka.KafkaIO; >> import org.apache.beam.sdk.io.kafka.KafkaRecord; >> import org.apache.beam.sdk.options.Default; >> import org.apache.beam.sdk.options.Description; >> import org.apache.beam.sdk.options.PipelineOptions; >> import org.apache.beam.sdk.options.PipelineOptionsFactory; >> import org.apache.beam.sdk.transforms.*; >> import org.apache.beam.sdk.values.KV; >> import org.apache.beam.sdk.values.PCollection; >> import org.apache.kafka.common.serialization.StringDeserializer; >> >> public class KafkaTest { >> >> static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default >> bootstrap kafka servers >> static final String INPUT_TOPIC = "in_topic"; // Default input kafka >> topic name >> static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka >> topic name >> >> /** Specific pipeline options. 
*/ >> public interface KafkaTestOptions extends PipelineOptions { >> @Description("Kafka bootstrap servers") >> @Default.String(BOOTSTRAP_SERVERS) >> String getBootstrap(); >> >> void setBootstrap(String value); >> >> @Description("Kafka input topic name") >> @Default.String(INPUT_TOPIC) >> String getInputTopic(); >> >> void setInputTopic(String value); >> >> @Description("Kafka output topic name") >> @Default.String(OUTPUT_TOPIC) >> String getOutputTopic(); >> >> void setOutputTopic(String value); >> } >> >> public static final void main(String[] args) throws Exception { >> final KafkaTestOptions options = >> >> PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class); >> >> Pipeline pipeline = Pipeline.create(options); >> pipeline >> .apply( >> "ReadFromKafka", >> KafkaIO.<String, String>read() >> .withBootstrapServers(options.getBootstrap()) >> .withTopic(options.getInputTopic()) >> .withKeyDeserializer(StringDeserializer.class) >> .withValueDeserializer(StringDeserializer.class)) >> .apply( >> "PrepareForWriting", >> ParDo.of( >> new DoFn<KafkaRecord<String, String>, KV<String, >> String>>() { >> @ProcessElement >> public void processElement(ProcessContext c) throws >> Exception { >> c.output(KV.of(c.element().getKV().getKey(), >> c.element().getKV().getValue())); >> } >> })) >> .apply( >> "WriteToKafka", >> KafkaIO.<String, String>write() >> .withBootstrapServers(options.getBootstrap()) >> .withTopic(options.getOutputTopic()) >> >> .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class) >> >> .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)); >> >> pipeline.run(); >> } >> } >> ~~~~~~~~~ >> >> I'm firing the Java version as follows: >> >> $ mvn exec:java >> -Dexec.mainClass=org.apache.beam.tutorial.analytic.KafkaTest -Pflink-runner >> -Dexec.args="--runner=FlinkRunner" >> >> And I can see in real time, that as I publish records to the in_topic, >> the out_topic is able to receive them on a continuous basis. >> >> I hope this helps narrow down the issue. >> >> Thanks, >> Sumeet >> >> >> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]> >> wrote: >> >>> Are you able to run a similar Java streaming pipeline using KafkaIO and >>> Flink ? (without x-lang) >>> >>> Thanks, >>> Cham >>> >>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra < >>> [email protected]> wrote: >>> >>>> Hi Cham! >>>> >>>> So finally I was able to get partial success. Since I had pre-populated >>>> the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see >>>> if it can read all existing records, as follows: >>>> >>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>> _ = ( >>>> pipeline >>>> | 'Read from kafka' >> ReadFromKafka( >>>> consumer_config={ >>>> 'bootstrap.servers': bootstrap_servers, >>>> 'auto.offset.reset': 'earliest'}, >>>> topics=[in_topic], >>>> max_num_records=3) >>>> | 'Write to kafka' >> WriteToKafka( >>>> producer_config={ >>>> 'bootstrap.servers': bootstrap_servers}, >>>> topic=out_topic)) >>>> >>>> I was able to see all 3 records being read, and written successfully to >>>> the out_topic as well. So, it appears that there might be some issue with >>>> reading unbounded Kafka streams here? Or is there any setting that I might >>>> be missing? >>>> >>>> Thanks, >>>> Sumeet >>>> >>>> >>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra < >>>> [email protected]> wrote: >>>> >>>>> Hey Cham! >>>>> >>>>> Appreciate the response. 
I tried out your suggestions (details below), >>>>> but I still don't see any data being consumed or written back to Kafka (as >>>>> per your suggestion). I'm also providing additional details/context that >>>>> might help narrow down the issue. Apologies for being a bit verbose from >>>>> hereon! >>>>> >>>>> First, here's what my pipeline code looks like now: >>>>> >>>>> ~~~~~~ >>>>> import apache_beam as beam >>>>> from apache_beam.io.kafka import ReadFromKafka >>>>> from apache_beam.io.kafka import WriteToKafka >>>>> from apache_beam.options.pipeline_options import PipelineOptions >>>>> >>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args): >>>>> pipeline_options = PipelineOptions(pipeline_args, >>>>> save_main_session=True, streaming=True) >>>>> >>>>> logging.info('Starting data pipeline. bootstrap_servers=%s >>>>> in_topic=%s out_topic=%s', >>>>> str(bootstrap_servers), in_topic, out_topic) >>>>> >>>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>>> _ = ( >>>>> pipeline >>>>> | 'Read from kafka' >> ReadFromKafka( >>>>> consumer_config={ >>>>> 'bootstrap.servers': bootstrap_servers, >>>>> 'auto.offset.reset': 'earliest' >>>>> }, >>>>> topics=[in_topic]) >>>>> | 'Write to kafka' >> WriteToKafka( >>>>> producer_config={ >>>>> 'bootstrap.servers': bootstrap_servers >>>>> }, >>>>> topic=out_topic)) >>>>> >>>>> if __name__ == '__main__': >>>>> logging.getLogger().setLevel(logging.INFO) >>>>> import argparse >>>>> >>>>> parser = argparse.ArgumentParser() >>>>> parser.add_argument( >>>>> '--bootstrap_servers', >>>>> dest='bootstrap_servers', >>>>> required=True, >>>>> help='Bootstrap servers for the Kafka cluster') >>>>> parser.add_argument( >>>>> '--in_topic', >>>>> dest='in_topic', >>>>> required=True, >>>>> help='Kafka topic to read data from') >>>>> parser.add_argument( >>>>> '--out_topic', >>>>> dest='out_topic', >>>>> required=True, >>>>> help='Kafka topic to write data to') >>>>> known_args, pipeline_args = parser.parse_known_args() >>>>> >>>>> run(known_args.bootstrap_servers, known_args.in_topic, >>>>> known_args.out_topic, pipeline_args) >>>>> ~~~~~ >>>>> >>>>> I'm firing this pipeline as follows: >>>>> >>>>> python ./pipeline.py --bootstrap_servers=localhost:29092 >>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner >>>>> >>>>> I have pre-populated the Kafka topic with 3 records: >>>>> >>>>> $ kafkacat -C -b localhost:29092 -t in_topic >>>>> v1 >>>>> v2 >>>>> v3 >>>>> >>>>> Now, when I execute the pipeline, I see that it starts to read records >>>>> from offset 0, but then seeks to the latest offset 3 without processing >>>>> the >>>>> records. I don't see any data written to out_topic. I filtered out the >>>>> logs >>>>> a bit, and this is what I'm seeing: >>>>> >>>>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 >>>>> in_topic=in_topic out_topic=out_topic >>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned >>>>> to split 0 (total 1): in_topic-0' >>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0' >>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>> clientId=consumer-2, groupId=null] Resetting offset for partition >>>>> in_topic-0 to offset 0.' 
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading >>>>> from in_topic-0 starting at offset 0' >>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to >>>>> partition(s): in_topic-0' >>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset >>>>> of partition in_topic-0' >>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for >>>>> partition in_topic-0 to offset 3.' >>>>> >>>>> Additionally, the logs also emit complete consumer and producer >>>>> configs. I'm dumping them here, in case that helps: >>>>> >>>>> Consumer Config: >>>>> >>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig >>>>> values:' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics >>>>> = true' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms >>>>> = 5000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = >>>>> earliest' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>> [localhost:29092]' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>> default' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack =' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms >>>>> = 540000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms = >>>>> 60000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = >>>>> false' >>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics = >>>>> true' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = >>>>> 52428800' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = 500' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = >>>>> Reader-0_offset_consumer_1947524890_none' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms = >>>>> 3000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close >>>>> = true' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = >>>>> read_uncommitted' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = class >>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes >>>>> = 1048576' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms = >>>>> 300000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = >>>>> 300000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2' >>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = >>>>> INFO' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms >>>>> = 30000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy >>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]' >>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = >>>>> 65536' >>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms >>>>> = 1000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = >>>>> 50' >>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>>> 30000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>> = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = >>>>> /usr/bin/kinit' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>> = 60000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>> tsasl.kerberos.service.name = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>> = 0.05' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>> = 0.8' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>> = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>> = 300' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>> = 60' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>> = 0.8' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>> = 0.05' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>> PLAINTEXT' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>> 131072' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = >>>>> 10000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = >>>>> [TLSv1.2, TLSv1.1, TLSv1]' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>> = https' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>> = SunX509' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = >>>>> null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = >>>>> null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>> = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>> = PKIX' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = >>>>> null' >>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = >>>>> null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = >>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>> >>>>> Producer Config: >>>>> >>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig >>>>> values:' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>> [localhost:29092]' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = 33554432' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>> default' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms >>>>> = 540000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms = >>>>> 120000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = >>>>> false' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class >>>>> org.apache.kafka.common.serialization.ByteArraySerializer' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection >>>>> = 5' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = >>>>> 1048576' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = >>>>> 300000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = >>>>> INFO' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms >>>>> = 30000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = class >>>>> org.apache.kafka.clients.producer.internals.DefaultPartitioner' >>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = >>>>> 32768' >>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms >>>>> = 1000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = >>>>> 50' >>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>>> 30000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>> = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = >>>>> /usr/bin/kinit' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>> = 60000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>> tsasl.kerberos.service.name = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>> = 0.05' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>> = 0.8' >>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>> = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>> = 300' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>> = 60' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>> = 0.8' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>> = 0.05' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>> PLAINTEXT' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>> 131072' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = >>>>> [TLSv1.2, TLSv1.1, TLSv1]' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>> = https' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>> = SunX509' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = >>>>> null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = >>>>> null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>> = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>> = PKIX' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = >>>>> null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = >>>>> null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS' >>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms = >>>>> 60000' >>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null' >>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class >>>>> org.apache.kafka.common.serialization.ByteArraySerializer' >>>>> >>>>> >>>>> Apologies again for dumping almost everything here :-) Any pointers on >>>>> what might be the issue are appreciated. >>>>> >>>>> Thanks, >>>>> Sumeet >>>>> >>>>> >>>>> >>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath < >>>>> [email protected]> wrote: >>>>> >>>>>> Also can you try sending messages back to Kafka (or another >>>>>> distributed system like GCS) instead of just printing them ? (given that >>>>>> multi-language pipelines run SDK containers in Docker you might not see >>>>>> prints in the original console I think). >>>>>> >>>>>> Thanks, >>>>>> Cham >>>>>> >>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Hi Sumeet, >>>>>>> >>>>>>> It seems like your kafka consumer uses the LATEST offset(which is >>>>>>> default setting) as the start offset to read, which is 29. Do you have >>>>>>> more >>>>>>> than 29 records to read at that point? 
If the pipeline is only for >>>>>>> testing >>>>>>> purpose, I would recommend reading from earliest offset to see whether >>>>>>> you >>>>>>> get records. You can do so by constructing your ReadFromKafka like: >>>>>>> ReadFromKafka( >>>>>>> consumer_config={'bootstrap.servers': 'localhost:29092', >>>>>>> 'auto.offset.reset':'earliest'}, >>>>>>> topics=['test']) >>>>>>> >>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> Hi All, >>>>>>>> >>>>>>>> I'm trying out a simple example of reading data off a Kafka topic >>>>>>>> into Apache Beam. Here's the relevant snippet: >>>>>>>> >>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>>>>>> _ = ( >>>>>>>> pipeline >>>>>>>> | 'Read from Kafka' >> ReadFromKafka( >>>>>>>> consumer_config={'bootstrap.servers': >>>>>>>> 'localhost:29092'}, >>>>>>>> topics=['test']) >>>>>>>> | 'Print' >> beam.Map(print)) >>>>>>>> >>>>>>>> Using the above Beam pipeline snippet, I don't see any messages >>>>>>>> coming in. Kafka is running locally in a docker container, and I'm >>>>>>>> able to >>>>>>>> use `kafkacat` from the host (outside the container) to publish and >>>>>>>> subscribe to messages. So, I guess there are no issues on that front. >>>>>>>> >>>>>>>> It appears that Beam is able to connect to Kafka and get notified >>>>>>>> of new messages, as I see the offset changes in the Beam logs as I >>>>>>>> publish >>>>>>>> data from `kafkacat`: >>>>>>>> >>>>>>>> INFO:root:severity: INFO >>>>>>>> timestamp { >>>>>>>> seconds: 1612886861 >>>>>>>> nanos: 534000000 >>>>>>>> } >>>>>>>> message: "[Consumer >>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, >>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST >>>>>>>> offset >>>>>>>> of partition test-0" >>>>>>>> log_location: >>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState" >>>>>>>> thread: "22" >>>>>>>> >>>>>>>> INFO:root:severity: INFO >>>>>>>> timestamp { >>>>>>>> seconds: 1612886861 >>>>>>>> nanos: 537000000 >>>>>>>> } >>>>>>>> message: "[Consumer >>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, >>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for >>>>>>>> partition test-0 to offset 29." >>>>>>>> log_location: >>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState" >>>>>>>> thread: "22" >>>>>>>> >>>>>>>> This is how I'm publishing data using `kafkacat`: >>>>>>>> >>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K: >>>>>>>> 1:foo >>>>>>>> 1:bar >>>>>>>> >>>>>>>> and I can confirm that its being received, again using `kafkacat`: >>>>>>>> >>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n' >>>>>>>> Key: 1 Value: foo >>>>>>>> Key: 1 Value: bar >>>>>>>> >>>>>>>> But despite this, I don't see the actual message being printed by >>>>>>>> Beam as I expected. Any pointers to what's missing here are >>>>>>>> appreciated. >>>>>>>> I'm suspecting this could be a decoding issue on the Beam pipeline >>>>>>>> side, >>>>>>>> but could be incorrect. >>>>>>>> >>>>>>>> Thanks in advance for any pointers! >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Sumeet >>>>>>>> >>>>>>>
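For completeness, on the decoding suspicion in the original message: with the default ByteArrayDeserializer settings shown in the config dump above, the Python ReadFromKafka emits each record to the pipeline as a key/value pair of raw bytes, so a plain print step shows byte strings rather than the published text. A minimal sketch of decoding before printing (assuming the element shape is a two-tuple of bytes, which is the non-metadata default):

~~~~~~
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

def decode_record(record):
    # With the default ByteArrayDeserializers, each element is a
    # (key, value) pair of raw bytes; decode both for printing.
    key, value = record
    return (key.decode('utf-8') if key is not None else None,
            value.decode('utf-8'))

def run():
    options = PipelineOptions(streaming=True)  # runner flags omitted here
    with beam.Pipeline(options=options) as pipeline:
        _ = (
            pipeline
            | 'Read from Kafka' >> ReadFromKafka(
                consumer_config={'bootstrap.servers': 'localhost:29092'},
                topics=['test'])
            | 'Decode' >> beam.Map(decode_record)
            | 'Print' >> beam.Map(print))
~~~~~~
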
