I don't believe Fn API DirectRunner supports streaming yet (I might be wrong). I can confirm that this works for Dataflow.
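(For reference, a minimal sketch of how such a Python pipeline might be submitted to Dataflow rather than run locally; the project, region, and bucket below are placeholders, not values from this thread:

python ./pipeline.py --bootstrap_servers=<broker reachable from Dataflow workers> --in_topic=in_topic --out_topic=out_topic --runner=DataflowRunner --project=<gcp-project> --region=<gcp-region> --temp_location=gs://<bucket>/tmp

The ReadFromKafka/WriteToKafka code itself is unchanged; only the runner options differ.)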
Thanks,
Cham

On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <[email protected]> wrote:

> Thanks Cham! But I don't think this is Flink specific. I have observed
> similar behaviour with DirectRunner as well, BTW.
>
> ..Sumeet
>
> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <[email protected]> wrote:
>
>> I'm not too familiar with Flink, but it seems like, for streaming
>> pipelines, messages from the Kafka/SDF read do not get pushed to
>> subsequent steps for some reason.
>> * X-lang Bounded read with Flink seems to be fine.
>> * X-lang Kafka sink with Flink seems to be fine.
>>
>> Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.
>>
>> Thanks,
>> Cham
>>
>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <[email protected]> wrote:
>>
>>> Hi Cham,
>>>
>>> Do you have pointers on what might be going on? Or something else I can
>>> try? I had posted the same on StackOverflow [1]; it seems that I'm not
>>> the only one seeing this issue at the moment.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>> [1] https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>
>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <[email protected]> wrote:
>>>
>>>> Took me some time to set up the Java test (using Java after more than
>>>> a decade!), but yes, a similar pipeline with KafkaIO and Flink seems to
>>>> work fine.
>>>>
>>>> Here's the relevant Java code. The only difference from the Python
>>>> version is that I had to extract the KV from the KafkaRecord object and
>>>> construct a PCollection<KV> explicitly before writing to the output
>>>> topic.
>>>>
>>>> ~~~~~~~~
>>>> package org.apache.beam.kafka.test;
>>>>
>>>> import org.apache.beam.sdk.Pipeline;
>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>> import org.apache.beam.sdk.options.Default;
>>>> import org.apache.beam.sdk.options.Description;
>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>> import org.apache.beam.sdk.transforms.*;
>>>> import org.apache.beam.sdk.values.KV;
>>>> import org.apache.beam.sdk.values.PCollection;
>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>
>>>> public class KafkaTest {
>>>>
>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
>>>>   static final String INPUT_TOPIC = "in_topic"; // Default input kafka topic name
>>>>   static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka topic name
>>>>
>>>>   /** Specific pipeline options. */
>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>     @Description("Kafka bootstrap servers")
>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>     String getBootstrap();
>>>>
>>>>     void setBootstrap(String value);
>>>>
>>>>     @Description("Kafka input topic name")
>>>>     @Default.String(INPUT_TOPIC)
>>>>     String getInputTopic();
>>>>
>>>>     void setInputTopic(String value);
>>>>
>>>>     @Description("Kafka output topic name")
>>>>     @Default.String(OUTPUT_TOPIC)
>>>>     String getOutputTopic();
>>>>
>>>>     void setOutputTopic(String value);
>>>>   }
>>>>
>>>>   public static final void main(String[] args) throws Exception {
>>>>     final KafkaTestOptions options =
>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>
>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>     pipeline
>>>>         .apply(
>>>>             "ReadFromKafka",
>>>>             KafkaIO.<String, String>read()
>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>                 .withTopic(options.getInputTopic())
>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>         .apply(
>>>>             "PrepareForWriting",
>>>>             ParDo.of(
>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>                   @ProcessElement
>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>                     c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
>>>>                   }
>>>>                 }))
>>>>         .apply(
>>>>             "WriteToKafka",
>>>>             KafkaIO.<String, String>write()
>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>                 .withTopic(options.getOutputTopic())
>>>>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>>>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>>>
>>>>     pipeline.run();
>>>>   }
>>>> }
>>>> ~~~~~~~~
>>>>
>>>> I'm firing the Java version as follows:
>>>>
>>>> $ mvn exec:java -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest -Pflink-runner -Dexec.args="--runner=FlinkRunner"
>>>>
>>>> And I can see in real time that, as I publish records to the in_topic,
>>>> the out_topic is able to receive them on a continuous basis.
>>>>
>>>> I hope this helps narrow down the issue.
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]> wrote:
>>>>
>>>>> Are you able to run a similar Java streaming pipeline using KafkaIO
>>>>> and Flink? (Without x-lang.)
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <[email protected]> wrote:
>>>>>
>>>>>> Hi Cham!
>>>>>>
>>>>>> So finally I was able to get partial success. Since I had
>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set
>>>>>> max_num_records=3 to see if it can read all existing records, as
>>>>>> follows:
>>>>>>
>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>   _ = (
>>>>>>       pipeline
>>>>>>       | 'Read from kafka' >> ReadFromKafka(
>>>>>>           consumer_config={
>>>>>>               'bootstrap.servers': bootstrap_servers,
>>>>>>               'auto.offset.reset': 'earliest'},
>>>>>>           topics=[in_topic],
>>>>>>           max_num_records=3)
>>>>>>       | 'Write to kafka' >> WriteToKafka(
>>>>>>           producer_config={
>>>>>>               'bootstrap.servers': bootstrap_servers},
>>>>>>           topic=out_topic))
>>>>>>
>>>>>> I was able to see all 3 records being read, and written successfully
>>>>>> to the out_topic as well. So, it appears that there might be some
>>>>>> issue with reading unbounded Kafka streams here? Or is there any
>>>>>> setting that I might be missing?
>>>>>>
>>>>>> Thanks,
>>>>>> Sumeet
>>>>>>
>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <[email protected]> wrote:
>>>>>>
>>>>>>> Hey Cham!
>>>>>>>
>>>>>>> Appreciate the response. I tried out your suggestions (details
>>>>>>> below), but I still don't see any data being consumed or written
>>>>>>> back to Kafka (as per your suggestion). I'm also providing additional
>>>>>>> details/context that might help narrow down the issue. Apologies for
>>>>>>> being a bit verbose from here on!
>>>>>>>
>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>
>>>>>>> ~~~~~~
>>>>>>> import logging
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>
>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>   pipeline_options = PipelineOptions(pipeline_args,
>>>>>>>       save_main_session=True, streaming=True)
>>>>>>>
>>>>>>>   logging.info('Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>>>>                str(bootstrap_servers), in_topic, out_topic)
>>>>>>>
>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>     _ = (
>>>>>>>         pipeline
>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>             consumer_config={
>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>             },
>>>>>>>             topics=[in_topic])
>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>             producer_config={
>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>             },
>>>>>>>             topic=out_topic))
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>   import argparse
>>>>>>>
>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>   parser.add_argument(
>>>>>>>       '--bootstrap_servers',
>>>>>>>       dest='bootstrap_servers',
>>>>>>>       required=True,
>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>   parser.add_argument(
>>>>>>>       '--in_topic',
>>>>>>>       dest='in_topic',
>>>>>>>       required=True,
>>>>>>>       help='Kafka topic to read data from')
>>>>>>>   parser.add_argument(
>>>>>>>       '--out_topic',
>>>>>>>       dest='out_topic',
>>>>>>>       required=True,
>>>>>>>       help='Kafka topic to write data to')
>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>
>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>>>>       known_args.out_topic, pipeline_args)
>>>>>>> ~~~~~~
>>>>>>>
>>>>>>> I'm firing this pipeline as follows:
>>>>>>>
>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092 --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>
>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>
>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>> v1
>>>>>>> v2
>>>>>>> v3
>>>>>>>
>>>>>>> Now, when I execute the pipeline, I see that it starts to read
>>>>>>> records from offset 0, but then seeks to the latest offset 3 without
>>>>>>> processing the records. I don't see any data written to out_topic. I
>>>>>>> filtered out the logs a bit, and this is what I'm seeing:
>>>>>>>
>>>>>>> INFO:root:Starting data pipeline.
bootstrap_servers=localhost:29092 >>>>>>> in_topic=in_topic out_topic=out_topic >>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned >>>>>>> to split 0 (total 1): in_topic-0' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): >>>>>>> in_topic-0' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition >>>>>>> in_topic-0 to offset 0.' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading >>>>>>> from in_topic-0 starting at offset 0' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to >>>>>>> partition(s): in_topic-0' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST >>>>>>> offset >>>>>>> of partition in_topic-0' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for >>>>>>> partition in_topic-0 to offset 3.' >>>>>>> >>>>>>> Additionally, the logs also emit complete consumer and producer >>>>>>> configs. I'm dumping them here, in case that helps: >>>>>>> >>>>>>> Consumer Config: >>>>>>> >>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig >>>>>>> values:' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics >>>>>>> = true' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms >>>>>>> = 5000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = >>>>>>> earliest' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>> [localhost:29092]' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>> default' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack =' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms >>>>>>> = 540000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms >>>>>>> = 60000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = >>>>>>> false' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics >>>>>>> = true' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = >>>>>>> 52428800' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = >>>>>>> 500' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = >>>>>>> Reader-0_offset_consumer_1947524890_none' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms >>>>>>> = 3000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = >>>>>>> []' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close >>>>>>> = true' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = >>>>>>> read_uncommitted' 
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = >>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes >>>>>>> = 1048576' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms = >>>>>>> 300000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = >>>>>>> 300000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>> = INFO' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy >>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = >>>>>>> 65536' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = >>>>>>> 50' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>>>>> 30000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>> = /usr/bin/kinit' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>> = 60000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>> tsasl.kerberos.service.name = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>> = 0.05' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>> = 0.8' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>> = 300' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>> = 60' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>> = 0.8' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>> = 0.05' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>> PLAINTEXT' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>> 131072' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = >>>>>>> 10000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = >>>>>>> [TLSv1.2, TLSv1.1, TLSv1]' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>> = https' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null' 
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>> = SunX509' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>> = PKIX' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = >>>>>>> JKS' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = >>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>> >>>>>>> Producer Config: >>>>>>> >>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig >>>>>>> values:' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>> [localhost:29092]' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = >>>>>>> 33554432' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>> default' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms >>>>>>> = 540000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms = >>>>>>> 120000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = >>>>>>> false' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = >>>>>>> []' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class >>>>>>> org.apache.kafka.common.serialization.ByteArraySerializer' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection >>>>>>> = 5' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = >>>>>>> 1048576' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = >>>>>>> 300000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>> = INFO' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = >>>>>>> class org.apache.kafka.clients.producer.internals.DefaultPartitioner' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = >>>>>>> 32768' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = >>>>>>> 50' >>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>>>>> 30000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>> = /usr/bin/kinit' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>> = 60000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>> tsasl.kerberos.service.name = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>> = 0.05' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>> = 0.8' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>> = 300' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>> = 60' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>> = 0.8' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>> = 0.05' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>> PLAINTEXT' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>> 131072' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = >>>>>>> [TLSv1.2, TLSv1.1, TLSv1]' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>> = https' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>> = SunX509' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>> = PKIX' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>> = null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = >>>>>>> JKS' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms >>>>>>> = 60000' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = >>>>>>> null' >>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = >>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer' 
>>>>>>>
>>>>>>> Apologies again for dumping almost everything here :-) Any pointers
>>>>>>> on what might be the issue are appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sumeet
>>>>>>>
>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>
>>>>>>>> Also, can you try sending messages back to Kafka (or another
>>>>>>>> distributed system like GCS) instead of just printing them? (Given
>>>>>>>> that multi-language pipelines run SDK containers in Docker, you
>>>>>>>> might not see prints in the original console, I think.)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Sumeet,
>>>>>>>>>
>>>>>>>>> It seems like your Kafka consumer uses the LATEST offset (which is
>>>>>>>>> the default setting) as the start offset to read, which is 29. Do
>>>>>>>>> you have more than 29 records to read at that point? If the
>>>>>>>>> pipeline is only for testing purposes, I would recommend reading
>>>>>>>>> from the earliest offset to see whether you get records. You can
>>>>>>>>> do so by constructing your ReadFromKafka like:
>>>>>>>>>
>>>>>>>>> ReadFromKafka(
>>>>>>>>>     consumer_config={'bootstrap.servers': 'localhost:29092',
>>>>>>>>>                      'auto.offset.reset': 'earliest'},
>>>>>>>>>     topics=['test'])
>>>>>>>>>
>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka topic
>>>>>>>>>> into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>
>>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>   _ = (
>>>>>>>>>>       pipeline
>>>>>>>>>>       | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>           consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>           topics=['test'])
>>>>>>>>>>       | 'Print' >> beam.Map(print))
>>>>>>>>>>
>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any messages
>>>>>>>>>> coming in. Kafka is running locally in a Docker container, and I'm
>>>>>>>>>> able to use `kafkacat` from the host (outside the container) to
>>>>>>>>>> publish and subscribe to messages. So, I guess there are no issues
>>>>>>>>>> on that front.
>>>>>>>>>>
>>>>>>>>>> It appears that Beam is able to connect to Kafka and get notified
>>>>>>>>>> of new messages, as I see the offset changes in the Beam logs as I
>>>>>>>>>> publish data from `kafkacat`:
>>>>>>>>>>
>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>> timestamp {
>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>   nanos: 534000000
>>>>>>>>>> }
>>>>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
>>>>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>> thread: "22"
>>>>>>>>>>
>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>> timestamp {
>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>   nanos: 537000000
>>>>>>>>>> }
>>>>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
>>>>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>> thread: "22"
>>>>>>>>>>
>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>
>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>> 1:foo
>>>>>>>>>> 1:bar
>>>>>>>>>>
>>>>>>>>>> and I can confirm that it's being received, again using `kafkacat`:
>>>>>>>>>>
>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>
>>>>>>>>>> But despite this, I don't see the actual message being printed by
>>>>>>>>>> Beam as I expected. Any pointers to what's missing here are
>>>>>>>>>> appreciated. I'm suspecting this could be a decoding issue on the
>>>>>>>>>> Beam pipeline side, but I could be incorrect.
>>>>>>>>>>
>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Sumeet
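(On the decoding suspicion above: with the default ByteArrayDeserializer visible in the consumer config dumped earlier in the thread, ReadFromKafka hands the Python pipeline (key, value) pairs of raw bytes, so a print step shows byte strings rather than text. Below is a minimal, illustrative sketch, not from the original thread, of decoding before printing; it reuses the broker and topic names from the first snippet and does not address the underlying issue of records never reaching subsequent steps:

~~~~~~
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


def decode_record(record):
    # record is a (key, value) tuple of raw bytes; the key may be None.
    key, value = record
    return (key.decode('utf-8') if key is not None else None,
            value.decode('utf-8'))


pipeline_options = PipelineOptions(streaming=True)

with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'localhost:29092',
                # Start from the earliest offset, per the suggestion above.
                'auto.offset.reset': 'earliest'},
            topics=['test'])
        | 'Decode' >> beam.Map(decode_record)
        | 'Print' >> beam.Map(print))
~~~~~~
)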
