Thanks Cham. In the Python version, I do specify the streaming option as follows (though not on the command line):
pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True) Regarding running portable pipelines, just to confirm, what you are saying is that currently the only way to execute this is in Java then until the issue you created is resolved? Thanks, Sumeet On Wed, Mar 17, 2021 at 5:38 AM Boyuan Zhang <[email protected]> wrote: > Hi Sumeet, > > After double checking the current support status. the root cause is that > when you are using cross-language pipelines, you are actually having > pipelines running in the portable way[1]. Currently we haven't supported > processing unbounded source on Flink over portable execution well. I have > filed https://issues.apache.org/jira/browse/BEAM-11998 to track the > progress. > > [1] https://s.apache.org/beam-fn-api > > > On Tue, Mar 16, 2021 at 10:13 AM Boyuan Zhang <[email protected]> wrote: > >> And one more question, did you launch your pipeline with streaming=True >> pipeline >> options? I think you need to use --streaming=True to have unbounded >> source working properly. >> >> On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]> wrote: >> >>> Hi Sumeet, >>> >>> Which Beam version are you using for your pipeline? >>> >>> On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath < >>> [email protected]> wrote: >>> >>>> I don't believe Fn API DirectRunner supports streaming yet (I might be >>>> wrong). I can confirm that this works for Dataflow. >>>> >>>> Thanks, >>>> Cham >>>> >>>> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra < >>>> [email protected]> wrote: >>>> >>>>> Thanks Cham! But I don't think this is Flink specific. I have observed >>>>> similar behaviour with DirectRunner as well BTW. >>>>> >>>>> ..Sumeet >>>>> >>>>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath < >>>>> [email protected]> wrote: >>>>> >>>>>> I'm not too familiar with Flink but it seems like, for streaming >>>>>> pipelines, messages from Kafka/SDF read do not get pushed to subsequent >>>>>> steps for some reason. >>>>>> * X-lang Bounded read with Flink seems to be fine. >>>>>> * X-lang Kafka sink and with Flink to be fine. >>>>>> >>>>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for >>>>>> tracking. >>>>>> >>>>>> Thanks, >>>>>> Cham >>>>>> >>>>>> >>>>>> >>>>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> Hi Cham, >>>>>>> >>>>>>> Do you have pointers on what might be going on? Or something else I >>>>>>> can try? I had posted the same on StackOverflow [1], it seems that I'm >>>>>>> not >>>>>>> the only one seeing this issue at the moment. >>>>>>> >>>>>>> Thanks, >>>>>>> Sumeet >>>>>>> >>>>>>> [1] >>>>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data >>>>>>> >>>>>>> >>>>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> Took me some time to setup the Java test (using Java after more >>>>>>>> than a decade!), but yes a similar pipeline with KafkaIO and Flink >>>>>>>> seems to >>>>>>>> work fine. >>>>>>>> >>>>>>>> Here's the relevant Java code. The only difference from the Python >>>>>>>> version is that I had to extract the KV from the KafkaRecord object and >>>>>>>> construct a PCollection<KV> explicitly before writing to the output >>>>>>>> topic. 
>>>>>>>> >>>>>>>> ~~~~~~~~ >>>>>>>> package org.apache.beam.kafka.test; >>>>>>>> >>>>>>>> import org.apache.beam.sdk.Pipeline; >>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO; >>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord; >>>>>>>> import org.apache.beam.sdk.options.Default; >>>>>>>> import org.apache.beam.sdk.options.Description; >>>>>>>> import org.apache.beam.sdk.options.PipelineOptions; >>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory; >>>>>>>> import org.apache.beam.sdk.transforms.*; >>>>>>>> import org.apache.beam.sdk.values.KV; >>>>>>>> import org.apache.beam.sdk.values.PCollection; >>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer; >>>>>>>> >>>>>>>> public class KafkaTest { >>>>>>>> >>>>>>>> static final String BOOTSTRAP_SERVERS = "localhost:29092"; // >>>>>>>> Default bootstrap kafka servers >>>>>>>> static final String INPUT_TOPIC = "in_topic"; // Default input >>>>>>>> kafka topic name >>>>>>>> static final String OUTPUT_TOPIC = "out_topic"; // Default output >>>>>>>> kafka topic name >>>>>>>> >>>>>>>> /** Specific pipeline options. */ >>>>>>>> public interface KafkaTestOptions extends PipelineOptions { >>>>>>>> @Description("Kafka bootstrap servers") >>>>>>>> @Default.String(BOOTSTRAP_SERVERS) >>>>>>>> String getBootstrap(); >>>>>>>> >>>>>>>> void setBootstrap(String value); >>>>>>>> >>>>>>>> @Description("Kafka input topic name") >>>>>>>> @Default.String(INPUT_TOPIC) >>>>>>>> String getInputTopic(); >>>>>>>> >>>>>>>> void setInputTopic(String value); >>>>>>>> >>>>>>>> @Description("Kafka output topic name") >>>>>>>> @Default.String(OUTPUT_TOPIC) >>>>>>>> String getOutputTopic(); >>>>>>>> >>>>>>>> void setOutputTopic(String value); >>>>>>>> } >>>>>>>> >>>>>>>> public static final void main(String[] args) throws Exception { >>>>>>>> final KafkaTestOptions options = >>>>>>>> >>>>>>>> PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class); >>>>>>>> >>>>>>>> Pipeline pipeline = Pipeline.create(options); >>>>>>>> pipeline >>>>>>>> .apply( >>>>>>>> "ReadFromKafka", >>>>>>>> KafkaIO.<String, String>read() >>>>>>>> .withBootstrapServers(options.getBootstrap()) >>>>>>>> .withTopic(options.getInputTopic()) >>>>>>>> .withKeyDeserializer(StringDeserializer.class) >>>>>>>> .withValueDeserializer(StringDeserializer.class)) >>>>>>>> .apply( >>>>>>>> "PrepareForWriting", >>>>>>>> ParDo.of( >>>>>>>> new DoFn<KafkaRecord<String, String>, KV<String, >>>>>>>> String>>() { >>>>>>>> @ProcessElement >>>>>>>> public void processElement(ProcessContext c) >>>>>>>> throws Exception { >>>>>>>> c.output(KV.of(c.element().getKV().getKey(), >>>>>>>> c.element().getKV().getValue())); >>>>>>>> } >>>>>>>> })) >>>>>>>> .apply( >>>>>>>> "WriteToKafka", >>>>>>>> KafkaIO.<String, String>write() >>>>>>>> .withBootstrapServers(options.getBootstrap()) >>>>>>>> .withTopic(options.getOutputTopic()) >>>>>>>> >>>>>>>> .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class) >>>>>>>> >>>>>>>> .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)); >>>>>>>> >>>>>>>> pipeline.run(); >>>>>>>> } >>>>>>>> } >>>>>>>> ~~~~~~~~~ >>>>>>>> >>>>>>>> I'm firing the Java version as follows: >>>>>>>> >>>>>>>> $ mvn exec:java >>>>>>>> -Dexec.mainClass=org.apache.beam.tutorial.analytic.KafkaTest >>>>>>>> -Pflink-runner >>>>>>>> -Dexec.args="--runner=FlinkRunner" >>>>>>>> >>>>>>>> And I can see in real time, that as I publish records to the >>>>>>>> in_topic, the out_topic is able to receive them on a 
continuous basis. >>>>>>>> >>>>>>>> I hope this helps narrow down the issue. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Sumeet >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> Are you able to run a similar Java streaming pipeline using >>>>>>>>> KafkaIO and Flink ? (without x-lang) >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Cham >>>>>>>>> >>>>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> Hi Cham! >>>>>>>>>> >>>>>>>>>> So finally I was able to get partial success. Since I had >>>>>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set >>>>>>>>>> max_num_records=3 to see if it can read all existing records, as >>>>>>>>>> follows: >>>>>>>>>> >>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>>>>>>>> _ = ( >>>>>>>>>> pipeline >>>>>>>>>> | 'Read from kafka' >> ReadFromKafka( >>>>>>>>>> consumer_config={ >>>>>>>>>> 'bootstrap.servers': bootstrap_servers, >>>>>>>>>> 'auto.offset.reset': 'earliest'}, >>>>>>>>>> topics=[in_topic], >>>>>>>>>> max_num_records=3) >>>>>>>>>> | 'Write to kafka' >> WriteToKafka( >>>>>>>>>> producer_config={ >>>>>>>>>> 'bootstrap.servers': bootstrap_servers}, >>>>>>>>>> topic=out_topic)) >>>>>>>>>> >>>>>>>>>> I was able to see all 3 records being read, and written >>>>>>>>>> successfully to the out_topic as well. So, it appears that there >>>>>>>>>> might be >>>>>>>>>> some issue with reading unbounded Kafka streams here? Or is there any >>>>>>>>>> setting that I might be missing? >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Sumeet >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra < >>>>>>>>>> [email protected]> wrote: >>>>>>>>>> >>>>>>>>>>> Hey Cham! >>>>>>>>>>> >>>>>>>>>>> Appreciate the response. I tried out your suggestions (details >>>>>>>>>>> below), but I still don't see any data being consumed or written >>>>>>>>>>> back to >>>>>>>>>>> Kafka (as per your suggestion). I'm also providing additional >>>>>>>>>>> details/context that might help narrow down the issue. Apologies >>>>>>>>>>> for being >>>>>>>>>>> a bit verbose from hereon! >>>>>>>>>>> >>>>>>>>>>> First, here's what my pipeline code looks like now: >>>>>>>>>>> >>>>>>>>>>> ~~~~~~ >>>>>>>>>>> import apache_beam as beam >>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka >>>>>>>>>>> from apache_beam.io.kafka import WriteToKafka >>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions >>>>>>>>>>> >>>>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args): >>>>>>>>>>> pipeline_options = PipelineOptions(pipeline_args, >>>>>>>>>>> save_main_session=True, streaming=True) >>>>>>>>>>> >>>>>>>>>>> logging.info('Starting data pipeline. 
bootstrap_servers=%s >>>>>>>>>>> in_topic=%s out_topic=%s', >>>>>>>>>>> str(bootstrap_servers), in_topic, out_topic) >>>>>>>>>>> >>>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>>>>>>>>> _ = ( >>>>>>>>>>> pipeline >>>>>>>>>>> | 'Read from kafka' >> ReadFromKafka( >>>>>>>>>>> consumer_config={ >>>>>>>>>>> 'bootstrap.servers': bootstrap_servers, >>>>>>>>>>> 'auto.offset.reset': 'earliest' >>>>>>>>>>> }, >>>>>>>>>>> topics=[in_topic]) >>>>>>>>>>> | 'Write to kafka' >> WriteToKafka( >>>>>>>>>>> producer_config={ >>>>>>>>>>> 'bootstrap.servers': bootstrap_servers >>>>>>>>>>> }, >>>>>>>>>>> topic=out_topic)) >>>>>>>>>>> >>>>>>>>>>> if __name__ == '__main__': >>>>>>>>>>> logging.getLogger().setLevel(logging.INFO) >>>>>>>>>>> import argparse >>>>>>>>>>> >>>>>>>>>>> parser = argparse.ArgumentParser() >>>>>>>>>>> parser.add_argument( >>>>>>>>>>> '--bootstrap_servers', >>>>>>>>>>> dest='bootstrap_servers', >>>>>>>>>>> required=True, >>>>>>>>>>> help='Bootstrap servers for the Kafka cluster') >>>>>>>>>>> parser.add_argument( >>>>>>>>>>> '--in_topic', >>>>>>>>>>> dest='in_topic', >>>>>>>>>>> required=True, >>>>>>>>>>> help='Kafka topic to read data from') >>>>>>>>>>> parser.add_argument( >>>>>>>>>>> '--out_topic', >>>>>>>>>>> dest='out_topic', >>>>>>>>>>> required=True, >>>>>>>>>>> help='Kafka topic to write data to') >>>>>>>>>>> known_args, pipeline_args = parser.parse_known_args() >>>>>>>>>>> >>>>>>>>>>> run(known_args.bootstrap_servers, known_args.in_topic, >>>>>>>>>>> known_args.out_topic, pipeline_args) >>>>>>>>>>> ~~~~~ >>>>>>>>>>> >>>>>>>>>>> I'm firing this pipeline as follows: >>>>>>>>>>> >>>>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092 >>>>>>>>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner >>>>>>>>>>> >>>>>>>>>>> I have pre-populated the Kafka topic with 3 records: >>>>>>>>>>> >>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic >>>>>>>>>>> v1 >>>>>>>>>>> v2 >>>>>>>>>>> v3 >>>>>>>>>>> >>>>>>>>>>> Now, when I execute the pipeline, I see that it starts to read >>>>>>>>>>> records from offset 0, but then seeks to the latest offset 3 without >>>>>>>>>>> processing the records. I don't see any data written to out_topic. I >>>>>>>>>>> filtered out the logs a bit, and this is what I'm seeing: >>>>>>>>>>> >>>>>>>>>>> INFO:root:Starting data pipeline. >>>>>>>>>>> bootstrap_servers=localhost:29092 in_topic=in_topic >>>>>>>>>>> out_topic=out_topic >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions >>>>>>>>>>> assigned to split 0 (total 1): in_topic-0' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): >>>>>>>>>>> in_topic-0' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition >>>>>>>>>>> in_topic-0 to offset 0.' 
>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: >>>>>>>>>>> reading from in_topic-0 starting at offset 0' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to >>>>>>>>>>> partition(s): in_topic-0' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST >>>>>>>>>>> offset >>>>>>>>>>> of partition in_topic-0' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset >>>>>>>>>>> for >>>>>>>>>>> partition in_topic-0 to offset 3.' >>>>>>>>>>> >>>>>>>>>>> Additionally, the logs also emit complete consumer and producer >>>>>>>>>>> configs. I'm dumping them here, in case that helps: >>>>>>>>>>> >>>>>>>>>>> Consumer Config: >>>>>>>>>>> >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig >>>>>>>>>>> values:' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics >>>>>>>>>>> = true' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> tauto.commit.interval.ms = 5000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = >>>>>>>>>>> earliest' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>>>>>> [localhost:29092]' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>>>>>> default' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack =' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> tconnections.max.idle.ms = 540000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> tdefault.api.timeout.ms = 60000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit >>>>>>>>>>> = false' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics >>>>>>>>>>> = true' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = >>>>>>>>>>> 52428800' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms >>>>>>>>>>> = 500' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = >>>>>>>>>>> Reader-0_offset_consumer_1947524890_none' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> theartbeat.interval.ms = 3000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes >>>>>>>>>>> = []' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close >>>>>>>>>>> = true' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = >>>>>>>>>>> read_uncommitted' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = >>>>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes >>>>>>>>>>> = 1048576' >>>>>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> tmax.poll.interval.ms = 300000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = >>>>>>>>>>> 500' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms >>>>>>>>>>> = 300000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = >>>>>>>>>>> []' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples >>>>>>>>>>> = 2' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>>>>>> = INFO' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy >>>>>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes >>>>>>>>>>> = 65536' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> treconnect.backoff.ms = 50' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms >>>>>>>>>>> = 30000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = >>>>>>>>>>> 100' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = >>>>>>>>>>> null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>>>>>> = /usr/bin/kinit' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>>>>>> = 60000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> tsasl.kerberos.service.name = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>>>>>> = 0.05' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>>>>>> = 0.8' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = >>>>>>>>>>> null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>>>>>> = 300' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>>>>>> = 60' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>>>>>> = 0.8' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>>>>>> = 0.05' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = >>>>>>>>>>> GSSAPI' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>>>>>> PLAINTEXT' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>>>>>> 131072' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms >>>>>>>>>>> = 10000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>>>>>> null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols >>>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>>>>>> = https' 
>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = >>>>>>>>>>> null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>>>>>> = SunX509' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = >>>>>>>>>>> JKS' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>>>>>> = PKIX' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type >>>>>>>>>>> = JKS' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer >>>>>>>>>>> = class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>>>>>> >>>>>>>>>>> Producer Config: >>>>>>>>>>> >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig >>>>>>>>>>> values:' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>>>>>> [localhost:29092]' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = >>>>>>>>>>> 33554432' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>>>>>> default' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = >>>>>>>>>>> none' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> tconnections.max.idle.ms = 540000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms >>>>>>>>>>> = 120000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence >>>>>>>>>>> = false' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes >>>>>>>>>>> = []' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = >>>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = >>>>>>>>>>> 60000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection >>>>>>>>>>> = 5' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = >>>>>>>>>>> 1048576' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms >>>>>>>>>>> = 300000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = >>>>>>>>>>> []' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples >>>>>>>>>>> = 2' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>>>>>> = INFO' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = >>>>>>>>>>> class >>>>>>>>>>> org.apache.kafka.clients.producer.internals.DefaultPartitioner' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes >>>>>>>>>>> = 32768' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> treconnect.backoff.ms = 50' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms >>>>>>>>>>> = 30000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = >>>>>>>>>>> 100' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = >>>>>>>>>>> null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>>>>>> = /usr/bin/kinit' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>>>>>> = 60000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> tsasl.kerberos.service.name = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>>>>>> = 0.05' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>>>>>> = 0.8' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = >>>>>>>>>>> null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>>>>>> = 300' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>>>>>> = 60' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>>>>>> = 0.8' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>>>>>> = 0.05' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = >>>>>>>>>>> GSSAPI' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>>>>>> PLAINTEXT' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>>>>>> 131072' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>>>>>> null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols >>>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>>>>>> = https' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = >>>>>>>>>>> null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>>>>>> = SunX509' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = >>>>>>>>>>> JKS' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>>>>>> = PKIX' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>>>>>> = null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type >>>>>>>>>>> = JKS' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>>> ttransaction.timeout.ms = 60000' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = >>>>>>>>>>> null' >>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = >>>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer' >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Apologies again for dumping almost everything here :-) Any >>>>>>>>>>> pointers on what might be the issue are appreciated. >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> Sumeet >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> Also can you try sending messages back to Kafka (or another >>>>>>>>>>>> distributed system like GCS) instead of just printing them ? >>>>>>>>>>>> (given that >>>>>>>>>>>> multi-language pipelines run SDK containers in Docker you might >>>>>>>>>>>> not see >>>>>>>>>>>> prints in the original console I think). >>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> Cham >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Sumeet, >>>>>>>>>>>>> >>>>>>>>>>>>> It seems like your kafka consumer uses the LATEST offset(which >>>>>>>>>>>>> is default setting) as the start offset to read, which is 29. Do >>>>>>>>>>>>> you have >>>>>>>>>>>>> more than 29 records to read at that point? If the pipeline is >>>>>>>>>>>>> only for >>>>>>>>>>>>> testing purpose, I would recommend reading from earliest offset >>>>>>>>>>>>> to see >>>>>>>>>>>>> whether you get records. You can do so by constructing your >>>>>>>>>>>>> ReadFromKafka >>>>>>>>>>>>> like: >>>>>>>>>>>>> ReadFromKafka( >>>>>>>>>>>>> consumer_config={'bootstrap.servers': >>>>>>>>>>>>> 'localhost:29092', 'auto.offset.reset':'earliest'}, >>>>>>>>>>>>> topics=['test']) >>>>>>>>>>>>> >>>>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra < >>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi All, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka >>>>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet: >>>>>>>>>>>>>> >>>>>>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>>>>>>>>>>>> _ = ( >>>>>>>>>>>>>> pipeline >>>>>>>>>>>>>> | 'Read from Kafka' >> ReadFromKafka( >>>>>>>>>>>>>> consumer_config={'bootstrap.servers': >>>>>>>>>>>>>> 'localhost:29092'}, >>>>>>>>>>>>>> topics=['test']) >>>>>>>>>>>>>> | 'Print' >> beam.Map(print)) >>>>>>>>>>>>>> >>>>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any >>>>>>>>>>>>>> messages coming in. Kafka is running locally in a docker >>>>>>>>>>>>>> container, and I'm >>>>>>>>>>>>>> able to use `kafkacat` from the host (outside the container) to >>>>>>>>>>>>>> publish and >>>>>>>>>>>>>> subscribe to messages. So, I guess there are no issues on that >>>>>>>>>>>>>> front. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get >>>>>>>>>>>>>> notified of new messages, as I see the offset changes in the >>>>>>>>>>>>>> Beam logs as I >>>>>>>>>>>>>> publish data from `kafkacat`: >>>>>>>>>>>>>> >>>>>>>>>>>>>> INFO:root:severity: INFO >>>>>>>>>>>>>> timestamp { >>>>>>>>>>>>>> seconds: 1612886861 >>>>>>>>>>>>>> nanos: 534000000 >>>>>>>>>>>>>> } >>>>>>>>>>>>>> message: "[Consumer >>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, >>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to >>>>>>>>>>>>>> LATEST offset >>>>>>>>>>>>>> of partition test-0" >>>>>>>>>>>>>> log_location: >>>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState" >>>>>>>>>>>>>> thread: "22" >>>>>>>>>>>>>> >>>>>>>>>>>>>> INFO:root:severity: INFO >>>>>>>>>>>>>> timestamp { >>>>>>>>>>>>>> seconds: 1612886861 >>>>>>>>>>>>>> nanos: 537000000 >>>>>>>>>>>>>> } >>>>>>>>>>>>>> message: "[Consumer >>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, >>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting >>>>>>>>>>>>>> offset for >>>>>>>>>>>>>> partition test-0 to offset 29." >>>>>>>>>>>>>> log_location: >>>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState" >>>>>>>>>>>>>> thread: "22" >>>>>>>>>>>>>> >>>>>>>>>>>>>> This is how I'm publishing data using `kafkacat`: >>>>>>>>>>>>>> >>>>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K: >>>>>>>>>>>>>> 1:foo >>>>>>>>>>>>>> 1:bar >>>>>>>>>>>>>> >>>>>>>>>>>>>> and I can confirm that its being received, again using >>>>>>>>>>>>>> `kafkacat`: >>>>>>>>>>>>>> >>>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: >>>>>>>>>>>>>> %s\n' >>>>>>>>>>>>>> Key: 1 Value: foo >>>>>>>>>>>>>> Key: 1 Value: bar >>>>>>>>>>>>>> >>>>>>>>>>>>>> But despite this, I don't see the actual message being >>>>>>>>>>>>>> printed by Beam as I expected. Any pointers to what's missing >>>>>>>>>>>>>> here are >>>>>>>>>>>>>> appreciated. I'm suspecting this could be a decoding issue on >>>>>>>>>>>>>> the Beam >>>>>>>>>>>>>> pipeline side, but could be incorrect. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks in advance for any pointers! >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> Sumeet >>>>>>>>>>>>>> >>>>>>>>>>>>>
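
For readers arriving at this thread later, below is a consolidated, minimal sketch of the Python pipeline discussed above, including the bounded-read workaround (max_num_records together with auto.offset.reset=earliest) that was confirmed to move records end to end. This is a sketch, not a verbatim copy: the broker address and topic names (localhost:29092, in_topic, out_topic) are the same placeholder values used earlier in the thread, and it assumes the stock apache_beam Python SDK with the cross-language Kafka transforms and a Flink job service available.

~~~~~~
# Minimal sketch of the pipeline discussed in this thread (not a verbatim
# copy): bounded read of 3 pre-published records from Kafka, written back
# out to another topic. Broker/topic names are the placeholders used above.
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
  pipeline_options = PipelineOptions(
      pipeline_args, save_main_session=True, streaming=True)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': bootstrap_servers,
                # Read from the beginning so pre-published test records
                # are picked up (the default is the latest offset).
                'auto.offset.reset': 'earliest',
            },
            topics=[in_topic],
            # Bounded read: stop after 3 records. Dropping this argument
            # makes the source unbounded, which is the portable-Flink case
            # tracked in BEAM-11998.
            max_num_records=3)
        | 'Write to kafka' >> WriteToKafka(
            producer_config={'bootstrap.servers': bootstrap_servers},
            topic=out_topic))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run('localhost:29092', 'in_topic', 'out_topic',
      pipeline_args=['--runner=FlinkRunner'])
~~~~~~

Per the messages above, this bounded form delivered all three pre-published records to out_topic on Flink, while the unbounded form (without max_num_records) is the case where records are not pushed to downstream steps over portable execution; Cham's note suggests the unbounded path does work on Dataflow.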

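One smaller point from the exchange about --streaming: the option can be supplied either on the command line (in which case it arrives via the parsed pipeline_args) or directly as a keyword argument to PipelineOptions, as in the snippet quoted at the top of the thread. A small illustrative sketch (the asserts are only there to show that both routes set the same StandardOptions.streaming flag):

~~~~~~
# Sketch only: both routes below end up setting StandardOptions.streaming.
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# 1) From the command line, e.g. launching with:
#      python ./pipeline.py --runner=FlinkRunner --streaming ...
#    the flag arrives through the parsed pipeline arguments:
opts_from_args = PipelineOptions(['--streaming'])

# 2) Programmatically, as in the pipeline quoted at the top of this thread:
opts_from_kwargs = PipelineOptions([], save_main_session=True, streaming=True)

assert opts_from_args.view_as(StandardOptions).streaming
assert opts_from_kwargs.view_as(StandardOptions).streaming
~~~~~~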