Hi Sumeet, Which Beam version are you using for your pipeline?
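In case it's not handy, a quick way to check is to print the SDK version from the same Python environment the pipeline is launched from (a minimal sketch, assuming apache_beam is importable there):

# Hypothetical version check; run it in the environment used to launch the pipeline.
import apache_beam as beam
print(beam.__version__)

A couple of hedged observations on the thread below: max_num_records makes ReadFromKafka a bounded read, so the bounded variant working while the plain streaming read doesn't points at the unbounded/streaming path rather than at your Kafka setup. Also, with the default byte-array deserializers the Python side should receive (key, value) pairs of raw bytes; that alone shouldn't stop records from arriving, but once they do show up you'll probably want a small decode step before printing, something like this untested sketch (the exact element layout is an assumption):

# Hypothetical decode step; assumes elements arrive as (key, value) tuples of bytes.
def decode_record(record):
  key, value = record
  return (key.decode('utf-8') if key is not None else None,
          value.decode('utf-8'))

# ... then in the pipeline, between the ReadFromKafka and print steps:
# | 'Decode' >> beam.Map(decode_record)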
On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <[email protected]> wrote:

> I don't believe Fn API DirectRunner supports streaming yet (I might be
> wrong). I can confirm that this works for Dataflow.
>
> Thanks,
> Cham
>
> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <[email protected]> wrote:
>
>> Thanks Cham! But I don't think this is Flink specific. I have observed
>> similar behaviour with DirectRunner as well BTW.
>>
>> ..Sumeet
>>
>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <[email protected]> wrote:
>>
>>> I'm not too familiar with Flink but it seems like, for streaming
>>> pipelines, messages from Kafka/SDF read do not get pushed to subsequent
>>> steps for some reason.
>>> * X-lang Bounded read with Flink seems to be fine.
>>> * X-lang Kafka sink with Flink seems to be fine.
>>>
>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <[email protected]> wrote:
>>>
>>>> Hi Cham,
>>>>
>>>> Do you have pointers on what might be going on? Or something else I can
>>>> try? I had posted the same on StackOverflow [1]; it seems that I'm not
>>>> the only one seeing this issue at the moment.
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>> [1] https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>>
>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <[email protected]> wrote:
>>>>
>>>>> Took me some time to set up the Java test (using Java after more than
>>>>> a decade!), but yes, a similar pipeline with KafkaIO and Flink seems to
>>>>> work fine.
>>>>>
>>>>> Here's the relevant Java code. The only difference from the Python
>>>>> version is that I had to extract the KV from the KafkaRecord object and
>>>>> construct a PCollection<KV> explicitly before writing to the output topic.
>>>>>
>>>>> ~~~~~~~~
>>>>> package org.apache.beam.kafka.test;
>>>>>
>>>>> import org.apache.beam.sdk.Pipeline;
>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>>> import org.apache.beam.sdk.options.Default;
>>>>> import org.apache.beam.sdk.options.Description;
>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>> import org.apache.beam.sdk.transforms.*;
>>>>> import org.apache.beam.sdk.values.KV;
>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>
>>>>> public class KafkaTest {
>>>>>
>>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
>>>>>   static final String INPUT_TOPIC = "in_topic"; // Default input kafka topic name
>>>>>   static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka topic name
>>>>>
>>>>>   /** Specific pipeline options. */
>>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>>     @Description("Kafka bootstrap servers")
>>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>>     String getBootstrap();
>>>>>
>>>>>     void setBootstrap(String value);
>>>>>
>>>>>     @Description("Kafka input topic name")
>>>>>     @Default.String(INPUT_TOPIC)
>>>>>     String getInputTopic();
>>>>>
>>>>>     void setInputTopic(String value);
>>>>>
>>>>>     @Description("Kafka output topic name")
>>>>>     @Default.String(OUTPUT_TOPIC)
>>>>>     String getOutputTopic();
>>>>>
>>>>>     void setOutputTopic(String value);
>>>>>   }
>>>>>
>>>>>   public static final void main(String[] args) throws Exception {
>>>>>     final KafkaTestOptions options =
>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>>
>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>     pipeline
>>>>>         .apply(
>>>>>             "ReadFromKafka",
>>>>>             KafkaIO.<String, String>read()
>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>                 .withTopic(options.getInputTopic())
>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>         .apply(
>>>>>             "PrepareForWriting",
>>>>>             ParDo.of(
>>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>>                   @ProcessElement
>>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>>                     c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
>>>>>                   }
>>>>>                 }))
>>>>>         .apply(
>>>>>             "WriteToKafka",
>>>>>             KafkaIO.<String, String>write()
>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>                 .withTopic(options.getOutputTopic())
>>>>>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>>>>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>>>>
>>>>>     pipeline.run();
>>>>>   }
>>>>> }
>>>>> ~~~~~~~~~
>>>>>
>>>>> I'm firing the Java version as follows:
>>>>>
>>>>> $ mvn exec:java \
>>>>>     -Dexec.mainClass=org.apache.beam.tutorial.analytic.KafkaTest \
>>>>>     -Pflink-runner \
>>>>>     -Dexec.args="--runner=FlinkRunner"
>>>>>
>>>>> And I can see in real time that, as I publish records to the in_topic,
>>>>> the out_topic is able to receive them on a continuous basis.
>>>>>
>>>>> I hope this helps narrow down the issue.
>>>>>
>>>>> Thanks,
>>>>> Sumeet
>>>>>
>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]> wrote:
>>>>>
>>>>>> Are you able to run a similar Java streaming pipeline using KafkaIO
>>>>>> and Flink? (without x-lang)
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Cham!
>>>>>>>
>>>>>>> So finally I was able to get partial success. Since I had
>>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set
>>>>>>> max_num_records=3 to see if it can read all existing records, as
>>>>>>> follows:
>>>>>>>
>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>   _ = (
>>>>>>>       pipeline
>>>>>>>       | 'Read from kafka' >> ReadFromKafka(
>>>>>>>           consumer_config={
>>>>>>>               'bootstrap.servers': bootstrap_servers,
>>>>>>>               'auto.offset.reset': 'earliest'},
>>>>>>>           topics=[in_topic],
>>>>>>>           max_num_records=3)
>>>>>>>       | 'Write to kafka' >> WriteToKafka(
>>>>>>>           producer_config={
>>>>>>>               'bootstrap.servers': bootstrap_servers},
>>>>>>>           topic=out_topic))
>>>>>>>
>>>>>>> I was able to see all 3 records being read, and written successfully
>>>>>>> to the out_topic as well.
>>>>>>> So, it appears that there might be some issue with reading unbounded
>>>>>>> Kafka streams here? Or is there any setting that I might be missing?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sumeet
>>>>>>>
>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hey Cham!
>>>>>>>>
>>>>>>>> Appreciate the response. I tried out your suggestions (details
>>>>>>>> below), but I still don't see any data being consumed or written back
>>>>>>>> to Kafka (as per your suggestion). I'm also providing additional
>>>>>>>> details/context that might help narrow down the issue. Apologies for
>>>>>>>> being a bit verbose from here on!
>>>>>>>>
>>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>>
>>>>>>>> ~~~~~~
>>>>>>>> import logging
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>
>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>>   pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)
>>>>>>>>
>>>>>>>>   logging.info('Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>>>>>                str(bootstrap_servers), in_topic, out_topic)
>>>>>>>>
>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>     _ = (
>>>>>>>>         pipeline
>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>             consumer_config={
>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>>             },
>>>>>>>>             topics=[in_topic])
>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>             producer_config={
>>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>>             },
>>>>>>>>             topic=out_topic))
>>>>>>>>
>>>>>>>> if __name__ == '__main__':
>>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>>   import argparse
>>>>>>>>
>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>   parser.add_argument(
>>>>>>>>       '--bootstrap_servers',
>>>>>>>>       dest='bootstrap_servers',
>>>>>>>>       required=True,
>>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>>   parser.add_argument(
>>>>>>>>       '--in_topic',
>>>>>>>>       dest='in_topic',
>>>>>>>>       required=True,
>>>>>>>>       help='Kafka topic to read data from')
>>>>>>>>   parser.add_argument(
>>>>>>>>       '--out_topic',
>>>>>>>>       dest='out_topic',
>>>>>>>>       required=True,
>>>>>>>>       help='Kafka topic to write data to')
>>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>>
>>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic, known_args.out_topic, pipeline_args)
>>>>>>>> ~~~~~
>>>>>>>>
>>>>>>>> I'm firing this pipeline as follows:
>>>>>>>>
>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092 --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>>
>>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>>
>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>>> v1
>>>>>>>> v2
>>>>>>>> v3
>>>>>>>>
>>>>>>>> Now, when I execute the pipeline, I see that it starts to read
>>>>>>>> records from offset 0, but then seeks to the latest offset 3 without
>>>>>>>> processing the records. I don't see any data written to out_topic. I
>>>>>>>> filtered out the logs a bit, and this is what I'm seeing:
>>>>>>>>
>>>>>>>> INFO:root:Starting data pipeline.
bootstrap_servers=localhost:29092 >>>>>>>> in_topic=in_topic out_topic=out_topic >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions >>>>>>>> assigned to split 0 (total 1): in_topic-0' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): >>>>>>>> in_topic-0' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition >>>>>>>> in_topic-0 to offset 0.' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading >>>>>>>> from in_topic-0 starting at offset 0' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to >>>>>>>> partition(s): in_topic-0' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST >>>>>>>> offset >>>>>>>> of partition in_topic-0' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for >>>>>>>> partition in_topic-0 to offset 3.' >>>>>>>> >>>>>>>> Additionally, the logs also emit complete consumer and producer >>>>>>>> configs. I'm dumping them here, in case that helps: >>>>>>>> >>>>>>>> Consumer Config: >>>>>>>> >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig >>>>>>>> values:' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics >>>>>>>> = true' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>> tauto.commit.interval.ms = 5000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = >>>>>>>> earliest' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>>> [localhost:29092]' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>>> default' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack =' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>> tconnections.max.idle.ms = 540000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms >>>>>>>> = 60000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = >>>>>>>> false' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics >>>>>>>> = true' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = >>>>>>>> 52428800' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = >>>>>>>> 500' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = >>>>>>>> Reader-0_offset_consumer_1947524890_none' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms >>>>>>>> = 3000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = >>>>>>>> []' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close >>>>>>>> = true' >>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = >>>>>>>> read_uncommitted' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = >>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes >>>>>>>> = 1048576' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms >>>>>>>> = 300000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = >>>>>>>> 300000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = >>>>>>>> 2' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>>> = INFO' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy >>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = >>>>>>>> 65536' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms >>>>>>>> = 50' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>>>>>> 30000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = >>>>>>>> 100' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>>> = /usr/bin/kinit' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>>> = 60000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>> tsasl.kerberos.service.name = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>>> = 0.05' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>>> = 0.8' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>>> = 300' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>>> = 60' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>>> = 0.8' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>>> = 0.05' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = >>>>>>>> GSSAPI' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>>> PLAINTEXT' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>>> 131072' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = >>>>>>>> 10000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols >>>>>>>> = [TLSv1.2, 
TLSv1.1, TLSv1]' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>>> = https' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>>> = SunX509' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = >>>>>>>> JKS' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>>> = PKIX' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = >>>>>>>> JKS' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = >>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>>> >>>>>>>> Producer Config: >>>>>>>> >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig >>>>>>>> values:' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>>> [localhost:29092]' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = >>>>>>>> 33554432' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>>> default' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = >>>>>>>> none' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>> tconnections.max.idle.ms = 540000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms = >>>>>>>> 120000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = >>>>>>>> false' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = >>>>>>>> []' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class >>>>>>>> org.apache.kafka.common.serialization.ByteArraySerializer' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection >>>>>>>> = 5' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = >>>>>>>> 1048576' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = >>>>>>>> 300000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = >>>>>>>> 2' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>>> = INFO' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = >>>>>>>> class org.apache.kafka.clients.producer.internals.DefaultPartitioner' >>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = >>>>>>>> 32768' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms >>>>>>>> = 50' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>>>>>> 30000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = >>>>>>>> 100' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>>> = /usr/bin/kinit' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>>> = 60000' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>> tsasl.kerberos.service.name = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>>> = 0.05' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>>> = 0.8' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>>> = 300' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>>> = 60' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>>> = 0.8' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>>> = 0.05' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = >>>>>>>> GSSAPI' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>>> PLAINTEXT' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>>> 131072' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols >>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>>> = https' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = >>>>>>>> null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>>> = SunX509' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = >>>>>>>> JKS' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>>> = PKIX' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>>> = null' >>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>>> = null' >>>>>>>> 
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms = 60000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>
>>>>>>>> Apologies again for dumping almost everything here :-) Any pointers
>>>>>>>> on what might be the issue are appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sumeet
>>>>>>>>
>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Also can you try sending messages back to Kafka (or another
>>>>>>>>> distributed system like GCS) instead of just printing them? (Given
>>>>>>>>> that multi-language pipelines run SDK containers in Docker, you might
>>>>>>>>> not see prints in the original console, I think.)
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Sumeet,
>>>>>>>>>>
>>>>>>>>>> It seems like your kafka consumer uses the LATEST offset (which is
>>>>>>>>>> the default setting) as the start offset to read, which is 29. Do
>>>>>>>>>> you have more than 29 records to read at that point? If the pipeline
>>>>>>>>>> is only for testing purposes, I would recommend reading from the
>>>>>>>>>> earliest offset to see whether you get records. You can do so by
>>>>>>>>>> constructing your ReadFromKafka like:
>>>>>>>>>>
>>>>>>>>>> ReadFromKafka(
>>>>>>>>>>     consumer_config={'bootstrap.servers': 'localhost:29092', 'auto.offset.reset': 'earliest'},
>>>>>>>>>>     topics=['test'])
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka
>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>>
>>>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>   _ = (
>>>>>>>>>>>       pipeline
>>>>>>>>>>>       | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>           consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>>           topics=['test'])
>>>>>>>>>>>       | 'Print' >> beam.Map(print))
>>>>>>>>>>>
>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any messages
>>>>>>>>>>> coming in. Kafka is running locally in a docker container, and I'm
>>>>>>>>>>> able to use `kafkacat` from the host (outside the container) to
>>>>>>>>>>> publish and subscribe to messages. So, I guess there are no issues
>>>>>>>>>>> on that front.
>>>>>>>>>>>
>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get
>>>>>>>>>>> notified of new messages, as I see the offset changes in the Beam
>>>>>>>>>>> logs as I publish data from `kafkacat`:
>>>>>>>>>>>
>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>> timestamp {
>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>   nanos: 534000000
>>>>>>>>>>> }
>>>>>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
>>>>>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>> thread: "22"
>>>>>>>>>>>
>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>> timestamp {
>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>   nanos: 537000000
>>>>>>>>>>> }
>>>>>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
>>>>>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>> thread: "22"
>>>>>>>>>>>
>>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>>
>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>>> 1:foo
>>>>>>>>>>> 1:bar
>>>>>>>>>>>
>>>>>>>>>>> and I can confirm that it's being received, again using `kafkacat`:
>>>>>>>>>>>
>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>>
>>>>>>>>>>> But despite this, I don't see the actual message being printed
>>>>>>>>>>> by Beam as I expected. Any pointers to what's missing here are
>>>>>>>>>>> appreciated. I'm suspecting this could be a decoding issue on the
>>>>>>>>>>> Beam pipeline side, but I could be incorrect.
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Sumeet
