And one more question: did you launch your pipeline with the streaming=True pipeline option? I think you need to pass --streaming=True for the unbounded source to work properly.
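For example, something along these lines (just a sketch reusing the run() function you shared earlier in this thread): either pass --streaming on the command line so it reaches PipelineOptions, or set it programmatically via StandardOptions before constructing the pipeline, which should be equivalent.

~~~~~~
# Sketch only: same pipeline as earlier in the thread, with streaming mode
# set explicitly so the unbounded Kafka source is not treated as bounded.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
    # Programmatic equivalent of passing --streaming on the command line.
    pipeline_options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | 'Read from kafka' >> ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': bootstrap_servers,
                    'auto.offset.reset': 'earliest',
                },
                topics=[in_topic])
            | 'Write to kafka' >> WriteToKafka(
                producer_config={'bootstrap.servers': bootstrap_servers},
                topic=out_topic))
~~~~~~
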
On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]> wrote: > Hi Sumeet, > > Which Beam version are you using for your pipeline? > > On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <[email protected]> > wrote: > >> I don't believe Fn API DirectRunner supports streaming yet (I might be >> wrong). I can confirm that this works for Dataflow. >> >> Thanks, >> Cham >> >> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra < >> [email protected]> wrote: >> >>> Thanks Cham! But I don't think this is Flink specific. I have observed >>> similar behaviour with DirectRunner as well BTW. >>> >>> ..Sumeet >>> >>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath < >>> [email protected]> wrote: >>> >>>> I'm not too familiar with Flink but it seems like, for streaming >>>> pipelines, messages from Kafka/SDF read do not get pushed to subsequent >>>> steps for some reason. >>>> * X-lang Bounded read with Flink seems to be fine. >>>> * X-lang Kafka sink and with Flink to be fine. >>>> >>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking. >>>> >>>> Thanks, >>>> Cham >>>> >>>> >>>> >>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra < >>>> [email protected]> wrote: >>>> >>>>> Hi Cham, >>>>> >>>>> Do you have pointers on what might be going on? Or something else I >>>>> can try? I had posted the same on StackOverflow [1], it seems that I'm not >>>>> the only one seeing this issue at the moment. >>>>> >>>>> Thanks, >>>>> Sumeet >>>>> >>>>> [1] >>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data >>>>> >>>>> >>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra < >>>>> [email protected]> wrote: >>>>> >>>>>> Took me some time to setup the Java test (using Java after more than >>>>>> a decade!), but yes a similar pipeline with KafkaIO and Flink seems to >>>>>> work >>>>>> fine. >>>>>> >>>>>> Here's the relevant Java code. The only difference from the Python >>>>>> version is that I had to extract the KV from the KafkaRecord object and >>>>>> construct a PCollection<KV> explicitly before writing to the output >>>>>> topic. >>>>>> >>>>>> ~~~~~~~~ >>>>>> package org.apache.beam.kafka.test; >>>>>> >>>>>> import org.apache.beam.sdk.Pipeline; >>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO; >>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord; >>>>>> import org.apache.beam.sdk.options.Default; >>>>>> import org.apache.beam.sdk.options.Description; >>>>>> import org.apache.beam.sdk.options.PipelineOptions; >>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory; >>>>>> import org.apache.beam.sdk.transforms.*; >>>>>> import org.apache.beam.sdk.values.KV; >>>>>> import org.apache.beam.sdk.values.PCollection; >>>>>> import org.apache.kafka.common.serialization.StringDeserializer; >>>>>> >>>>>> public class KafkaTest { >>>>>> >>>>>> static final String BOOTSTRAP_SERVERS = "localhost:29092"; // >>>>>> Default bootstrap kafka servers >>>>>> static final String INPUT_TOPIC = "in_topic"; // Default input >>>>>> kafka topic name >>>>>> static final String OUTPUT_TOPIC = "out_topic"; // Default output >>>>>> kafka topic name >>>>>> >>>>>> /** Specific pipeline options. 
*/ >>>>>> public interface KafkaTestOptions extends PipelineOptions { >>>>>> @Description("Kafka bootstrap servers") >>>>>> @Default.String(BOOTSTRAP_SERVERS) >>>>>> String getBootstrap(); >>>>>> >>>>>> void setBootstrap(String value); >>>>>> >>>>>> @Description("Kafka input topic name") >>>>>> @Default.String(INPUT_TOPIC) >>>>>> String getInputTopic(); >>>>>> >>>>>> void setInputTopic(String value); >>>>>> >>>>>> @Description("Kafka output topic name") >>>>>> @Default.String(OUTPUT_TOPIC) >>>>>> String getOutputTopic(); >>>>>> >>>>>> void setOutputTopic(String value); >>>>>> } >>>>>> >>>>>> public static final void main(String[] args) throws Exception { >>>>>> final KafkaTestOptions options = >>>>>> >>>>>> PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class); >>>>>> >>>>>> Pipeline pipeline = Pipeline.create(options); >>>>>> pipeline >>>>>> .apply( >>>>>> "ReadFromKafka", >>>>>> KafkaIO.<String, String>read() >>>>>> .withBootstrapServers(options.getBootstrap()) >>>>>> .withTopic(options.getInputTopic()) >>>>>> .withKeyDeserializer(StringDeserializer.class) >>>>>> .withValueDeserializer(StringDeserializer.class)) >>>>>> .apply( >>>>>> "PrepareForWriting", >>>>>> ParDo.of( >>>>>> new DoFn<KafkaRecord<String, String>, KV<String, >>>>>> String>>() { >>>>>> @ProcessElement >>>>>> public void processElement(ProcessContext c) throws >>>>>> Exception { >>>>>> c.output(KV.of(c.element().getKV().getKey(), >>>>>> c.element().getKV().getValue())); >>>>>> } >>>>>> })) >>>>>> .apply( >>>>>> "WriteToKafka", >>>>>> KafkaIO.<String, String>write() >>>>>> .withBootstrapServers(options.getBootstrap()) >>>>>> .withTopic(options.getOutputTopic()) >>>>>> >>>>>> .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class) >>>>>> >>>>>> .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)); >>>>>> >>>>>> pipeline.run(); >>>>>> } >>>>>> } >>>>>> ~~~~~~~~~ >>>>>> >>>>>> I'm firing the Java version as follows: >>>>>> >>>>>> $ mvn exec:java >>>>>> -Dexec.mainClass=org.apache.beam.tutorial.analytic.KafkaTest >>>>>> -Pflink-runner >>>>>> -Dexec.args="--runner=FlinkRunner" >>>>>> >>>>>> And I can see in real time, that as I publish records to the >>>>>> in_topic, the out_topic is able to receive them on a continuous basis. >>>>>> >>>>>> I hope this helps narrow down the issue. >>>>>> >>>>>> Thanks, >>>>>> Sumeet >>>>>> >>>>>> >>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> Are you able to run a similar Java streaming pipeline using KafkaIO >>>>>>> and Flink ? (without x-lang) >>>>>>> >>>>>>> Thanks, >>>>>>> Cham >>>>>>> >>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> Hi Cham! >>>>>>>> >>>>>>>> So finally I was able to get partial success. 
Since I had >>>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set >>>>>>>> max_num_records=3 to see if it can read all existing records, as >>>>>>>> follows: >>>>>>>> >>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>>>>>> _ = ( >>>>>>>> pipeline >>>>>>>> | 'Read from kafka' >> ReadFromKafka( >>>>>>>> consumer_config={ >>>>>>>> 'bootstrap.servers': bootstrap_servers, >>>>>>>> 'auto.offset.reset': 'earliest'}, >>>>>>>> topics=[in_topic], >>>>>>>> max_num_records=3) >>>>>>>> | 'Write to kafka' >> WriteToKafka( >>>>>>>> producer_config={ >>>>>>>> 'bootstrap.servers': bootstrap_servers}, >>>>>>>> topic=out_topic)) >>>>>>>> >>>>>>>> I was able to see all 3 records being read, and written >>>>>>>> successfully to the out_topic as well. So, it appears that there might >>>>>>>> be >>>>>>>> some issue with reading unbounded Kafka streams here? Or is there any >>>>>>>> setting that I might be missing? >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Sumeet >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> Hey Cham! >>>>>>>>> >>>>>>>>> Appreciate the response. I tried out your suggestions (details >>>>>>>>> below), but I still don't see any data being consumed or written back >>>>>>>>> to >>>>>>>>> Kafka (as per your suggestion). I'm also providing additional >>>>>>>>> details/context that might help narrow down the issue. Apologies for >>>>>>>>> being >>>>>>>>> a bit verbose from hereon! >>>>>>>>> >>>>>>>>> First, here's what my pipeline code looks like now: >>>>>>>>> >>>>>>>>> ~~~~~~ >>>>>>>>> import apache_beam as beam >>>>>>>>> from apache_beam.io.kafka import ReadFromKafka >>>>>>>>> from apache_beam.io.kafka import WriteToKafka >>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions >>>>>>>>> >>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args): >>>>>>>>> pipeline_options = PipelineOptions(pipeline_args, >>>>>>>>> save_main_session=True, streaming=True) >>>>>>>>> >>>>>>>>> logging.info('Starting data pipeline. 
bootstrap_servers=%s >>>>>>>>> in_topic=%s out_topic=%s', >>>>>>>>> str(bootstrap_servers), in_topic, out_topic) >>>>>>>>> >>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>>>>>>> _ = ( >>>>>>>>> pipeline >>>>>>>>> | 'Read from kafka' >> ReadFromKafka( >>>>>>>>> consumer_config={ >>>>>>>>> 'bootstrap.servers': bootstrap_servers, >>>>>>>>> 'auto.offset.reset': 'earliest' >>>>>>>>> }, >>>>>>>>> topics=[in_topic]) >>>>>>>>> | 'Write to kafka' >> WriteToKafka( >>>>>>>>> producer_config={ >>>>>>>>> 'bootstrap.servers': bootstrap_servers >>>>>>>>> }, >>>>>>>>> topic=out_topic)) >>>>>>>>> >>>>>>>>> if __name__ == '__main__': >>>>>>>>> logging.getLogger().setLevel(logging.INFO) >>>>>>>>> import argparse >>>>>>>>> >>>>>>>>> parser = argparse.ArgumentParser() >>>>>>>>> parser.add_argument( >>>>>>>>> '--bootstrap_servers', >>>>>>>>> dest='bootstrap_servers', >>>>>>>>> required=True, >>>>>>>>> help='Bootstrap servers for the Kafka cluster') >>>>>>>>> parser.add_argument( >>>>>>>>> '--in_topic', >>>>>>>>> dest='in_topic', >>>>>>>>> required=True, >>>>>>>>> help='Kafka topic to read data from') >>>>>>>>> parser.add_argument( >>>>>>>>> '--out_topic', >>>>>>>>> dest='out_topic', >>>>>>>>> required=True, >>>>>>>>> help='Kafka topic to write data to') >>>>>>>>> known_args, pipeline_args = parser.parse_known_args() >>>>>>>>> >>>>>>>>> run(known_args.bootstrap_servers, known_args.in_topic, >>>>>>>>> known_args.out_topic, pipeline_args) >>>>>>>>> ~~~~~ >>>>>>>>> >>>>>>>>> I'm firing this pipeline as follows: >>>>>>>>> >>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092 >>>>>>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner >>>>>>>>> >>>>>>>>> I have pre-populated the Kafka topic with 3 records: >>>>>>>>> >>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic >>>>>>>>> v1 >>>>>>>>> v2 >>>>>>>>> v3 >>>>>>>>> >>>>>>>>> Now, when I execute the pipeline, I see that it starts to read >>>>>>>>> records from offset 0, but then seeks to the latest offset 3 without >>>>>>>>> processing the records. I don't see any data written to out_topic. I >>>>>>>>> filtered out the logs a bit, and this is what I'm seeing: >>>>>>>>> >>>>>>>>> INFO:root:Starting data pipeline. >>>>>>>>> bootstrap_servers=localhost:29092 in_topic=in_topic >>>>>>>>> out_topic=out_topic >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions >>>>>>>>> assigned to split 0 (total 1): in_topic-0' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): >>>>>>>>> in_topic-0' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition >>>>>>>>> in_topic-0 to offset 0.' 
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading >>>>>>>>> from in_topic-0 starting at offset 0' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to >>>>>>>>> partition(s): in_topic-0' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST >>>>>>>>> offset >>>>>>>>> of partition in_topic-0' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer >>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, >>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for >>>>>>>>> partition in_topic-0 to offset 3.' >>>>>>>>> >>>>>>>>> Additionally, the logs also emit complete consumer and producer >>>>>>>>> configs. I'm dumping them here, in case that helps: >>>>>>>>> >>>>>>>>> Consumer Config: >>>>>>>>> >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig >>>>>>>>> values:' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics >>>>>>>>> = true' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> tauto.commit.interval.ms = 5000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = >>>>>>>>> earliest' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>>>> [localhost:29092]' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>>>> default' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack =' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> tconnections.max.idle.ms = 540000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> tdefault.api.timeout.ms = 60000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = >>>>>>>>> false' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics >>>>>>>>> = true' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = >>>>>>>>> 52428800' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = >>>>>>>>> 500' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = >>>>>>>>> Reader-0_offset_consumer_1947524890_none' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms >>>>>>>>> = 3000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = >>>>>>>>> []' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close >>>>>>>>> = true' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = >>>>>>>>> read_uncommitted' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = >>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes >>>>>>>>> = 1048576' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms >>>>>>>>> = 300000' >>>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = >>>>>>>>> 500' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms >>>>>>>>> = 300000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = >>>>>>>>> 2' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>>>> = INFO' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy >>>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes >>>>>>>>> = 65536' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms >>>>>>>>> = 50' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>>>>>>> 30000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = >>>>>>>>> 100' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>>>> = /usr/bin/kinit' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>>>> = 60000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> tsasl.kerberos.service.name = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>>>> = 0.05' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>>>> = 0.8' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>>>> = 300' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>>>> = 60' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>>>> = 0.8' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>>>> = 0.05' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = >>>>>>>>> GSSAPI' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>>>> PLAINTEXT' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>>>> 131072' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = >>>>>>>>> 10000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols >>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>>>> = https' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>>>> = SunX509' >>>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = >>>>>>>>> JKS' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>>>> = PKIX' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = >>>>>>>>> JKS' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = >>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer' >>>>>>>>> >>>>>>>>> Producer Config: >>>>>>>>> >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig >>>>>>>>> values:' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = >>>>>>>>> [localhost:29092]' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = >>>>>>>>> 33554432' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = >>>>>>>>> default' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id =' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = >>>>>>>>> none' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> tconnections.max.idle.ms = 540000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms >>>>>>>>> = 120000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = >>>>>>>>> false' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = >>>>>>>>> []' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = >>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection >>>>>>>>> = 5' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = >>>>>>>>> 1048576' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms >>>>>>>>> = 300000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = >>>>>>>>> 2' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level >>>>>>>>> = INFO' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> tmetrics.sample.window.ms = 30000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = >>>>>>>>> class org.apache.kafka.clients.producer.internals.DefaultPartitioner' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes >>>>>>>>> = 32768' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> treconnect.backoff.max.ms = 1000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms 
>>>>>>>>> = 50' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = >>>>>>>>> 30000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = >>>>>>>>> 100' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd >>>>>>>>> = /usr/bin/kinit' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin >>>>>>>>> = 60000' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> tsasl.kerberos.service.name = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter >>>>>>>>> = 0.05' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor >>>>>>>>> = 0.8' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds >>>>>>>>> = 300' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds >>>>>>>>> = 60' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor >>>>>>>>> = 0.8' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter >>>>>>>>> = 0.05' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = >>>>>>>>> GSSAPI' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = >>>>>>>>> PLAINTEXT' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = >>>>>>>>> 131072' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols >>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm >>>>>>>>> = https' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm >>>>>>>>> = SunX509' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = >>>>>>>>> JKS' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm >>>>>>>>> = PKIX' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password >>>>>>>>> = null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = >>>>>>>>> JKS' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ >>>>>>>>> ttransaction.timeout.ms = 60000' >>>>>>>>> 
INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = >>>>>>>>> null' >>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = >>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer' >>>>>>>>> >>>>>>>>> >>>>>>>>> Apologies again for dumping almost everything here :-) Any >>>>>>>>> pointers on what might be the issue are appreciated. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Sumeet >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> Also can you try sending messages back to Kafka (or another >>>>>>>>>> distributed system like GCS) instead of just printing them ? (given >>>>>>>>>> that >>>>>>>>>> multi-language pipelines run SDK containers in Docker you might not >>>>>>>>>> see >>>>>>>>>> prints in the original console I think). >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Cham >>>>>>>>>> >>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Sumeet, >>>>>>>>>>> >>>>>>>>>>> It seems like your kafka consumer uses the LATEST offset(which >>>>>>>>>>> is default setting) as the start offset to read, which is 29. Do >>>>>>>>>>> you have >>>>>>>>>>> more than 29 records to read at that point? If the pipeline is only >>>>>>>>>>> for >>>>>>>>>>> testing purpose, I would recommend reading from earliest offset to >>>>>>>>>>> see >>>>>>>>>>> whether you get records. You can do so by constructing your >>>>>>>>>>> ReadFromKafka >>>>>>>>>>> like: >>>>>>>>>>> ReadFromKafka( >>>>>>>>>>> consumer_config={'bootstrap.servers': >>>>>>>>>>> 'localhost:29092', 'auto.offset.reset':'earliest'}, >>>>>>>>>>> topics=['test']) >>>>>>>>>>> >>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi All, >>>>>>>>>>>> >>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka >>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet: >>>>>>>>>>>> >>>>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline: >>>>>>>>>>>> _ = ( >>>>>>>>>>>> pipeline >>>>>>>>>>>> | 'Read from Kafka' >> ReadFromKafka( >>>>>>>>>>>> consumer_config={'bootstrap.servers': >>>>>>>>>>>> 'localhost:29092'}, >>>>>>>>>>>> topics=['test']) >>>>>>>>>>>> | 'Print' >> beam.Map(print)) >>>>>>>>>>>> >>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any messages >>>>>>>>>>>> coming in. Kafka is running locally in a docker container, and I'm >>>>>>>>>>>> able to >>>>>>>>>>>> use `kafkacat` from the host (outside the container) to publish and >>>>>>>>>>>> subscribe to messages. So, I guess there are no issues on that >>>>>>>>>>>> front. 
>>>>>>>>>>>> >>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get >>>>>>>>>>>> notified of new messages, as I see the offset changes in the Beam >>>>>>>>>>>> logs as I >>>>>>>>>>>> publish data from `kafkacat`: >>>>>>>>>>>> >>>>>>>>>>>> INFO:root:severity: INFO >>>>>>>>>>>> timestamp { >>>>>>>>>>>> seconds: 1612886861 >>>>>>>>>>>> nanos: 534000000 >>>>>>>>>>>> } >>>>>>>>>>>> message: "[Consumer >>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, >>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to >>>>>>>>>>>> LATEST offset >>>>>>>>>>>> of partition test-0" >>>>>>>>>>>> log_location: >>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState" >>>>>>>>>>>> thread: "22" >>>>>>>>>>>> >>>>>>>>>>>> INFO:root:severity: INFO >>>>>>>>>>>> timestamp { >>>>>>>>>>>> seconds: 1612886861 >>>>>>>>>>>> nanos: 537000000 >>>>>>>>>>>> } >>>>>>>>>>>> message: "[Consumer >>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, >>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset >>>>>>>>>>>> for >>>>>>>>>>>> partition test-0 to offset 29." >>>>>>>>>>>> log_location: >>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState" >>>>>>>>>>>> thread: "22" >>>>>>>>>>>> >>>>>>>>>>>> This is how I'm publishing data using `kafkacat`: >>>>>>>>>>>> >>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K: >>>>>>>>>>>> 1:foo >>>>>>>>>>>> 1:bar >>>>>>>>>>>> >>>>>>>>>>>> and I can confirm that its being received, again using >>>>>>>>>>>> `kafkacat`: >>>>>>>>>>>> >>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: >>>>>>>>>>>> %s\n' >>>>>>>>>>>> Key: 1 Value: foo >>>>>>>>>>>> Key: 1 Value: bar >>>>>>>>>>>> >>>>>>>>>>>> But despite this, I don't see the actual message being printed >>>>>>>>>>>> by Beam as I expected. Any pointers to what's missing here are >>>>>>>>>>>> appreciated. >>>>>>>>>>>> I'm suspecting this could be a decoding issue on the Beam pipeline >>>>>>>>>>>> side, >>>>>>>>>>>> but could be incorrect. >>>>>>>>>>>> >>>>>>>>>>>> Thanks in advance for any pointers! >>>>>>>>>>>> >>>>>>>>>>>> Cheers, >>>>>>>>>>>> Sumeet >>>>>>>>>>>> >>>>>>>>>>>
