Thanks Cham! But I don't think this is Flink-specific. I have observed similar behaviour with the DirectRunner as well, BTW.

..Sumeet
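[Editorial sketch, not part of the original message: a minimal way to reproduce the DirectRunner observation, assuming the pipeline.py script and local Kafka setup shown further down this thread, would be to launch the same script with only the runner flag changed.]

$ python ./pipeline.py --bootstrap_servers=localhost:29092 --in_topic=in_topic --out_topic=out_topic --runner=DirectRunner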
On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <[email protected]> wrote:

I'm not too familiar with Flink, but it seems like, for streaming pipelines, messages from the Kafka/SDF read do not get pushed to subsequent steps for some reason.
* X-lang bounded read with Flink seems to be fine.
* X-lang Kafka sink with Flink seems to be fine.

Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.

Thanks,
Cham

On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <[email protected]> wrote:

Hi Cham,

Do you have pointers on what might be going on? Or something else I can try? I had posted the same on StackOverflow [1]; it seems that I'm not the only one seeing this issue at the moment.

Thanks,
Sumeet

[1] https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data

On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <[email protected]> wrote:

Took me some time to set up the Java test (using Java after more than a decade!), but yes, a similar pipeline with KafkaIO and Flink seems to work fine.

Here's the relevant Java code. The only difference from the Python version is that I had to extract the KV from the KafkaRecord object and construct a PCollection<KV> explicitly before writing to the output topic.

~~~~~~~~
package org.apache.beam.kafka.test;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaTest {

  static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
  static final String INPUT_TOPIC = "in_topic";              // Default input kafka topic name
  static final String OUTPUT_TOPIC = "out_topic";            // Default output kafka topic name

  /** Specific pipeline options. */
  public interface KafkaTestOptions extends PipelineOptions {
    @Description("Kafka bootstrap servers")
    @Default.String(BOOTSTRAP_SERVERS)
    String getBootstrap();

    void setBootstrap(String value);

    @Description("Kafka input topic name")
    @Default.String(INPUT_TOPIC)
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Kafka output topic name")
    @Default.String(OUTPUT_TOPIC)
    String getOutputTopic();

    void setOutputTopic(String value);
  }

  public static final void main(String[] args) throws Exception {
    final KafkaTestOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getInputTopic())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
        .apply(
            "PrepareForWriting",
            ParDo.of(
                new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) throws Exception {
                    c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
                  }
                }))
        .apply(
            "WriteToKafka",
            KafkaIO.<String, String>write()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getOutputTopic())
                .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
                .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));

    pipeline.run();
  }
}
~~~~~~~~

I'm firing the Java version as follows:

$ mvn exec:java -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest -Pflink-runner -Dexec.args="--runner=FlinkRunner"

And I can see in real time that, as I publish records to the in_topic, the out_topic is able to receive them on a continuous basis.

I hope this helps narrow down the issue.

Thanks,
Sumeet

On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]> wrote:

Are you able to run a similar Java streaming pipeline using KafkaIO and Flink? (Without x-lang.)

Thanks,
Cham

On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <[email protected]> wrote:

Hi Cham!

So finally I was able to get partial success. Since I had pre-populated the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see if it can read all existing records, as follows:

~~~~~~~~
with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from kafka' >> ReadFromKafka(
          consumer_config={
              'bootstrap.servers': bootstrap_servers,
              'auto.offset.reset': 'earliest'},
          topics=[in_topic],
          max_num_records=3)
      | 'Write to kafka' >> WriteToKafka(
          producer_config={
              'bootstrap.servers': bootstrap_servers},
          topic=out_topic))
~~~~~~~~

I was able to see all 3 records being read and written successfully to the out_topic as well. So it appears that there might be some issue with reading unbounded Kafka streams here? Or is there any setting that I might be missing?

Thanks,
Sumeet
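[Editorial sketch, not part of the original message: another way to force a bounded read for this kind of test, assuming the Beam release in use exposes ReadFromKafka's max_read_time parameter (a bounded-read knob alongside max_num_records), is to cap the read by time instead of record count. The variable names follow the snippet above.]

~~~~~~~~
# Sketch only: bound the read by time rather than by record count.
# Assumes the same bootstrap_servers / in_topic / out_topic variables as above.
with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from kafka (time-bounded)' >> ReadFromKafka(
          consumer_config={
              'bootstrap.servers': bootstrap_servers,
              'auto.offset.reset': 'earliest'},
          topics=[in_topic],
          max_read_time=10)  # assumed to be seconds; makes the read bounded
      | 'Write to kafka' >> WriteToKafka(
          producer_config={'bootstrap.servers': bootstrap_servers},
          topic=out_topic))
~~~~~~~~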
On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <[email protected]> wrote:

Hey Cham!

Appreciate the response. I tried out your suggestions (details below), but I still don't see any data being consumed or written back to Kafka (as per your suggestion). I'm also providing additional details/context that might help narrow down the issue. Apologies for being a bit verbose from hereon!

First, here's what my pipeline code looks like now:

~~~~~~~~
import argparse
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
  pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)

  logging.info('Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
               str(bootstrap_servers), in_topic, out_topic)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': bootstrap_servers,
                'auto.offset.reset': 'earliest'
            },
            topics=[in_topic])
        | 'Write to kafka' >> WriteToKafka(
            producer_config={
                'bootstrap.servers': bootstrap_servers
            },
            topic=out_topic))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--bootstrap_servers',
      dest='bootstrap_servers',
      required=True,
      help='Bootstrap servers for the Kafka cluster')
  parser.add_argument(
      '--in_topic',
      dest='in_topic',
      required=True,
      help='Kafka topic to read data from')
  parser.add_argument(
      '--out_topic',
      dest='out_topic',
      required=True,
      help='Kafka topic to write data to')
  known_args, pipeline_args = parser.parse_known_args()

  run(known_args.bootstrap_servers, known_args.in_topic, known_args.out_topic, pipeline_args)
~~~~~~~~

I'm firing this pipeline as follows:

$ python ./pipeline.py --bootstrap_servers=localhost:29092 --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner

I have pre-populated the Kafka topic with 3 records:

$ kafkacat -C -b localhost:29092 -t in_topic
v1
v2
v3
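[Editorial sketch, not part of the original message: one quick check, assuming the pipeline above, is to add a pass-through logging transform between the read and the write so that any element reaching the downstream step gets logged.]

~~~~~~~~
# Sketch only: log each record as it passes through, without altering it.
# Assumes ReadFromKafka emits (key, value) tuples, as in the pipeline above.
def log_and_forward(record):
  logging.info('Received from Kafka: %s', record)
  return record

# ... inside the pipeline, between the read and the write:
#   | 'Log records' >> beam.Map(log_and_forward)
~~~~~~~~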
Now, when I execute the pipeline, I see that it starts to read records from offset 0, but then seeks to the latest offset 3 without processing the records. I don't see any data written to out_topic. I filtered out the logs a bit, and this is what I'm seeing:

INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 in_topic=in_topic out_topic=out_topic
INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to split 0 (total 1): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition in_topic-0 to offset 0.'
INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from in_topic-0 starting at offset 0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to partition(s): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset of partition in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for partition in_topic-0 to offset 3.'

Additionally, the logs also emit the complete consumer and producer configs. I'm dumping them here, in case that helps:

Consumer Config:

INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig values:'
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:29092]
	check.crcs = true
	client.dns.lookup = default
	client.id =
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = Reader-0_offset_consumer_1947524890_none
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Producer Config:

INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig values:'
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:29092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id =
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 3
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Apologies again for dumping almost everything here :-) Any pointers on what might be the issue are appreciated.
Thanks,
Sumeet

On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <[email protected]> wrote:

Also, can you try sending messages back to Kafka (or another distributed system like GCS) instead of just printing them? (Given that multi-language pipelines run SDK containers in Docker, you might not see prints in the original console, I think.)

Thanks,
Cham
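[Editorial sketch, not part of the original message: for the "send it back to Kafka" variant of this suggestion, assuming the out_topic used later in the thread, the write side can be verified by simply tailing the output topic with kafkacat while the pipeline runs.]

$ kafkacat -C -b localhost:29092 -t out_topic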
On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> wrote:

Hi Sumeet,

It seems like your Kafka consumer uses the LATEST offset (which is the default setting) as the start offset to read, which is 29. Do you have more than 29 records to read at that point? If the pipeline is only for testing purposes, I would recommend reading from the earliest offset to see whether you get records. You can do so by constructing your ReadFromKafka like:

ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:29092',
                     'auto.offset.reset': 'earliest'},
    topics=['test'])

On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <[email protected]> wrote:

Hi All,

I'm trying out a simple example of reading data off a Kafka topic into Apache Beam. Here's the relevant snippet:

~~~~~~~~
with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from Kafka' >> ReadFromKafka(
          consumer_config={'bootstrap.servers': 'localhost:29092'},
          topics=['test'])
      | 'Print' >> beam.Map(print))
~~~~~~~~

Using the above Beam pipeline snippet, I don't see any messages coming in. Kafka is running locally in a Docker container, and I'm able to use `kafkacat` from the host (outside the container) to publish and subscribe to messages. So I guess there are no issues on that front.

It appears that Beam is able to connect to Kafka and get notified of new messages, as I see the offset changes in the Beam logs as I publish data from `kafkacat`:

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

This is how I'm publishing data using `kafkacat`:

$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar

and I can confirm that it's being received, again using `kafkacat`:

$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar

But despite this, I don't see the actual message being printed by Beam as I expected. Any pointers to what's missing here are appreciated. I'm suspecting this could be a decoding issue on the Beam pipeline side, but I could be incorrect.

Thanks in advance for any pointers!

Cheers,
Sumeet
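[Editorial sketch, not part of the original message: on the decoding suspicion raised above — assuming the default byte-array deserializers shown in the consumer config earlier in the thread, elements should arrive in Python as (key, value) pairs of raw bytes, so a decode step before printing would make the output readable. The helper name below is illustrative only.]

~~~~~~~~
# Sketch only: decode raw Kafka bytes before printing, assuming elements
# arrive as (key_bytes, value_bytes) tuples from the x-lang ReadFromKafka.
def decode_record(record):
  key, value = record
  return (key.decode('utf-8') if key is not None else None,
          value.decode('utf-8'))

# ... in the pipeline, before the print step:
#   | 'Decode' >> beam.Map(decode_record)
#   | 'Print' >> beam.Map(print)
~~~~~~~~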
