Hi Sumeet,

Which Beam version are you using for your pipeline?

On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <[email protected]>
wrote:

> I don't believe Fn API DirectRunner supports streaming yet (I might be
> wrong). I can confirm that this works for Dataflow.
>
> Thanks,
> Cham
>
> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <
> [email protected]> wrote:
>
>> Thanks Cham! But I don't think this is Flink-specific. I have observed
>> similar behaviour with the DirectRunner as well, BTW.
>>
>> ..Sumeet
>>
>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> I'm not too familiar with Flink, but it seems like, for streaming
>>> pipelines, messages from the Kafka/SDF read do not get pushed to
>>> subsequent steps for some reason.
>>> * X-lang bounded read with Flink seems to be fine.
>>> * X-lang Kafka sink with Flink seems to be fine.
>>>
>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>
>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <
>>> [email protected]> wrote:
>>>
>>>> Hi Cham,
>>>>
>>>> Do you have any pointers on what might be going on, or something else I
>>>> can try? I had posted the same question on StackOverflow [1]; it seems
>>>> that I'm not the only one seeing this issue at the moment.
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>> [1]
>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>>
>>>>
>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <
>>>> [email protected]> wrote:
>>>>
>>>>> It took me some time to set up the Java test (using Java after more than
>>>>> a decade!), but yes, a similar pipeline with KafkaIO and Flink seems to
>>>>> work fine.
>>>>>
>>>>> Here's the relevant Java code. The only difference from the Python
>>>>> version is that I had to extract the KV from the KafkaRecord object and
>>>>> construct a PCollection<KV> explicitly before writing to the output topic.
>>>>>
>>>>> ~~~~~~~~
>>>>> package org.apache.beam.kafka.test;
>>>>>
>>>>> import org.apache.beam.sdk.Pipeline;
>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>>> import org.apache.beam.sdk.options.Default;
>>>>> import org.apache.beam.sdk.options.Description;
>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>> import org.apache.beam.sdk.transforms.*;
>>>>> import org.apache.beam.sdk.values.KV;
>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>
>>>>> public class KafkaTest {
>>>>>
>>>>>   // Default bootstrap Kafka servers
>>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092";
>>>>>   // Default input Kafka topic name
>>>>>   static final String INPUT_TOPIC = "in_topic";
>>>>>   // Default output Kafka topic name
>>>>>   static final String OUTPUT_TOPIC = "out_topic";
>>>>>
>>>>>   /** Specific pipeline options. */
>>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>>     @Description("Kafka bootstrap servers")
>>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>>     String getBootstrap();
>>>>>
>>>>>     void setBootstrap(String value);
>>>>>
>>>>>     @Description("Kafka input topic name")
>>>>>     @Default.String(INPUT_TOPIC)
>>>>>     String getInputTopic();
>>>>>
>>>>>     void setInputTopic(String value);
>>>>>
>>>>>     @Description("Kafka output topic name")
>>>>>     @Default.String(OUTPUT_TOPIC)
>>>>>     String getOutputTopic();
>>>>>
>>>>>     void setOutputTopic(String value);
>>>>>   }
>>>>>
>>>>>   public static final void main(String[] args) throws Exception {
>>>>>     final KafkaTestOptions options =
>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>>
>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>     pipeline
>>>>>         .apply(
>>>>>             "ReadFromKafka",
>>>>>             KafkaIO.<String, String>read()
>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>                 .withTopic(options.getInputTopic())
>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>         .apply(
>>>>>             "PrepareForWriting",
>>>>>             ParDo.of(
>>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>>                   @ProcessElement
>>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>>                     c.output(KV.of(
>>>>>                         c.element().getKV().getKey(),
>>>>>                         c.element().getKV().getValue()));
>>>>>                   }
>>>>>                 }))
>>>>>         .apply(
>>>>>             "WriteToKafka",
>>>>>             KafkaIO.<String, String>write()
>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>                 .withTopic(options.getOutputTopic())
>>>>>                 .withKeySerializer(
>>>>>                     org.apache.kafka.common.serialization.StringSerializer.class)
>>>>>                 .withValueSerializer(
>>>>>                     org.apache.kafka.common.serialization.StringSerializer.class));
>>>>>
>>>>>     pipeline.run();
>>>>>   }
>>>>> }
>>>>> ~~~~~~~~~
>>>>>
>>>>> I'm firing the Java version as follows:
>>>>>
>>>>> $ mvn exec:java \
>>>>>     -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest \
>>>>>     -Pflink-runner \
>>>>>     -Dexec.args="--runner=FlinkRunner"
>>>>>
>>>>> And I can see, in real time, that as I publish records to in_topic,
>>>>> they show up on out_topic on a continuous basis.
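>>>>>
>>>>> (For reference, I'm watching the output side with something along the
>>>>> lines of:
>>>>>
>>>>> $ kafkacat -C -b localhost:29092 -t out_topic
>>>>>
>>>>> while publishing to in_topic from another terminal.)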
>>>>>
>>>>> I hope this helps narrow down the issue.
>>>>>
>>>>> Thanks,
>>>>> Sumeet
>>>>>
>>>>>
>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Are you able to run a similar Java streaming pipeline using KafkaIO
>>>>>> and Flink ? (without x-lang)
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Cham!
>>>>>>>
>>>>>>> So finally I was able to get partial success. Since I had
>>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set
>>>>>>> max_num_records=3 to see if it could read all the existing records, as
>>>>>>> follows:
>>>>>>>
>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>     _ = (
>>>>>>>         pipeline
>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>             consumer_config={
>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>                 'auto.offset.reset': 'earliest'},
>>>>>>>             topics=[in_topic],
>>>>>>>             max_num_records=3)
>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>             producer_config={
>>>>>>>                 'bootstrap.servers': bootstrap_servers},
>>>>>>>             topic=out_topic))
>>>>>>>
>>>>>>> I was able to see all 3 records read and written successfully to the
>>>>>>> out_topic as well. So it appears that there might be some issue with
>>>>>>> reading unbounded Kafka streams here. Or is there some setting that I
>>>>>>> might be missing?
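>>>>>>>
>>>>>>> (One more data point, in case it helps: my understanding, which may be
>>>>>>> off, is that setting max_num_records makes ReadFromKafka run as a
>>>>>>> bounded read, while leaving it unset exercises the unbounded path,
>>>>>>> which is the case that appears stuck. A rough sketch of the two reads
>>>>>>> I'm comparing:)
>>>>>>>
>>>>>>>         # Bounded: stops after 3 records; this variant works for me.
>>>>>>>         ReadFromKafka(
>>>>>>>             consumer_config={'bootstrap.servers': bootstrap_servers,
>>>>>>>                              'auto.offset.reset': 'earliest'},
>>>>>>>             topics=[in_topic],
>>>>>>>             max_num_records=3)
>>>>>>>
>>>>>>>         # Unbounded: no limit set; this variant produces no output.
>>>>>>>         ReadFromKafka(
>>>>>>>             consumer_config={'bootstrap.servers': bootstrap_servers,
>>>>>>>                              'auto.offset.reset': 'earliest'},
>>>>>>>             topics=[in_topic])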
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sumeet
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hey Cham!
>>>>>>>>
>>>>>>>> Appreciate the response. I tried out your suggestions (details below),
>>>>>>>> but I still don't see any data being consumed or written back to Kafka
>>>>>>>> (as per your suggestion). I'm also providing additional details/context
>>>>>>>> that might help narrow down the issue. Apologies for being a bit
>>>>>>>> verbose from here on!
>>>>>>>>
>>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>>
>>>>>>>> ~~~~~~
>>>>>>>> import logging
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>
>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>>   pipeline_options = PipelineOptions(
>>>>>>>>       pipeline_args, save_main_session=True, streaming=True)
>>>>>>>>
>>>>>>>>   logging.info(
>>>>>>>>       'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>>>>>       str(bootstrap_servers), in_topic, out_topic)
>>>>>>>>
>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>     _ = (
>>>>>>>>         pipeline
>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>             consumer_config={
>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>>             },
>>>>>>>>             topics=[in_topic])
>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>             producer_config={
>>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>>             },
>>>>>>>>             topic=out_topic))
>>>>>>>>
>>>>>>>> if __name__ == '__main__':
>>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>>   import argparse
>>>>>>>>
>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>   parser.add_argument(
>>>>>>>>       '--bootstrap_servers',
>>>>>>>>       dest='bootstrap_servers',
>>>>>>>>       required=True,
>>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>>   parser.add_argument(
>>>>>>>>       '--in_topic',
>>>>>>>>       dest='in_topic',
>>>>>>>>       required=True,
>>>>>>>>       help='Kafka topic to read data from')
>>>>>>>>   parser.add_argument(
>>>>>>>>       '--out_topic',
>>>>>>>>       dest='out_topic',
>>>>>>>>       required=True,
>>>>>>>>       help='Kafka topic to write data to')
>>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>>
>>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>>>>> known_args.out_topic, pipeline_args)
>>>>>>>> ~~~~~
>>>>>>>>
>>>>>>>> I'm firing this pipeline as follows:
>>>>>>>>
>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092
>>>>>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>>
>>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>>
>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>>> v1
>>>>>>>> v2
>>>>>>>> v3
>>>>>>>>
>>>>>>>> Now, when I execute the pipeline, I see that it starts to read records
>>>>>>>> from offset 0, but then seeks to the latest offset 3 without processing
>>>>>>>> the records. I don't see any data written to out_topic. I filtered the
>>>>>>>> logs a bit, and this is what I'm seeing:
>>>>>>>>
>>>>>>>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092
>>>>>>>> in_topic=in_topic out_topic=out_topic
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions
>>>>>>>> assigned to split 0 (total 1): in_topic-0'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): 
>>>>>>>> in_topic-0'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition
>>>>>>>> in_topic-0 to offset 0.'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading
>>>>>>>> from in_topic-0 starting at offset 0'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to
>>>>>>>> partition(s): in_topic-0'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST 
>>>>>>>> offset
>>>>>>>> of partition in_topic-0'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for
>>>>>>>> partition in_topic-0 to offset 3.'
>>>>>>>>
>>>>>>>> Additionally, the logs emit the complete consumer and producer
>>>>>>>> configs. I'm dumping them here in case that helps:
>>>>>>>>
>>>>>>>> Consumer Config:
>>>>>>>>
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig
>>>>>>>> values:'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics
>>>>>>>> = true'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>> tauto.commit.interval.ms = 5000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset =
>>>>>>>> earliest'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>>>>> [localhost:29092]'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>>>>> default'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms
>>>>>>>> = 60000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit =
>>>>>>>> false'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics
>>>>>>>> = true'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes =
>>>>>>>> 52428800'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms =
>>>>>>>> 500'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
>>>>>>>> Reader-0_offset_consumer_1947524890_none'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms
>>>>>>>> = 3000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes =
>>>>>>>> []'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
>>>>>>>> = true'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level =
>>>>>>>> read_uncommitted'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer =
>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes
>>>>>>>> = 1048576'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms
>>>>>>>> = 300000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>>>>>>>> 300000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples =
>>>>>>>> 2'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>> = INFO'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
>>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes =
>>>>>>>> 65536'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms
>>>>>>>> = 50'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms =
>>>>>>>> 30000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms =
>>>>>>>> 100'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>> = /usr/bin/kinit'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>> = 60000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>> = 0.05'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>> = 0.8'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>> = 300'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>> = 60'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>> = 0.8'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>> = 0.05'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>> GSSAPI'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>>>>> PLAINTEXT'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>>>>> 131072'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms =
>>>>>>>> 10000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>> = https'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>> = SunX509'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type =
>>>>>>>> JKS'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>> = PKIX'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type =
>>>>>>>> JKS'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer =
>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>
>>>>>>>> Producer Config:
>>>>>>>>
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig
>>>>>>>> values:'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>>>>> [localhost:29092]'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory =
>>>>>>>> 33554432'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>>>>> default'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type =
>>>>>>>> none'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms =
>>>>>>>> 120000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence =
>>>>>>>> false'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes =
>>>>>>>> []'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class
>>>>>>>> org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
>>>>>>>> = 5'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size =
>>>>>>>> 1048576'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>>>>>>>> 300000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples =
>>>>>>>> 2'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>> = INFO'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class =
>>>>>>>> class org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes =
>>>>>>>> 32768'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms
>>>>>>>> = 50'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms =
>>>>>>>> 30000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms =
>>>>>>>> 100'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>> = /usr/bin/kinit'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>> = 60000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>> = 0.05'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>> = 0.8'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>> = 300'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>> = 60'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>> = 0.8'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>> = 0.05'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>> GSSAPI'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>>>>> PLAINTEXT'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>>>>> 131072'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>> = https'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>> = SunX509'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type =
>>>>>>>> JKS'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>> = PKIX'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>> = null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type =
>>>>>>>> JKS'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms
>>>>>>>> = 60000'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id =
>>>>>>>> null'
>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer =
>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>
>>>>>>>>
>>>>>>>> Apologies again for dumping almost everything here :-) Any pointers
>>>>>>>> on what might be the issue are appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sumeet
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Also, can you try sending messages back to Kafka (or another
>>>>>>>>> distributed system like GCS) instead of just printing them? (Given
>>>>>>>>> that multi-language pipelines run SDK containers in Docker, you might
>>>>>>>>> not see prints in the original console, I think.)
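>>>>>>>>>
>>>>>>>>> For example, something along these lines (just a rough sketch; the
>>>>>>>>> bootstrap server and topic names are placeholders):
>>>>>>>>>
>>>>>>>>>     from apache_beam.io.kafka import WriteToKafka
>>>>>>>>>
>>>>>>>>>     ...
>>>>>>>>>     | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>         producer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>         topic='out_topic')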
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Sumeet,
>>>>>>>>>>
>>>>>>>>>> It seems like your Kafka consumer uses the LATEST offset (which is the
>>>>>>>>>> default setting) as the start offset to read, which is 29. Do you have
>>>>>>>>>> more than 29 records to read at that point? If the pipeline is only for
>>>>>>>>>> testing purposes, I would recommend reading from the earliest offset to
>>>>>>>>>> see whether you get records. You can do so by constructing your
>>>>>>>>>> ReadFromKafka like:
>>>>>>>>>> ReadFromKafka(
>>>>>>>>>>     consumer_config={'bootstrap.servers': 'localhost:29092',
>>>>>>>>>>                      'auto.offset.reset': 'earliest'},
>>>>>>>>>>     topics=['test'])
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka
>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>>
>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>     _ = (
>>>>>>>>>>>         pipeline
>>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>             consumer_config={'bootstrap.servers':
>>>>>>>>>>> 'localhost:29092'},
>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>>>>
>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any messages coming
>>>>>>>>>>> in. Kafka is running locally in a Docker container, and I'm able to use
>>>>>>>>>>> `kafkacat` from the host (outside the container) to publish and
>>>>>>>>>>> subscribe to messages. So, I guess there are no issues on that front.
>>>>>>>>>>>
>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get
>>>>>>>>>>> notified of new messages, as I see the offset changes in the Beam 
>>>>>>>>>>> logs as I
>>>>>>>>>>> publish data from `kafkacat`:
>>>>>>>>>>>
>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>> timestamp {
>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>   nanos: 534000000
>>>>>>>>>>> }
>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST 
>>>>>>>>>>> offset
>>>>>>>>>>> of partition test-0"
>>>>>>>>>>> log_location:
>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>> thread: "22"
>>>>>>>>>>>
>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>> timestamp {
>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>   nanos: 537000000
>>>>>>>>>>> }
>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset 
>>>>>>>>>>> for
>>>>>>>>>>> partition test-0 to offset 29."
>>>>>>>>>>> log_location:
>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>> thread: "22"
>>>>>>>>>>>
>>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>>
>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>>> 1:foo
>>>>>>>>>>> 1:bar
>>>>>>>>>>>
>>>>>>>>>>> and I can confirm that it's being received, again using
>>>>>>>>>>> `kafkacat`:
>>>>>>>>>>>
>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>>
>>>>>>>>>>> But despite this, I don't see the actual messages being printed by
>>>>>>>>>>> Beam as I expected. Any pointers to what's missing here are
>>>>>>>>>>> appreciated. I suspect this could be a decoding issue on the Beam
>>>>>>>>>>> pipeline side, but I could be wrong.
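>>>>>>>>>>>
>>>>>>>>>>> If it does turn out to be a decoding issue, a rough sketch of what I
>>>>>>>>>>> would try (assuming the key/value come back as raw bytes, since I
>>>>>>>>>>> believe the default deserializers are byte-array based) is to decode
>>>>>>>>>>> explicitly before printing:
>>>>>>>>>>>
>>>>>>>>>>>         # Each element is a (key, value) tuple of bytes; decode both.
>>>>>>>>>>>         | 'Decode' >> beam.Map(
>>>>>>>>>>>             lambda kv: (kv[0].decode('utf-8') if kv[0] else None,
>>>>>>>>>>>                         kv[1].decode('utf-8')))
>>>>>>>>>>>         | 'Print' >> beam.Map(print)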
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Sumeet
>>>>>>>>>>>
>>>>>>>>>>
