IIUC, Splittable DoFn (the source framework) currently does not work on portable runners in streaming mode, due to the issue Boyuan mentioned.
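For reference, the streaming option discussed below can be enabled either on the command line or programmatically; a minimal sketch using the SDK's StandardOptions (pipeline_args here stands in for whatever arguments the pipeline is launched with):

~~~~~~
# Two equivalent ways to run the Python pipeline in streaming mode:
#   1) pass --streaming on the command line, or
#   2) set it programmatically on the pipeline options before building the pipeline.
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
pipeline_options.view_as(StandardOptions).streaming = True
~~~~~~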
On Tue, Mar 16, 2021 at 8:35 PM Sumeet Malhotra <[email protected]> wrote:

> Thanks Cham. In the python version, I do specify the streaming option as follows (not on the command line though):
>
> pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)
>
> Regarding running portable pipelines, just to confirm, what you are saying is that currently the only way to execute this is in Java then, until the issue you created is resolved?

Yes, I think Java worked since it did not use portable Spark/Flink, but a cross-language transform would require this.

Thanks,
Cham

> Thanks,
> Sumeet

On Wed, Mar 17, 2021 at 5:38 AM Boyuan Zhang <[email protected]> wrote:

Hi Sumeet,

After double-checking the current support status, the root cause is that when you are using cross-language pipelines, you are actually having the pipelines run in the portable way [1]. Currently we haven't supported processing an unbounded source on Flink over portable execution well. I have filed https://issues.apache.org/jira/browse/BEAM-11998 to track the progress.

[1] https://s.apache.org/beam-fn-api

On Tue, Mar 16, 2021 at 10:13 AM Boyuan Zhang <[email protected]> wrote:

And one more question: did you launch your pipeline with the streaming=True pipeline option? I think you need to use --streaming=True to have the unbounded source work properly.

On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]> wrote:

Hi Sumeet,

Which Beam version are you using for your pipeline?

On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <[email protected]> wrote:

I don't believe the Fn API DirectRunner supports streaming yet (I might be wrong). I can confirm that this works for Dataflow.

Thanks,
Cham

On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <[email protected]> wrote:

Thanks Cham! But I don't think this is Flink specific. I have observed similar behaviour with the DirectRunner as well, BTW.

..Sumeet

On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <[email protected]> wrote:

I'm not too familiar with Flink, but it seems like, for streaming pipelines, messages from the Kafka/SDF read do not get pushed to subsequent steps for some reason.

* X-lang Bounded read with Flink seems to be fine.
* X-lang Kafka sink with Flink seems to be fine.

Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.

Thanks,
Cham

On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <[email protected]> wrote:

Hi Cham,

Do you have pointers on what might be going on? Or something else I can try? I had posted the same on StackOverflow [1]; it seems that I'm not the only one seeing this issue at the moment.

Thanks,
Sumeet

[1] https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data

On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <[email protected]> wrote:

Took me some time to set up the Java test (using Java after more than a decade!), but yes, a similar pipeline with KafkaIO and Flink seems to work fine.
Here's the relevant Java code. The only difference from the Python version is that I had to extract the KV from the KafkaRecord object and construct a PCollection<KV> explicitly before writing to the output topic.

~~~~~~~~
package org.apache.beam.kafka.test;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaTest {

  static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
  static final String INPUT_TOPIC = "in_topic"; // Default input kafka topic name
  static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka topic name

  /** Specific pipeline options. */
  public interface KafkaTestOptions extends PipelineOptions {
    @Description("Kafka bootstrap servers")
    @Default.String(BOOTSTRAP_SERVERS)
    String getBootstrap();

    void setBootstrap(String value);

    @Description("Kafka input topic name")
    @Default.String(INPUT_TOPIC)
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Kafka output topic name")
    @Default.String(OUTPUT_TOPIC)
    String getOutputTopic();

    void setOutputTopic(String value);
  }

  public static final void main(String[] args) throws Exception {
    final KafkaTestOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getInputTopic())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
        .apply(
            "PrepareForWriting",
            ParDo.of(
                new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) throws Exception {
                    c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
                  }
                }))
        .apply(
            "WriteToKafka",
            KafkaIO.<String, String>write()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getOutputTopic())
                .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
                .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));

    pipeline.run();
  }
}
~~~~~~~~~
I'm firing the Java version as follows:

$ mvn exec:java -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest -Pflink-runner -Dexec.args="--runner=FlinkRunner"

And I can see, in real time, that as I publish records to the in_topic, the out_topic receives them on a continuous basis.

I hope this helps narrow down the issue.

Thanks,
Sumeet

On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]> wrote:

Are you able to run a similar Java streaming pipeline using KafkaIO and Flink? (Without x-lang.)

Thanks,
Cham

On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <[email protected]> wrote:

Hi Cham!

So finally I was able to get partial success. Since I had pre-populated the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see if it can read all existing records, as follows:

with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from kafka' >> ReadFromKafka(
          consumer_config={
              'bootstrap.servers': bootstrap_servers,
              'auto.offset.reset': 'earliest'},
          topics=[in_topic],
          max_num_records=3)
      | 'Write to kafka' >> WriteToKafka(
          producer_config={
              'bootstrap.servers': bootstrap_servers},
          topic=out_topic))

I was able to see all 3 records being read, and written successfully to the out_topic as well. So, it appears that there might be some issue with reading unbounded Kafka streams here? Or is there any setting that I might be missing?

Thanks,
Sumeet
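A note on why the capped read above succeeds: max_num_records effectively turns the cross-language Kafka source into a bounded read, which avoids the unbounded/portable streaming path being discussed in this thread. A minimal sketch, reusing bootstrap_servers and in_topic from the surrounding snippets; max_read_time is assumed to be available in this Beam version and its exact name/units may differ:

~~~~~~
from apache_beam.io.kafka import ReadFromKafka

bounded_read = ReadFromKafka(
    consumer_config={
        'bootstrap.servers': bootstrap_servers,
        'auto.offset.reset': 'earliest',
    },
    topics=[in_topic],
    # Stops after N records, making the otherwise-unbounded source bounded.
    max_num_records=3,
    # An elapsed-time cap is also exposed in some versions (assumed name/units):
    # max_read_time=60,
)
~~~~~~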
On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <[email protected]> wrote:

Hey Cham!

Appreciate the response. I tried out your suggestions (details below), but I still don't see any data being consumed or written back to Kafka (as per your suggestion). I'm also providing additional details/context that might help narrow down the issue. Apologies for being a bit verbose from here on!

First, here's what my pipeline code looks like now:

~~~~~~
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
  pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)

  logging.info('Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
               str(bootstrap_servers), in_topic, out_topic)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': bootstrap_servers,
                'auto.offset.reset': 'earliest'
            },
            topics=[in_topic])
        | 'Write to kafka' >> WriteToKafka(
            producer_config={
                'bootstrap.servers': bootstrap_servers
            },
            topic=out_topic))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  import argparse

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--bootstrap_servers',
      dest='bootstrap_servers',
      required=True,
      help='Bootstrap servers for the Kafka cluster')
  parser.add_argument(
      '--in_topic',
      dest='in_topic',
      required=True,
      help='Kafka topic to read data from')
  parser.add_argument(
      '--out_topic',
      dest='out_topic',
      required=True,
      help='Kafka topic to write data to')
  known_args, pipeline_args = parser.parse_known_args()

  run(known_args.bootstrap_servers, known_args.in_topic, known_args.out_topic, pipeline_args)
~~~~~

I'm firing this pipeline as follows:

python ./pipeline.py --bootstrap_servers=localhost:29092 --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner

I have pre-populated the Kafka topic with 3 records:

$ kafkacat -C -b localhost:29092 -t in_topic
v1
v2
v3

Now, when I execute the pipeline, I see that it starts to read records from offset 0, but then seeks to the latest offset 3 without processing the records. I don't see any data written to out_topic. I filtered out the logs a bit, and this is what I'm seeing:

INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 in_topic=in_topic out_topic=out_topic
INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to split 0 (total 1): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition in_topic-0 to offset 0.'
INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from in_topic-0 starting at offset 0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to partition(s): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset of partition in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for partition in_topic-0 to offset 3.'

Additionally, the logs also emit the complete consumer and producer configs. I'm dumping them here, in case that helps:

Consumer Config (ConsumerConfig values, from the apache_beam.utils.subprocess_server log):

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:29092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = Reader-0_offset_consumer_1947524890_none
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Producer Config (ProducerConfig values, from the apache_beam.utils.subprocess_server log):

acks = 1
batch.size = 16384
bootstrap.servers = [localhost:29092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Apologies again for dumping almost everything here :-) Any pointers on what might be the issue are appreciated.

Thanks,
Sumeet

On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <[email protected]> wrote:

Also, can you try sending messages back to Kafka (or to another distributed system like GCS) instead of just printing them? (Given that multi-language pipelines run SDK containers in Docker, you might not see prints in the original console, I think.)

Thanks,
Cham

On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> wrote:

Hi Sumeet,

It seems like your Kafka consumer uses the LATEST offset (which is the default setting) as the start offset to read, which is 29. Do you have more than 29 records to read at that point? If the pipeline is only for testing purposes, I would recommend reading from the earliest offset to see whether you get records. You can do so by constructing your ReadFromKafka like:

ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:29092', 'auto.offset.reset': 'earliest'},
    topics=['test'])

On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <[email protected]> wrote:

Hi All,

I'm trying out a simple example of reading data off a Kafka topic into Apache Beam.
Here's the relevant snippet:

with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from Kafka' >> ReadFromKafka(
          consumer_config={'bootstrap.servers': 'localhost:29092'},
          topics=['test'])
      | 'Print' >> beam.Map(print))

Using the above Beam pipeline snippet, I don't see any messages coming in. Kafka is running locally in a docker container, and I'm able to use `kafkacat` from the host (outside the container) to publish and subscribe to messages. So, I guess there are no issues on that front.

It appears that Beam is able to connect to Kafka and get notified of new messages, as I see the offset changes in the Beam logs as I publish data from `kafkacat`:

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

This is how I'm publishing data using `kafkacat`:

$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar

and I can confirm that it's being received, again using `kafkacat`:

$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar

But despite this, I don't see the actual message being printed by Beam as I expected. Any pointers to what's missing here are appreciated. I'm suspecting this could be a decoding issue on the Beam pipeline side, but I could be incorrect.

Thanks in advance for any pointers!

Cheers,
Sumeet
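One closing note on the decoding suspicion: with the default ByteArrayDeserializer settings (visible in the consumer config dump earlier in the thread), each element emitted by ReadFromKafka is a (key, value) pair of raw bytes, so `print` shows byte strings rather than text. A minimal sketch of a decode step before printing, reusing the names from the snippet above and assuming records may arrive without a key:

~~~~~~
_ = (
    pipeline
    | 'Read from Kafka' >> ReadFromKafka(
        consumer_config={'bootstrap.servers': 'localhost:29092',
                         'auto.offset.reset': 'earliest'},
        topics=['test'])
    # Each element is a (key_bytes, value_bytes) pair; decode it for readable output.
    | 'Decode' >> beam.Map(
        lambda kv: (kv[0].decode('utf-8') if kv[0] else None,
                    kv[1].decode('utf-8')))
    | 'Print' >> beam.Map(print))
~~~~~~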
