And one more question: did you launch your pipeline with the streaming=True
pipeline option? I think you need --streaming=True for the unbounded source to
work properly.
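
For example, the flag can be passed on the command line or set directly on the
options object; a minimal sketch (adapt to however you construct your
PipelineOptions):

~~~~~~
from apache_beam.options.pipeline_options import PipelineOptions

# Either pass --streaming as a pipeline argument...
pipeline_options = PipelineOptions(['--streaming'])

# ...or set it directly as a keyword option.
pipeline_options = PipelineOptions(streaming=True)
~~~~~~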

On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]> wrote:

> Hi Sumeet,
>
> Which Beam version are you using for your pipeline?
>
> On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> I don't believe Fn API DirectRunner supports streaming yet (I might be
>> wrong). I can confirm that this works for Dataflow.
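>>
>> For reference, launching the same Python pipeline on Dataflow would look
>> roughly like this (the broker address, project, region, and bucket below are
>> placeholders you'd need to fill in):
>>
>> python ./pipeline.py --bootstrap_servers=<broker>:9092 --in_topic=in_topic \
>>     --out_topic=out_topic --runner=DataflowRunner --project=<your-project> \
>>     --region=us-central1 --temp_location=gs://<your-bucket>/tmp --streaming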
>>
>> Thanks,
>> Cham
>>
>> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <
>> [email protected]> wrote:
>>
>>> Thanks Cham! But I don't think this is Flink specific. I have observed
>>> similar behaviour with DirectRunner as well BTW.
>>>
>>> ..Sumeet
>>>
>>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <
>>> [email protected]> wrote:
>>>
>>>> I'm not too familiar with Flink but it seems like, for streaming
>>>> pipelines, messages from Kafka/SDF read do not get pushed to subsequent
>>>> steps for some reason.
>>>> * X-lang Bounded read with Flink seems to be fine.
>>>> * X-lang Kafka sink with Flink seems to be fine.
>>>>
>>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>
>>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Cham,
>>>>>
>>>>> Do you have pointers on what might be going on? Or something else I
>>>>> can try? I had posted the same on StackOverflow [1], it seems that I'm not
>>>>> the only one seeing this issue at the moment.
>>>>>
>>>>> Thanks,
>>>>> Sumeet
>>>>>
>>>>> [1]
>>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>>>
>>>>>
>>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Took me some time to set up the Java test (using Java after more than
>>>>>> a decade!), but yes, a similar pipeline with KafkaIO and Flink seems to
>>>>>> work fine.
>>>>>>
>>>>>> Here's the relevant Java code. The only difference from the Python
>>>>>> version is that I had to extract the KV from the KafkaRecord object and
>>>>>> construct a PCollection<KV> explicitly before writing to the output 
>>>>>> topic.
>>>>>>
>>>>>> ~~~~~~~~
>>>>>> package org.apache.beam.kafka.test;
>>>>>>
>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>>>> import org.apache.beam.sdk.options.Default;
>>>>>> import org.apache.beam.sdk.options.Description;
>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>> import org.apache.beam.sdk.transforms.*;
>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>
>>>>>> public class KafkaTest {
>>>>>>
>>>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
>>>>>>   static final String INPUT_TOPIC = "in_topic";   // Default input kafka topic name
>>>>>>   static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka topic name
>>>>>>
>>>>>>   /** Specific pipeline options. */
>>>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>>>     @Description("Kafka bootstrap servers")
>>>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>>>     String getBootstrap();
>>>>>>
>>>>>>     void setBootstrap(String value);
>>>>>>
>>>>>>     @Description("Kafka input topic name")
>>>>>>     @Default.String(INPUT_TOPIC)
>>>>>>     String getInputTopic();
>>>>>>
>>>>>>     void setInputTopic(String value);
>>>>>>
>>>>>>     @Description("Kafka output topic name")
>>>>>>     @Default.String(OUTPUT_TOPIC)
>>>>>>     String getOutputTopic();
>>>>>>
>>>>>>     void setOutputTopic(String value);
>>>>>>   }
>>>>>>
>>>>>>   public static final void main(String[] args) throws Exception {
>>>>>>     final KafkaTestOptions options =
>>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>>>
>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>     pipeline
>>>>>>         .apply(
>>>>>>             "ReadFromKafka",
>>>>>>             KafkaIO.<String, String>read()
>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>                 .withTopic(options.getInputTopic())
>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>         .apply(
>>>>>>             "PrepareForWriting",
>>>>>>             ParDo.of(
>>>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>>>                   @ProcessElement
>>>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>>>                     c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
>>>>>>                   }
>>>>>>                 }))
>>>>>>         .apply(
>>>>>>             "WriteToKafka",
>>>>>>             KafkaIO.<String, String>write()
>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>                 .withTopic(options.getOutputTopic())
>>>>>>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>>>>>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>>>>>
>>>>>>     pipeline.run();
>>>>>>   }
>>>>>> }
>>>>>> ~~~~~~~~
>>>>>>
>>>>>> I'm firing the Java version as follows:
>>>>>>
>>>>>> $ mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.KafkaTest \
>>>>>>     -Pflink-runner -Dexec.args="--runner=FlinkRunner"
>>>>>>
>>>>>> And I can see in real time that as I publish records to the in_topic,
>>>>>> the out_topic receives them on a continuous basis.
>>>>>>
>>>>>> I hope this helps narrow down the issue.
>>>>>>
>>>>>> Thanks,
>>>>>> Sumeet
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Are you able to run a similar Java streaming pipeline using KafkaIO
>>>>>>> and Flink (without x-lang)?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Cham!
>>>>>>>>
>>>>>>>> So finally I was able to get partial success. Since I had
>>>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set
>>>>>>>> max_num_records=3 to see if it could read all existing records, as follows:
>>>>>>>>
>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>     _ = (
>>>>>>>>         pipeline
>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>             consumer_config={
>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>                 'auto.offset.reset': 'earliest'},
>>>>>>>>             topics=[in_topic],
>>>>>>>>             max_num_records=3)
>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>             producer_config={
>>>>>>>>                 'bootstrap.servers': bootstrap_servers},
>>>>>>>>             topic=out_topic))
>>>>>>>>
>>>>>>>> I was able to see all 3 records being read and written successfully to the
>>>>>>>> out_topic as well. So it appears that there might be some issue with reading
>>>>>>>> unbounded Kafka streams here? Or is there any setting that I might be missing?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sumeet
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hey Cham!
>>>>>>>>>
>>>>>>>>> Appreciate the response. I tried out your suggestions (details below), but I
>>>>>>>>> still don't see any data being consumed or written back to Kafka (as per your
>>>>>>>>> suggestion). I'm also providing additional details/context that might help
>>>>>>>>> narrow down the issue. Apologies for being a bit verbose from here on!
>>>>>>>>>
>>>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>>>
>>>>>>>>> ~~~~~~
>>>>>>>>> import logging
>>>>>>>>>
>>>>>>>>> import apache_beam as beam
>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>
>>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>>>   pipeline_options = PipelineOptions(pipeline_args,
>>>>>>>>> save_main_session=True, streaming=True)
>>>>>>>>>
>>>>>>>>>   logging.info(
>>>>>>>>>       'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>>>>>>       str(bootstrap_servers), in_topic, out_topic)
>>>>>>>>>
>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>     _ = (
>>>>>>>>>         pipeline
>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>             consumer_config={
>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>>>             },
>>>>>>>>>             topics=[in_topic])
>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>             producer_config={
>>>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>>>             },
>>>>>>>>>             topic=out_topic))
>>>>>>>>>
>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>>>   import argparse
>>>>>>>>>
>>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>>   parser.add_argument(
>>>>>>>>>       '--bootstrap_servers',
>>>>>>>>>       dest='bootstrap_servers',
>>>>>>>>>       required=True,
>>>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>>>   parser.add_argument(
>>>>>>>>>       '--in_topic',
>>>>>>>>>       dest='in_topic',
>>>>>>>>>       required=True,
>>>>>>>>>       help='Kafka topic to read data from')
>>>>>>>>>   parser.add_argument(
>>>>>>>>>       '--out_topic',
>>>>>>>>>       dest='out_topic',
>>>>>>>>>       required=True,
>>>>>>>>>       help='Kafka topic to write data to')
>>>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>>>
>>>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>>>>>> known_args.out_topic, pipeline_args)
>>>>>>>>> ~~~~~
>>>>>>>>>
>>>>>>>>> I'm firing this pipeline as follows:
>>>>>>>>>
>>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092
>>>>>>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>>>
>>>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>>>
>>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>>>> v1
>>>>>>>>> v2
>>>>>>>>> v3
>>>>>>>>>
>>>>>>>>> Now, when I execute the pipeline, I see that it starts to read
>>>>>>>>> records from offset 0, but then seeks to the latest offset 3 without
>>>>>>>>> processing the records. I don't see any data written to out_topic. I
>>>>>>>>> filtered the logs down a bit, and this is what I'm seeing:
>>>>>>>>>
>>>>>>>>> INFO:root:Starting data pipeline.
>>>>>>>>> bootstrap_servers=localhost:29092 in_topic=in_topic 
>>>>>>>>> out_topic=out_topic
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions
>>>>>>>>> assigned to split 0 (total 1): in_topic-0'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): 
>>>>>>>>> in_topic-0'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition
>>>>>>>>> in_topic-0 to offset 0.'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading
>>>>>>>>> from in_topic-0 starting at offset 0'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to
>>>>>>>>> partition(s): in_topic-0'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST 
>>>>>>>>> offset
>>>>>>>>> of partition in_topic-0'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for
>>>>>>>>> partition in_topic-0 to offset 3.'
>>>>>>>>>
>>>>>>>>> Additionally, the logs emit the complete consumer and producer
>>>>>>>>> configs. I'm dumping them here in case that helps:
>>>>>>>>>
>>>>>>>>> Consumer Config:
>>>>>>>>>
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig
>>>>>>>>> values:'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics
>>>>>>>>> = true'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> tauto.commit.interval.ms = 5000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset =
>>>>>>>>> earliest'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>>>>>> [localhost:29092]'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>>>>>> default'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> tdefault.api.timeout.ms = 60000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit =
>>>>>>>>> false'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics
>>>>>>>>> = true'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes =
>>>>>>>>> 52428800'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms =
>>>>>>>>> 500'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
>>>>>>>>> Reader-0_offset_consumer_1947524890_none'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms
>>>>>>>>> = 3000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes =
>>>>>>>>> []'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
>>>>>>>>> = true'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level =
>>>>>>>>> read_uncommitted'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer =
>>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes
>>>>>>>>> = 1048576'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms
>>>>>>>>> = 300000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records =
>>>>>>>>> 500'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms
>>>>>>>>> = 300000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples =
>>>>>>>>> 2'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>>> = INFO'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
>>>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes
>>>>>>>>> = 65536'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms
>>>>>>>>> = 50'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms =
>>>>>>>>> 30000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms =
>>>>>>>>> 100'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>>> = /usr/bin/kinit'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>>> = 60000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>>> = 0.05'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>>> = 0.8'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>>> = 300'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>>> = 60'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>>> = 0.8'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>>> = 0.05'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>>> GSSAPI'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>>>>>> PLAINTEXT'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>>>>>> 131072'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms =
>>>>>>>>> 10000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>>> = https'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>>> = SunX509'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type =
>>>>>>>>> JKS'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>>> = PKIX'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type =
>>>>>>>>> JKS'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer =
>>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>
>>>>>>>>> Producer Config:
>>>>>>>>>
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig
>>>>>>>>> values:'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>>>>>> [localhost:29092]'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory =
>>>>>>>>> 33554432'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>>>>>> default'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type =
>>>>>>>>> none'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms
>>>>>>>>> = 120000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence =
>>>>>>>>> false'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes =
>>>>>>>>> []'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer =
>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
>>>>>>>>> = 5'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size =
>>>>>>>>> 1048576'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms
>>>>>>>>> = 300000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples =
>>>>>>>>> 2'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>>> = INFO'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class =
>>>>>>>>> class org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes
>>>>>>>>> = 32768'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms
>>>>>>>>> = 50'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms =
>>>>>>>>> 30000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms =
>>>>>>>>> 100'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>>> = /usr/bin/kinit'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>>> = 60000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>>> = 0.05'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>>> = 0.8'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>>> = 300'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>>> = 60'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>>> = 0.8'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>>> = 0.05'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>>> GSSAPI'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>>>>>> PLAINTEXT'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>>>>>> 131072'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>>> = https'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>>> = SunX509'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type =
>>>>>>>>> JKS'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>>> = PKIX'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>>> = null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type =
>>>>>>>>> JKS'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>> ttransaction.timeout.ms = 60000'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id =
>>>>>>>>> null'
>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer =
>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Apologies again for dumping almost everything here :-) Any
>>>>>>>>> pointers on what might be the issue are appreciated.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Sumeet
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Also, can you try sending messages back to Kafka (or another distributed
>>>>>>>>>> system like GCS) instead of just printing them? (Given that multi-language
>>>>>>>>>> pipelines run SDK containers in Docker, I think you might not see prints in
>>>>>>>>>> the original console.)
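>>>>>>>>>>
>>>>>>>>>> For example, something along these lines (a rough sketch reusing the
>>>>>>>>>> ReadFromKafka/WriteToKafka transforms from apache_beam.io.kafka; the topic
>>>>>>>>>> and server names are placeholders):
>>>>>>>>>>
>>>>>>>>>> ~~~~~~
>>>>>>>>>> import apache_beam as beam
>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>
>>>>>>>>>> # Placeholder options; adjust the runner and servers for your setup.
>>>>>>>>>> pipeline_options = PipelineOptions(streaming=True)
>>>>>>>>>>
>>>>>>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>   _ = (
>>>>>>>>>>       pipeline
>>>>>>>>>>       | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>           consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>           topics=['test'])
>>>>>>>>>>       # Write the raw (key, value) records back out instead of printing.
>>>>>>>>>>       | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>           producer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>           topic='test_out'))
>>>>>>>>>> ~~~~~~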
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Cham
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Sumeet,
>>>>>>>>>>>
>>>>>>>>>>> It seems like your Kafka consumer uses the LATEST offset (which is the
>>>>>>>>>>> default setting) as the start offset to read, which is 29. Do you have more
>>>>>>>>>>> than 29 records to read at that point? If the pipeline is only for testing
>>>>>>>>>>> purposes, I would recommend reading from the earliest offset to see whether
>>>>>>>>>>> you get records. You can do so by constructing your ReadFromKafka like:
>>>>>>>>>>> ReadFromKafka(
>>>>>>>>>>>     consumer_config={'bootstrap.servers': 'localhost:29092',
>>>>>>>>>>>                      'auto.offset.reset': 'earliest'},
>>>>>>>>>>>     topics=['test'])
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka
>>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>>>
>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>>             consumer_config={'bootstrap.servers':
>>>>>>>>>>>> 'localhost:29092'},
>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>>>>>
>>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any messages
>>>>>>>>>>>> coming in. Kafka is running locally in a docker container, and I'm 
>>>>>>>>>>>> able to
>>>>>>>>>>>> use `kafkacat` from the host (outside the container) to publish and
>>>>>>>>>>>> subscribe to messages. So, I guess there are no issues on that 
>>>>>>>>>>>> front.
>>>>>>>>>>>>
>>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get notified of new
>>>>>>>>>>>> messages, as I see the offsets change in the Beam logs when I publish data
>>>>>>>>>>>> from `kafkacat`:
>>>>>>>>>>>>
>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>   nanos: 534000000
>>>>>>>>>>>> }
>>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to 
>>>>>>>>>>>> LATEST offset
>>>>>>>>>>>> of partition test-0"
>>>>>>>>>>>> log_location:
>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>
>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>   nanos: 537000000
>>>>>>>>>>>> }
>>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset 
>>>>>>>>>>>> for
>>>>>>>>>>>> partition test-0 to offset 29."
>>>>>>>>>>>> log_location:
>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>
>>>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>>>
>>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>>>> 1:foo
>>>>>>>>>>>> 1:bar
>>>>>>>>>>>>
>>>>>>>>>>>> and I can confirm that it's being received, again using
>>>>>>>>>>>> `kafkacat`:
>>>>>>>>>>>>
>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value:
>>>>>>>>>>>> %s\n'
>>>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>>>
>>>>>>>>>>>> But despite this, I don't see the actual message being printed by Beam as
>>>>>>>>>>>> I expected. Any pointers to what's missing here are appreciated. I suspect
>>>>>>>>>>>> this could be a decoding issue on the Beam pipeline side, but I could be
>>>>>>>>>>>> wrong.
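>>>>>>>>>>>>
>>>>>>>>>>>> If it is a decoding issue, I'd expect to need an explicit decode step before
>>>>>>>>>>>> the print, since the default deserializers hand the keys/values back as raw
>>>>>>>>>>>> bytes. Something like this, though that's just a guess on my part:
>>>>>>>>>>>>
>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>>             consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>         # Decode the raw bytes before printing.
>>>>>>>>>>>>         | 'Decode' >> beam.Map(lambda kv: (kv[0], kv[1].decode('utf-8')))
>>>>>>>>>>>>         | 'Print' >> beam.Map(print))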
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>
>>>>>>>>>>>
