I don't believe the Fn API DirectRunner supports streaming yet (I might be
wrong). I can confirm that this works for Dataflow.
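
For reference, here's roughly how the same pipeline could be submitted to
Dataflow (the project, region, and bucket below are placeholders; note that
the Kafka brokers also need to be reachable from the Dataflow workers, so
localhost won't work there):

~~~~~~
python ./pipeline.py \
    --bootstrap_servers=<broker_host:port> \
    --in_topic=in_topic --out_topic=out_topic \
    --runner=DataflowRunner \
    --project=<gcp_project> \
    --region=<gcp_region> \
    --temp_location=gs://<bucket>/tmp \
    --streaming
~~~~~~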

Thanks,
Cham

On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <[email protected]>
wrote:

> Thanks Cham! But I don't think this is Flink-specific. I have observed
> similar behaviour with DirectRunner as well, BTW.
>
> ..Sumeet
>
> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> I'm not too familiar with Flink but it seems like, for streaming
>> pipelines, messages from Kafka/SDF read do not get pushed to subsequent
>> steps for some reason.
>> * X-lang Bounded read with Flink seems to be fine.
>> * X-lang Kafka sink with Flink seems to be fine.
>>
>> Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.
>>
>> Thanks,
>> Cham
>>
>>
>>
>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <
>> [email protected]> wrote:
>>
>>> Hi Cham,
>>>
>>> Do you have pointers on what might be going on? Or something else I can
>>> try? I had posted the same on StackOverflow [1]; it seems that I'm not the
>>> only one seeing this issue at the moment.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>> [1]
>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>
>>>
>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <
>>> [email protected]> wrote:
>>>
>>>> Took me some time to set up the Java test (using Java after more than a
>>>> decade!), but yes, a similar pipeline with KafkaIO and Flink seems to work
>>>> fine.
>>>>
>>>> Here's the relevant Java code. The only difference from the Python
>>>> version is that I had to extract the KV from the KafkaRecord object and
>>>> construct a PCollection<KV> explicitly before writing to the output topic.
>>>>
>>>> ~~~~~~~~
>>>> package org.apache.beam.kafka.test;
>>>>
>>>> import org.apache.beam.sdk.Pipeline;
>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>> import org.apache.beam.sdk.options.Default;
>>>> import org.apache.beam.sdk.options.Description;
>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>> import org.apache.beam.sdk.transforms.*;
>>>> import org.apache.beam.sdk.values.KV;
>>>> import org.apache.beam.sdk.values.PCollection;
>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>
>>>> public class KafkaTest {
>>>>
>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
>>>>   static final String INPUT_TOPIC = "in_topic";   // Default input kafka topic name
>>>>   static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka topic name
>>>>
>>>>   /** Specific pipeline options. */
>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>     @Description("Kafka bootstrap servers")
>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>     String getBootstrap();
>>>>
>>>>     void setBootstrap(String value);
>>>>
>>>>     @Description("Kafka input topic name")
>>>>     @Default.String(INPUT_TOPIC)
>>>>     String getInputTopic();
>>>>
>>>>     void setInputTopic(String value);
>>>>
>>>>     @Description("Kafka output topic name")
>>>>     @Default.String(OUTPUT_TOPIC)
>>>>     String getOutputTopic();
>>>>
>>>>     void setOutputTopic(String value);
>>>>   }
>>>>
>>>>   public static final void main(String[] args) throws Exception {
>>>>     final KafkaTestOptions options =
>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>
>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>     pipeline
>>>>         .apply(
>>>>             "ReadFromKafka",
>>>>             KafkaIO.<String, String>read()
>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>                 .withTopic(options.getInputTopic())
>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>         .apply(
>>>>             "PrepareForWriting",
>>>>             ParDo.of(
>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>                   @ProcessElement
>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>                     c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
>>>>                   }
>>>>                 }))
>>>>         .apply(
>>>>             "WriteToKafka",
>>>>             KafkaIO.<String, String>write()
>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>                 .withTopic(options.getOutputTopic())
>>>>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>>>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>>>
>>>>     pipeline.run();
>>>>   }
>>>> }
>>>> ~~~~~~~~
>>>>
>>>> I'm firing the Java version as follows:
>>>>
>>>> $ mvn exec:java -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest \
>>>>     -Pflink-runner -Dexec.args="--runner=FlinkRunner"
>>>>
>>>> And I can see, in real time, that as I publish records to in_topic,
>>>> out_topic receives them continuously.
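>>>>
>>>> For reference, I'm publishing and checking the topics with kafkacat,
>>>> roughly like this (one terminal feeding the producer, another watching
>>>> the output topic):
>>>>
>>>> $ kafkacat -P -b localhost:29092 -t in_topic
>>>> $ kafkacat -C -b localhost:29092 -t out_topic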
>>>>
>>>> I hope this helps narrow down the issue.
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>>
>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>>
>>>>> Are you able to run a similar Java streaming pipeline using KafkaIO
>>>>> and Flink ? (without x-lang)
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Cham!
>>>>>>
>>>>>> So finally I was able to get partial success. Since I had
>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set
>>>>>> max_num_records=3 to see if it can read all existing records, as follows:
>>>>>>
>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>     _ = (
>>>>>>         pipeline
>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>             consumer_config={
>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>                 'auto.offset.reset': 'earliest'},
>>>>>>             topics=[in_topic],
>>>>>>             max_num_records=3)
>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>             producer_config={
>>>>>>                 'bootstrap.servers': bootstrap_servers},
>>>>>>             topic=out_topic))
>>>>>>
>>>>>> I was able to see all 3 records being read and written
>>>>>> successfully to the out_topic as well. So it appears that there might be
>>>>>> some issue with reading unbounded Kafka streams here? Or is there a
>>>>>> setting that I might be missing?
>>>>>>
>>>>>> Thanks,
>>>>>> Sumeet
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hey Cham!
>>>>>>>
>>>>>>> Appreciate the response. I tried out your suggestions (details
>>>>>>> below), but I still don't see any data being consumed or written back
>>>>>>> to Kafka (as per your suggestion). I'm also providing additional
>>>>>>> details/context that might help narrow down the issue. Apologies for
>>>>>>> being a bit verbose from here on!
>>>>>>>
>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>
>>>>>>> ~~~~~~
>>>>>>> import logging
>>>>>>>
>>>>>>> import apache_beam as beam
>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>
>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>   pipeline_options = PipelineOptions(
>>>>>>>       pipeline_args, save_main_session=True, streaming=True)
>>>>>>>
>>>>>>>   logging.info(
>>>>>>>       'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>>>>       str(bootstrap_servers), in_topic, out_topic)
>>>>>>>
>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>     _ = (
>>>>>>>         pipeline
>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>             consumer_config={
>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>             },
>>>>>>>             topics=[in_topic])
>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>             producer_config={
>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>             },
>>>>>>>             topic=out_topic))
>>>>>>>
>>>>>>> if __name__ == '__main__':
>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>   import argparse
>>>>>>>
>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>   parser.add_argument(
>>>>>>>       '--bootstrap_servers',
>>>>>>>       dest='bootstrap_servers',
>>>>>>>       required=True,
>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>   parser.add_argument(
>>>>>>>       '--in_topic',
>>>>>>>       dest='in_topic',
>>>>>>>       required=True,
>>>>>>>       help='Kafka topic to read data from')
>>>>>>>   parser.add_argument(
>>>>>>>       '--out_topic',
>>>>>>>       dest='out_topic',
>>>>>>>       required=True,
>>>>>>>       help='Kafka topic to write data to')
>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>
>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>>>>       known_args.out_topic, pipeline_args)
>>>>>>> ~~~~~
>>>>>>>
>>>>>>> I'm firing this pipeline as follows:
>>>>>>>
>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092
>>>>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>
>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>
>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>> v1
>>>>>>> v2
>>>>>>> v3
>>>>>>>
>>>>>>> Now, when I execute the pipeline, I see that it starts to read
>>>>>>> records from offset 0, but then seeks to the latest offset 3 without
>>>>>>> processing the records. I don't see any data written to out_topic. I've
>>>>>>> filtered the logs a bit, and this is what I'm seeing:
>>>>>>>
>>>>>>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092
>>>>>>> in_topic=in_topic out_topic=out_topic
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned
>>>>>>> to split 0 (total 1): in_topic-0'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): 
>>>>>>> in_topic-0'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition
>>>>>>> in_topic-0 to offset 0.'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading
>>>>>>> from in_topic-0 starting at offset 0'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to
>>>>>>> partition(s): in_topic-0'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST 
>>>>>>> offset
>>>>>>> of partition in_topic-0'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for
>>>>>>> partition in_topic-0 to offset 3.'
>>>>>>>
>>>>>>> Additionally, the logs also emit complete consumer and producer
>>>>>>> configs. I'm dumping them here, in case that helps:
>>>>>>>
>>>>>>> Consumer Config:
>>>>>>>
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig
>>>>>>> values:'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics
>>>>>>> = true'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms
>>>>>>> = 5000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset =
>>>>>>> earliest'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>>>> [localhost:29092]'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>>>> default'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms
>>>>>>> = 540000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms
>>>>>>> = 60000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit =
>>>>>>> false'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics
>>>>>>> = true'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes =
>>>>>>> 52428800'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms =
>>>>>>> 500'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
>>>>>>> Reader-0_offset_consumer_1947524890_none'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms
>>>>>>> = 3000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes =
>>>>>>> []'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
>>>>>>> = true'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level =
>>>>>>> read_uncommitted'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer =
>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes
>>>>>>> = 1048576'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms =
>>>>>>> 300000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>>>>>>> 300000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>> = INFO'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms = 30000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes =
>>>>>>> 65536'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms = 1000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms =
>>>>>>> 50'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms =
>>>>>>> 30000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>> = /usr/bin/kinit'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>> = 60000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>> = 0.05'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>> = 0.8'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>> = 300'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>> = 60'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>> = 0.8'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>> = 0.05'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>>>> PLAINTEXT'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>>>> 131072'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms =
>>>>>>> 10000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
>>>>>>> [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>> = https'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>> = SunX509'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>> = PKIX'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type =
>>>>>>> JKS'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer =
>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>
>>>>>>> Producer Config:
>>>>>>>
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig
>>>>>>> values:'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>>>> [localhost:29092]'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory =
>>>>>>> 33554432'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>>>> default'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms
>>>>>>> = 540000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms =
>>>>>>> 120000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence =
>>>>>>> false'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes =
>>>>>>> []'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class
>>>>>>> org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
>>>>>>> = 5'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size =
>>>>>>> 1048576'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>>>>>>> 300000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>> = INFO'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms = 30000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class =
>>>>>>> class org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes =
>>>>>>> 32768'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms = 1000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms =
>>>>>>> 50'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms =
>>>>>>> 30000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>> = /usr/bin/kinit'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>> = 60000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>> = 0.05'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>> = 0.8'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>> = 300'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>> = 60'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>> = 0.8'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>> = 0.05'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>>>> PLAINTEXT'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>>>> 131072'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
>>>>>>> [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>> = https'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>> = SunX509'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>> = PKIX'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>> = null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type =
>>>>>>> JKS'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms
>>>>>>> = 60000'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id =
>>>>>>> null'
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer =
>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>
>>>>>>>
>>>>>>> Apologies again for dumping almost everything here :-) Any pointers
>>>>>>> on what might be the issue are appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sumeet
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Also, can you try sending messages back to Kafka (or another
>>>>>>>> distributed system like GCS) instead of just printing them? (Given that
>>>>>>>> multi-language pipelines run SDK containers in Docker, you might not see
>>>>>>>> prints in the original console, I think.)
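>>>>>>>>
>>>>>>>> For example, a quick sketch of what I mean (the output topic 'test_out'
>>>>>>>> is just a placeholder; the raw (key, value) byte tuples coming out of
>>>>>>>> ReadFromKafka should be writable as-is with the default byte
>>>>>>>> serializers):
>>>>>>>>
>>>>>>>> ~~~~~~
>>>>>>>> import apache_beam as beam
>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>
>>>>>>>> pipeline_options = PipelineOptions(streaming=True)
>>>>>>>>
>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>   _ = (
>>>>>>>>       pipeline
>>>>>>>>       # Read raw (key, value) byte tuples from the input topic.
>>>>>>>>       | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>           consumer_config={'bootstrap.servers': 'localhost:29092',
>>>>>>>>                            'auto.offset.reset': 'earliest'},
>>>>>>>>           topics=['test'])
>>>>>>>>       # Write the tuples back out unchanged, so the output topic can be
>>>>>>>>       # checked with kafkacat independently of the container's stdout.
>>>>>>>>       | 'Write to Kafka' >> WriteToKafka(
>>>>>>>>           producer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>           topic='test_out'))
>>>>>>>> ~~~~~~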
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Sumeet,
>>>>>>>>>
>>>>>>>>> It seems like your Kafka consumer uses the LATEST offset (which is the
>>>>>>>>> default setting) as the start offset to read, which is 29. Do you have more
>>>>>>>>> than 29 records to read at that point? If the pipeline is only for testing
>>>>>>>>> purposes, I would recommend reading from the earliest offset to see whether
>>>>>>>>> you get records. You can do so by constructing your ReadFromKafka like:
>>>>>>>>>
>>>>>>>>> ReadFromKafka(
>>>>>>>>>     consumer_config={'bootstrap.servers': 'localhost:29092',
>>>>>>>>>                      'auto.offset.reset': 'earliest'},
>>>>>>>>>     topics=['test'])
>>>>>>>>>
>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka topic
>>>>>>>>>> into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>
>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>     _ = (
>>>>>>>>>>         pipeline
>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>             consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>             topics=['test'])
>>>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>>>
>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any messages
>>>>>>>>>> coming in. Kafka is running locally in a Docker container, and I'm able to
>>>>>>>>>> use `kafkacat` from the host (outside the container) to publish and
>>>>>>>>>> subscribe to messages. So I guess there are no issues on that front.
>>>>>>>>>>
>>>>>>>>>> It appears that Beam is able to connect to Kafka and get notified
>>>>>>>>>> of new messages, as I see the offset changes in the Beam logs as I
>>>>>>>>>> publish data from `kafkacat`:
>>>>>>>>>>
>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>> timestamp {
>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>   nanos: 534000000
>>>>>>>>>> }
>>>>>>>>>> message: "[Consumer
>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST 
>>>>>>>>>> offset
>>>>>>>>>> of partition test-0"
>>>>>>>>>> log_location:
>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>> thread: "22"
>>>>>>>>>>
>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>> timestamp {
>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>   nanos: 537000000
>>>>>>>>>> }
>>>>>>>>>> message: "[Consumer
>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset 
>>>>>>>>>> for
>>>>>>>>>> partition test-0 to offset 29."
>>>>>>>>>> log_location:
>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>> thread: "22"
>>>>>>>>>>
>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>
>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>> 1:foo
>>>>>>>>>> 1:bar
>>>>>>>>>>
>>>>>>>>>> and I can confirm that it's being received, again using `kafkacat`:
>>>>>>>>>>
>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>
>>>>>>>>>> But despite this, I don't see the actual messages being printed by
>>>>>>>>>> Beam as I expected. Any pointers to what's missing here are appreciated.
>>>>>>>>>> I suspect this could be a decoding issue on the Beam pipeline side,
>>>>>>>>>> but I could be wrong.
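>>>>>>>>>>
>>>>>>>>>> In case it is a decoding problem, here's a minimal variant I could try
>>>>>>>>>> (assuming the payloads are UTF-8 strings; if I understand correctly,
>>>>>>>>>> ReadFromKafka emits each record as a (key, value) tuple of raw bytes
>>>>>>>>>> by default):
>>>>>>>>>>
>>>>>>>>>> ~~~~~~
>>>>>>>>>> import apache_beam as beam
>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>
>>>>>>>>>> pipeline_options = PipelineOptions(streaming=True)
>>>>>>>>>>
>>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>   _ = (
>>>>>>>>>>       pipeline
>>>>>>>>>>       | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>           consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>           topics=['test'])
>>>>>>>>>>       # Decode the raw bytes before printing; the key may be None if
>>>>>>>>>>       # the record was published without one.
>>>>>>>>>>       | 'Decode' >> beam.Map(
>>>>>>>>>>           lambda kv: (kv[0].decode('utf-8') if kv[0] else None,
>>>>>>>>>>                       kv[1].decode('utf-8')))
>>>>>>>>>>       | 'Print' >> beam.Map(print))
>>>>>>>>>> ~~~~~~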
>>>>>>>>>>
>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Sumeet
>>>>>>>>>>
>>>>>>>>>
