Hi Sumeet,

After double-checking the current support status: the root cause is that when you use cross-language pipelines, your pipeline actually runs in the portable way [1]. Currently we don't support processing unbounded sources on Flink over portable execution well. I have filed https://issues.apache.org/jira/browse/BEAM-11998 to track the progress.
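For readers hitting the same issue before BEAM-11998 is resolved, here is a minimal sketch of the workaround confirmed further down in this thread: bounding the cross-language Kafka read with max_num_records so it behaves as a bounded source. The broker, topic names, and record count are illustrative values taken from the pipelines discussed below, and streaming=True is the option Boyuan suggests for unbounded reads.

~~~~~~~~
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative values, matching the pipelines discussed below in this thread.
bootstrap_servers = 'localhost:29092'
in_topic = 'in_topic'
out_topic = 'out_topic'

# streaming=True is needed for unbounded reads; until BEAM-11998 is resolved
# on portable Flink, max_num_records below turns the read into a bounded one,
# which is the variant confirmed to work later in this thread.
pipeline_options = PipelineOptions(
    ['--runner=FlinkRunner'], streaming=True, save_main_session=True)

with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from kafka' >> ReadFromKafka(
          consumer_config={
              'bootstrap.servers': bootstrap_servers,
              'auto.offset.reset': 'earliest'},
          topics=[in_topic],
          max_num_records=3)  # Bounded workaround; remove once unbounded reads work.
      | 'Write to kafka' >> WriteToKafka(
          producer_config={'bootstrap.servers': bootstrap_servers},
          topic=out_topic))
~~~~~~~~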
[1] https://s.apache.org/beam-fn-api On Tue, Mar 16, 2021 at 10:13 AM Boyuan Zhang <[email protected]> wrote: > And one more question, did you launch your pipeline with streaming=True > pipeline > options? I think you need to use --streaming=True to have unbounded > source working properly. > > On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]> wrote: > >> Hi Sumeet, >> >> Which Beam version are you using for your pipeline? >> >> On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <[email protected]> >> wrote: >> >>> I don't believe Fn API DirectRunner supports streaming yet (I might be >>> wrong). I can confirm that this works for Dataflow. >>> >>> Thanks, >>> Cham >>> >>> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra < >>> [email protected]> wrote: >>> >>>> Thanks Cham! But I don't think this is Flink specific. I have observed >>>> similar behaviour with DirectRunner as well BTW. >>>> >>>> ..Sumeet >>>> >>>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath < >>>> [email protected]> wrote: >>>> >>>>> I'm not too familiar with Flink but it seems like, for streaming >>>>> pipelines, messages from Kafka/SDF read do not get pushed to subsequent >>>>> steps for some reason. >>>>> * X-lang Bounded read with Flink seems to be fine. >>>>> * X-lang Kafka sink and with Flink to be fine. >>>>> >>>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking. >>>>> >>>>> Thanks, >>>>> Cham >>>>> >>>>> >>>>> >>>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra < >>>>> [email protected]> wrote: >>>>> >>>>>> Hi Cham, >>>>>> >>>>>> Do you have pointers on what might be going on? Or something else I >>>>>> can try? I had posted the same on StackOverflow [1], it seems that I'm >>>>>> not >>>>>> the only one seeing this issue at the moment. >>>>>> >>>>>> Thanks, >>>>>> Sumeet >>>>>> >>>>>> [1] >>>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data >>>>>> >>>>>> >>>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> Took me some time to setup the Java test (using Java after more than >>>>>>> a decade!), but yes a similar pipeline with KafkaIO and Flink seems to >>>>>>> work >>>>>>> fine. >>>>>>> >>>>>>> Here's the relevant Java code. The only difference from the Python >>>>>>> version is that I had to extract the KV from the KafkaRecord object and >>>>>>> construct a PCollection<KV> explicitly before writing to the output >>>>>>> topic. 
>>>>>>>
>>>>>>> ~~~~~~~~
>>>>>>> package org.apache.beam.kafka.test;
>>>>>>>
>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>>>>> import org.apache.beam.sdk.options.Default;
>>>>>>> import org.apache.beam.sdk.options.Description;
>>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>> import org.apache.beam.sdk.transforms.*;
>>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>>
>>>>>>> public class KafkaTest {
>>>>>>>
>>>>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
>>>>>>>   static final String INPUT_TOPIC = "in_topic"; // Default input kafka topic name
>>>>>>>   static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka topic name
>>>>>>>
>>>>>>>   /** Specific pipeline options. */
>>>>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>>>>     @Description("Kafka bootstrap servers")
>>>>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>>>>     String getBootstrap();
>>>>>>>
>>>>>>>     void setBootstrap(String value);
>>>>>>>
>>>>>>>     @Description("Kafka input topic name")
>>>>>>>     @Default.String(INPUT_TOPIC)
>>>>>>>     String getInputTopic();
>>>>>>>
>>>>>>>     void setInputTopic(String value);
>>>>>>>
>>>>>>>     @Description("Kafka output topic name")
>>>>>>>     @Default.String(OUTPUT_TOPIC)
>>>>>>>     String getOutputTopic();
>>>>>>>
>>>>>>>     void setOutputTopic(String value);
>>>>>>>   }
>>>>>>>
>>>>>>>   public static final void main(String[] args) throws Exception {
>>>>>>>     final KafkaTestOptions options =
>>>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>>>>
>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>     pipeline
>>>>>>>         .apply(
>>>>>>>             "ReadFromKafka",
>>>>>>>             KafkaIO.<String, String>read()
>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>                 .withTopic(options.getInputTopic())
>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>         .apply(
>>>>>>>             "PrepareForWriting",
>>>>>>>             ParDo.of(
>>>>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>>>>                   @ProcessElement
>>>>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>>>>                     c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
>>>>>>>                   }
>>>>>>>                 }))
>>>>>>>         .apply(
>>>>>>>             "WriteToKafka",
>>>>>>>             KafkaIO.<String, String>write()
>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>                 .withTopic(options.getOutputTopic())
>>>>>>>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>>>>>>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>>>>>>
>>>>>>>     pipeline.run();
>>>>>>>   }
>>>>>>> }
>>>>>>> ~~~~~~~~~
>>>>>>>
>>>>>>> I'm firing the Java version as follows:
>>>>>>>
>>>>>>> $ mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.KafkaTest -Pflink-runner -Dexec.args="--runner=FlinkRunner"
>>>>>>>
>>>>>>> And I can see in real time, that as I publish records to the in_topic, the out_topic is able to receive them on a continuous basis.
>>>>>>>
>>>>>>> I hope this helps narrow down the issue.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sumeet
>>>>>>>
>>>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>
>>>>>>>> Are you able to run a similar Java streaming pipeline using KafkaIO and Flink? (without x-lang)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Cham!
>>>>>>>>>
>>>>>>>>> So finally I was able to get partial success. Since I had pre-populated the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see if it can read all existing records, as follows:
>>>>>>>>>
>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>   _ = (
>>>>>>>>>       pipeline
>>>>>>>>>       | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>           consumer_config={
>>>>>>>>>               'bootstrap.servers': bootstrap_servers,
>>>>>>>>>               'auto.offset.reset': 'earliest'},
>>>>>>>>>           topics=[in_topic],
>>>>>>>>>           max_num_records=3)
>>>>>>>>>       | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>           producer_config={
>>>>>>>>>               'bootstrap.servers': bootstrap_servers},
>>>>>>>>>           topic=out_topic))
>>>>>>>>>
>>>>>>>>> I was able to see all 3 records being read, and written successfully to the out_topic as well. So, it appears that there might be some issue with reading unbounded Kafka streams here? Or is there any setting that I might be missing?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Sumeet
>>>>>>>>>
>>>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Cham!
>>>>>>>>>>
>>>>>>>>>> Appreciate the response. I tried out your suggestions (details below), but I still don't see any data being consumed or written back to Kafka (as per your suggestion). I'm also providing additional details/context that might help narrow down the issue. Apologies for being a bit verbose from hereon!
>>>>>>>>>>
>>>>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>>>>
>>>>>>>>>> ~~~~~~
>>>>>>>>>> import apache_beam as beam
>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>
>>>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>>>>   pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)
>>>>>>>>>>
>>>>>>>>>>   logging.info('Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>>>>>>>                str(bootstrap_servers), in_topic, out_topic)
>>>>>>>>>>
>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>     _ = (
>>>>>>>>>>         pipeline
>>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>             consumer_config={
>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>>>>             },
>>>>>>>>>>             topics=[in_topic])
>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>             producer_config={
>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>>>>             },
>>>>>>>>>>             topic=out_topic))
>>>>>>>>>>
>>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>>>>   import argparse
>>>>>>>>>>
>>>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>       '--bootstrap_servers',
>>>>>>>>>>       dest='bootstrap_servers',
>>>>>>>>>>       required=True,
>>>>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>       '--in_topic',
>>>>>>>>>>       dest='in_topic',
>>>>>>>>>>       required=True,
>>>>>>>>>>       help='Kafka topic to read data from')
>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>       '--out_topic',
>>>>>>>>>>       dest='out_topic',
>>>>>>>>>>       required=True,
>>>>>>>>>>       help='Kafka topic to write data to')
>>>>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>>>>
>>>>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic, known_args.out_topic, pipeline_args)
>>>>>>>>>> ~~~~~
>>>>>>>>>>
>>>>>>>>>> I'm firing this pipeline as follows:
>>>>>>>>>>
>>>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092 --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>>>>
>>>>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>>>>
>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>>>>> v1
>>>>>>>>>> v2
>>>>>>>>>> v3
>>>>>>>>>>
>>>>>>>>>> Now, when I execute the pipeline, I see that it starts to read records from offset 0, but then seeks to the latest offset 3 without processing the records. I don't see any data written to out_topic. I filtered out the logs a bit, and this is what I'm seeing:
>>>>>>>>>>
>>>>>>>>>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 in_topic=in_topic out_topic=out_topic
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to split 0 (total 1): in_topic-0'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition in_topic-0 to offset 0.'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: >>>>>>>>>> reading from in_topic-0 starting at offset 0' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to >>>>>>>>>> partition(s): in_topic-0' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST >>>>>>>>>> offset >>>>>>>>>> of partition in_topic-0' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset >>>>>>>>>> for >>>>>>>>>> partition in_topic-0 to offset 3.' >>>>>>>>>> >>>>>>>>>> Additionally, the logs also emit complete consumer and producer >>>>>>>>>> configs. I'm dumping them here, in case that helps: >>>>>>>>>> >>>>>>>>>> Consumer Config: >>>>>>>>>> >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig >>>>>>>>>> values:' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics >>>>>>>>>> = true' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> tauto.commit.interval.ms = 5000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = >>>>>>>>>> earliest' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>>>>> [localhost:29092]' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>>>>> default' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack =' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> tconnections.max.idle.ms = 540000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> tdefault.api.timeout.ms = 60000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = >>>>>>>>>> false' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics >>>>>>>>>> = true' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = >>>>>>>>>> 52428800' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = >>>>>>>>>> 500' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = >>>>>>>>>> Reader-0_offset_consumer_1947524890_none' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> theartbeat.interval.ms = 3000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes >>>>>>>>>> = []' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close >>>>>>>>>> = true' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = >>>>>>>>>> read_uncommitted' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = >>>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes >>>>>>>>>> = 1048576' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms 
>>>>>>>>>> = 300000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = >>>>>>>>>> 500' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms >>>>>>>>>> = 300000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = >>>>>>>>>> []' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples >>>>>>>>>> = 2' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>>>>> = INFO' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy >>>>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes >>>>>>>>>> = 65536' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms >>>>>>>>>> = 50' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms >>>>>>>>>> = 30000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = >>>>>>>>>> 100' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>>>>> = /usr/bin/kinit' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>>>>> = 60000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> tsasl.kerberos.service.name = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>>>>> = 0.05' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>>>>> = 0.8' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>>>>> = 300' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>>>>> = 60' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>>>>> = 0.8' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>>>>> = 0.05' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = >>>>>>>>>> GSSAPI' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>>>>> PLAINTEXT' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>>>>> 131072' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms >>>>>>>>>> = 10000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols >>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>>>>> = https' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = >>>>>>>>>> null' >>>>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>>>>> = SunX509' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = >>>>>>>>>> JKS' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>>>>> = PKIX' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type >>>>>>>>>> = JKS' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = >>>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>>>>> >>>>>>>>>> Producer Config: >>>>>>>>>> >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig >>>>>>>>>> values:' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>>>>> [localhost:29092]' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = >>>>>>>>>> 33554432' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>>>>> default' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = >>>>>>>>>> none' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> tconnections.max.idle.ms = 540000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms >>>>>>>>>> = 120000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = >>>>>>>>>> false' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes >>>>>>>>>> = []' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = >>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = >>>>>>>>>> 60000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection >>>>>>>>>> = 5' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = >>>>>>>>>> 1048576' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms >>>>>>>>>> = 300000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = >>>>>>>>>> []' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples >>>>>>>>>> = 2' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>>>>> = INFO' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = >>>>>>>>>> class org.apache.kafka.clients.producer.internals.DefaultPartitioner' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes 
>>>>>>>>>> = 32768' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms >>>>>>>>>> = 50' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms >>>>>>>>>> = 30000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = >>>>>>>>>> 100' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>>>>> = /usr/bin/kinit' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>>>>> = 60000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> tsasl.kerberos.service.name = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>>>>> = 0.05' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>>>>> = 0.8' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>>>>> = 300' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>>>>> = 60' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>>>>> = 0.8' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>>>>> = 0.05' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = >>>>>>>>>> GSSAPI' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>>>>> PLAINTEXT' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>>>>> 131072' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols >>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>>>>> = https' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>>>>> = SunX509' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = >>>>>>>>>> JKS' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>>>>> = PKIX' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>>>>> = null' >>>>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>>>>> = null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type >>>>>>>>>> = JKS' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>>> ttransaction.timeout.ms = 60000' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = >>>>>>>>>> null' >>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = >>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer' >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Apologies again for dumping almost everything here :-) Any >>>>>>>>>> pointers on what might be the issue are appreciated. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Sumeet >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath < >>>>>>>>>> [email protected]> wrote: >>>>>>>>>> >>>>>>>>>>> Also can you try sending messages back to Kafka (or another >>>>>>>>>>> distributed system like GCS) instead of just printing them ? (given >>>>>>>>>>> that >>>>>>>>>>> multi-language pipelines run SDK containers in Docker you might >>>>>>>>>>> not see >>>>>>>>>>> prints in the original console I think). >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> Cham >>>>>>>>>>> >>>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Sumeet, >>>>>>>>>>>> >>>>>>>>>>>> It seems like your kafka consumer uses the LATEST offset(which >>>>>>>>>>>> is default setting) as the start offset to read, which is 29. Do >>>>>>>>>>>> you have >>>>>>>>>>>> more than 29 records to read at that point? If the pipeline is >>>>>>>>>>>> only for >>>>>>>>>>>> testing purpose, I would recommend reading from earliest offset to >>>>>>>>>>>> see >>>>>>>>>>>> whether you get records. You can do so by constructing your >>>>>>>>>>>> ReadFromKafka >>>>>>>>>>>> like: >>>>>>>>>>>> ReadFromKafka( >>>>>>>>>>>> consumer_config={'bootstrap.servers': >>>>>>>>>>>> 'localhost:29092', 'auto.offset.reset':'earliest'}, >>>>>>>>>>>> topics=['test']) >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi All, >>>>>>>>>>>>> >>>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka >>>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet: >>>>>>>>>>>>> >>>>>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>>>>>>>>>>> _ = ( >>>>>>>>>>>>> pipeline >>>>>>>>>>>>> | 'Read from Kafka' >> ReadFromKafka( >>>>>>>>>>>>> consumer_config={'bootstrap.servers': >>>>>>>>>>>>> 'localhost:29092'}, >>>>>>>>>>>>> topics=['test']) >>>>>>>>>>>>> | 'Print' >> beam.Map(print)) >>>>>>>>>>>>> >>>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any >>>>>>>>>>>>> messages coming in. Kafka is running locally in a docker >>>>>>>>>>>>> container, and I'm >>>>>>>>>>>>> able to use `kafkacat` from the host (outside the container) to >>>>>>>>>>>>> publish and >>>>>>>>>>>>> subscribe to messages. So, I guess there are no issues on that >>>>>>>>>>>>> front. 
>>>>>>>>>>>>> >>>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get >>>>>>>>>>>>> notified of new messages, as I see the offset changes in the Beam >>>>>>>>>>>>> logs as I >>>>>>>>>>>>> publish data from `kafkacat`: >>>>>>>>>>>>> >>>>>>>>>>>>> INFO:root:severity: INFO >>>>>>>>>>>>> timestamp { >>>>>>>>>>>>> seconds: 1612886861 >>>>>>>>>>>>> nanos: 534000000 >>>>>>>>>>>>> } >>>>>>>>>>>>> message: "[Consumer >>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, >>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to >>>>>>>>>>>>> LATEST offset >>>>>>>>>>>>> of partition test-0" >>>>>>>>>>>>> log_location: >>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState" >>>>>>>>>>>>> thread: "22" >>>>>>>>>>>>> >>>>>>>>>>>>> INFO:root:severity: INFO >>>>>>>>>>>>> timestamp { >>>>>>>>>>>>> seconds: 1612886861 >>>>>>>>>>>>> nanos: 537000000 >>>>>>>>>>>>> } >>>>>>>>>>>>> message: "[Consumer >>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, >>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting >>>>>>>>>>>>> offset for >>>>>>>>>>>>> partition test-0 to offset 29." >>>>>>>>>>>>> log_location: >>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState" >>>>>>>>>>>>> thread: "22" >>>>>>>>>>>>> >>>>>>>>>>>>> This is how I'm publishing data using `kafkacat`: >>>>>>>>>>>>> >>>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K: >>>>>>>>>>>>> 1:foo >>>>>>>>>>>>> 1:bar >>>>>>>>>>>>> >>>>>>>>>>>>> and I can confirm that its being received, again using >>>>>>>>>>>>> `kafkacat`: >>>>>>>>>>>>> >>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: >>>>>>>>>>>>> %s\n' >>>>>>>>>>>>> Key: 1 Value: foo >>>>>>>>>>>>> Key: 1 Value: bar >>>>>>>>>>>>> >>>>>>>>>>>>> But despite this, I don't see the actual message being printed >>>>>>>>>>>>> by Beam as I expected. Any pointers to what's missing here are >>>>>>>>>>>>> appreciated. >>>>>>>>>>>>> I'm suspecting this could be a decoding issue on the Beam >>>>>>>>>>>>> pipeline side, >>>>>>>>>>>>> but could be incorrect. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks in advance for any pointers! >>>>>>>>>>>>> >>>>>>>>>>>>> Cheers, >>>>>>>>>>>>> Sumeet >>>>>>>>>>>>> >>>>>>>>>>>>
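On the decoding suspicion raised in the original message above: with the default configuration shown earlier in this thread, ReadFromKafka uses byte-array deserializers, so each element arriving in the Python pipeline is a (key, value) pair of bytes rather than strings. A minimal sketch of decoding before printing, assuming UTF-8 payloads; the decode_record helper is an illustrative addition and not part of the original pipeline.

~~~~~~~~
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

def decode_record(record):
  # With the default ByteArrayDeserializer settings, `record` is assumed to be
  # a (key, value) tuple of bytes; keys may be None.
  key, value = record
  return (key.decode('utf-8') if key is not None else None,
          value.decode('utf-8'))

pipeline_options = PipelineOptions(streaming=True)
with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from Kafka' >> ReadFromKafka(
          consumer_config={'bootstrap.servers': 'localhost:29092'},
          topics=['test'])
      | 'Decode' >> beam.Map(decode_record)
      | 'Print' >> beam.Map(print))
~~~~~~~~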
