Hi Sumeet,

After double-checking the current support status, the root cause is that
when you use cross-language pipelines, your pipeline actually runs in the
portable way [1]. Currently we don't support processing unbounded sources on
Flink over portable execution well. I have filed
https://issues.apache.org/jira/browse/BEAM-11998 to track the progress.

[1] https://s.apache.org/beam-fn-api
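
In the meantime, until BEAM-11998 is resolved, a bounded read (similar to what
you verified below with max_num_records) can serve as a workaround for testing.
A minimal sketch, with placeholder broker and topic names:

~~~~~~
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Setting max_num_records makes the x-lang Kafka read a bounded source,
# which avoids the unbounded-source limitation on portable Flink.
options = PipelineOptions(['--runner=FlinkRunner'])
with beam.Pipeline(options=options) as pipeline:
  _ = (
      pipeline
      | 'Read from kafka' >> ReadFromKafka(
          consumer_config={
              'bootstrap.servers': 'localhost:29092',  # placeholder broker
              'auto.offset.reset': 'earliest'},
          topics=['in_topic'],       # placeholder topic
          max_num_records=3)         # bound the read for testing
      | 'Write to kafka' >> WriteToKafka(
          producer_config={'bootstrap.servers': 'localhost:29092'},
          topic='out_topic'))
~~~~~~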


On Tue, Mar 16, 2021 at 10:13 AM Boyuan Zhang <[email protected]> wrote:

> And one more question: did you launch your pipeline with the streaming=True
> pipeline option? I think you need to use --streaming=True for the unbounded
> source to work properly.
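>
> For reference, a minimal sketch of passing the option in code (the runner
> and other option values here are just placeholders):
>
> ~~~~~~
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # streaming=True here is equivalent to passing --streaming on the command line.
> pipeline_options = PipelineOptions(
>     ['--runner=FlinkRunner'],
>     streaming=True,
>     save_main_session=True)
> ~~~~~~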
>
> On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Sumeet,
>>
>> Which Beam version are you using for your pipeline?
>>
>> On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> I don't believe Fn API DirectRunner supports streaming yet (I might be
>>> wrong). I can confirm that this works for Dataflow.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <
>>> [email protected]> wrote:
>>>
>>>> Thanks Cham! But I don't think this is Flink specific. I have observed
>>>> similar behaviour with DirectRunner as well BTW.
>>>>
>>>> ..Sumeet
>>>>
>>>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>>
>>>>> I'm not too familiar with Flink but it seems like, for streaming
>>>>> pipelines, messages from Kafka/SDF read do not get pushed to subsequent
>>>>> steps for some reason.
>>>>> * X-lang Bounded read with Flink seems to be fine.
>>>>> * X-lang Kafka sink with Flink seems to be fine.
>>>>>
>>>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Cham,
>>>>>>
>>>>>> Do you have pointers on what might be going on? Or something else I
>>>>>> can try? I posted the same question on StackOverflow [1]; it seems I'm
>>>>>> not the only one seeing this issue at the moment.
>>>>>>
>>>>>> Thanks,
>>>>>> Sumeet
>>>>>>
>>>>>> [1]
>>>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Took me some time to set up the Java test (using Java after more than
>>>>>>> a decade!), but yes, a similar pipeline with KafkaIO and Flink seems to
>>>>>>> work fine.
>>>>>>>
>>>>>>> Here's the relevant Java code. The only difference from the Python
>>>>>>> version is that I had to extract the KV from the KafkaRecord object and
>>>>>>> construct a PCollection<KV> explicitly before writing to the output 
>>>>>>> topic.
>>>>>>>
>>>>>>> ~~~~~~~~
>>>>>>> package org.apache.beam.kafka.test;
>>>>>>>
>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>>>>> import org.apache.beam.sdk.options.Default;
>>>>>>> import org.apache.beam.sdk.options.Description;
>>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>> import org.apache.beam.sdk.transforms.*;
>>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>>
>>>>>>> public class KafkaTest {
>>>>>>>
>>>>>>>   // Default bootstrap Kafka servers
>>>>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092";
>>>>>>>   // Default input Kafka topic name
>>>>>>>   static final String INPUT_TOPIC = "in_topic";
>>>>>>>   // Default output Kafka topic name
>>>>>>>   static final String OUTPUT_TOPIC = "out_topic";
>>>>>>>
>>>>>>>   /** Specific pipeline options. */
>>>>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>>>>     @Description("Kafka bootstrap servers")
>>>>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>>>>     String getBootstrap();
>>>>>>>
>>>>>>>     void setBootstrap(String value);
>>>>>>>
>>>>>>>     @Description("Kafka input topic name")
>>>>>>>     @Default.String(INPUT_TOPIC)
>>>>>>>     String getInputTopic();
>>>>>>>
>>>>>>>     void setInputTopic(String value);
>>>>>>>
>>>>>>>     @Description("Kafka output topic name")
>>>>>>>     @Default.String(OUTPUT_TOPIC)
>>>>>>>     String getOutputTopic();
>>>>>>>
>>>>>>>     void setOutputTopic(String value);
>>>>>>>   }
>>>>>>>
>>>>>>>   public static final void main(String[] args) throws Exception {
>>>>>>>     final KafkaTestOptions options =
>>>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>>>>
>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>     pipeline
>>>>>>>         .apply(
>>>>>>>             "ReadFromKafka",
>>>>>>>             KafkaIO.<String, String>read()
>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>                 .withTopic(options.getInputTopic())
>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>         .apply(
>>>>>>>             "PrepareForWriting",
>>>>>>>             ParDo.of(
>>>>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>>>>                   @ProcessElement
>>>>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>>>>                     c.output(KV.of(c.element().getKV().getKey(),
>>>>>>>                         c.element().getKV().getValue()));
>>>>>>>                   }
>>>>>>>                 }))
>>>>>>>         .apply(
>>>>>>>             "WriteToKafka",
>>>>>>>             KafkaIO.<String, String>write()
>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>                 .withTopic(options.getOutputTopic())
>>>>>>>                 .withKeySerializer(
>>>>>>>                     org.apache.kafka.common.serialization.StringSerializer.class)
>>>>>>>                 .withValueSerializer(
>>>>>>>                     org.apache.kafka.common.serialization.StringSerializer.class));
>>>>>>>
>>>>>>>     pipeline.run();
>>>>>>>   }
>>>>>>> }
>>>>>>> ~~~~~~~~
>>>>>>>
>>>>>>> I'm firing the Java version as follows:
>>>>>>>
>>>>>>> $ mvn exec:java
>>>>>>> -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest
>>>>>>> -Pflink-runner
>>>>>>> -Dexec.args="--runner=FlinkRunner"
>>>>>>>
>>>>>>> And I can see in real time that, as I publish records to the
>>>>>>> in_topic, the out_topic receives them on a continuous basis.
>>>>>>>
>>>>>>> I hope this helps narrow down the issue.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sumeet
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Are you able to run a similar Java streaming pipeline using KafkaIO
>>>>>>>> and Flink ? (without x-lang)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Cham!
>>>>>>>>>
>>>>>>>>> So finally I was able to get partial success. Since I had
>>>>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set
>>>>>>>>> max_num_records=3 to see if it could read all the existing records, as
>>>>>>>>> follows:
>>>>>>>>>
>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>     _ = (
>>>>>>>>>         pipeline
>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>             consumer_config={
>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>                 'auto.offset.reset': 'earliest'},
>>>>>>>>>             topics=[in_topic],
>>>>>>>>>             max_num_records=3)
>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>             producer_config={
>>>>>>>>>                 'bootstrap.servers': bootstrap_servers},
>>>>>>>>>             topic=out_topic))
>>>>>>>>>
>>>>>>>>> I was able to see all 3 records being read, and written
>>>>>>>>> successfully to the out_topic as well. So, it appears that there 
>>>>>>>>> might be
>>>>>>>>> some issue with reading unbounded Kafka streams here? Or is there any
>>>>>>>>> setting that I might be missing?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Sumeet
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Cham!
>>>>>>>>>>
>>>>>>>>>> Appreciate the response. I tried out your suggestions (details
>>>>>>>>>> below), but I still don't see any data being consumed or written 
>>>>>>>>>> back to
>>>>>>>>>> Kafka (as per your suggestion). I'm also providing additional
>>>>>>>>>> details/context that might help narrow down the issue. Apologies for 
>>>>>>>>>> being
>>>>>>>>>> a bit verbose from hereon!
>>>>>>>>>>
>>>>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>>>>
>>>>>>>>>> ~~~~~~
>>>>>>>>>> import logging
>>>>>>>>>>
>>>>>>>>>> import apache_beam as beam
>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>
>>>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>>>>   pipeline_options = PipelineOptions(pipeline_args,
>>>>>>>>>> save_main_session=True, streaming=True)
>>>>>>>>>>
>>>>>>>>>>   logging.info('Starting data pipeline. bootstrap_servers=%s
>>>>>>>>>> in_topic=%s out_topic=%s',
>>>>>>>>>>       str(bootstrap_servers), in_topic, out_topic)
>>>>>>>>>>
>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>     _ = (
>>>>>>>>>>         pipeline
>>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>             consumer_config={
>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>>>>             },
>>>>>>>>>>             topics=[in_topic])
>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>             producer_config={
>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>>>>             },
>>>>>>>>>>             topic=out_topic))
>>>>>>>>>>
>>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>>>>   import argparse
>>>>>>>>>>
>>>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>       '--bootstrap_servers',
>>>>>>>>>>       dest='bootstrap_servers',
>>>>>>>>>>       required=True,
>>>>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>       '--in_topic',
>>>>>>>>>>       dest='in_topic',
>>>>>>>>>>       required=True,
>>>>>>>>>>       help='Kafka topic to read data from')
>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>       '--out_topic',
>>>>>>>>>>       dest='out_topic',
>>>>>>>>>>       required=True,
>>>>>>>>>>       help='Kafka topic to write data to')
>>>>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>>>>
>>>>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>>>>>>> known_args.out_topic, pipeline_args)
>>>>>>>>>> ~~~~~
>>>>>>>>>>
>>>>>>>>>> I'm firing this pipeline as follows:
>>>>>>>>>>
>>>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092
>>>>>>>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>>>>
>>>>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>>>>
>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>>>>> v1
>>>>>>>>>> v2
>>>>>>>>>> v3
>>>>>>>>>>
>>>>>>>>>> Now, when I execute the pipeline, I see that it starts to read
>>>>>>>>>> records from offset 0, but then seeks to the latest offset 3 without
>>>>>>>>>> processing the records. I don't see any data written to out_topic. I
>>>>>>>>>> filtered out the logs a bit, and this is what I'm seeing:
>>>>>>>>>>
>>>>>>>>>> INFO:root:Starting data pipeline.
>>>>>>>>>> bootstrap_servers=localhost:29092 in_topic=in_topic 
>>>>>>>>>> out_topic=out_topic
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions
>>>>>>>>>> assigned to split 0 (total 1): in_topic-0'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): 
>>>>>>>>>> in_topic-0'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition
>>>>>>>>>> in_topic-0 to offset 0.'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0:
>>>>>>>>>> reading from in_topic-0 starting at offset 0'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to
>>>>>>>>>> partition(s): in_topic-0'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST 
>>>>>>>>>> offset
>>>>>>>>>> of partition in_topic-0'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset 
>>>>>>>>>> for
>>>>>>>>>> partition in_topic-0 to offset 3.'
>>>>>>>>>>
>>>>>>>>>> Additionally, the logs also emit complete consumer and producer
>>>>>>>>>> configs. I'm dumping them here, in case that helps:
>>>>>>>>>>
>>>>>>>>>> Consumer Config:
>>>>>>>>>>
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig
>>>>>>>>>> values:'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics
>>>>>>>>>> = true'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> tauto.commit.interval.ms = 5000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset =
>>>>>>>>>> earliest'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>>>>>>> [localhost:29092]'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>>>>>>> default'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> tdefault.api.timeout.ms = 60000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit =
>>>>>>>>>> false'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics
>>>>>>>>>> = true'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes =
>>>>>>>>>> 52428800'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms =
>>>>>>>>>> 500'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
>>>>>>>>>> Reader-0_offset_consumer_1947524890_none'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> theartbeat.interval.ms = 3000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes
>>>>>>>>>> = []'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
>>>>>>>>>> = true'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level =
>>>>>>>>>> read_uncommitted'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer =
>>>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes
>>>>>>>>>> = 1048576'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms
>>>>>>>>>> = 300000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records =
>>>>>>>>>> 500'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms
>>>>>>>>>> = 300000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters =
>>>>>>>>>> []'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples
>>>>>>>>>> = 2'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>>>> = INFO'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
>>>>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes
>>>>>>>>>> = 65536'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms
>>>>>>>>>> = 50'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms
>>>>>>>>>> = 30000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms =
>>>>>>>>>> 100'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>>>> = /usr/bin/kinit'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>>>> = 60000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>>>> = 0.05'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>>>> = 0.8'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>>>> = 300'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>>>> = 60'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>>>> = 0.8'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>>>> = 0.05'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>>>> GSSAPI'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>>>>>>> PLAINTEXT'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>>>>>>> 131072'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms
>>>>>>>>>> = 10000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>>>> = https'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>>>> = SunX509'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type =
>>>>>>>>>> JKS'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>>>> = PKIX'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type
>>>>>>>>>> = JKS'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer =
>>>>>>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>>
>>>>>>>>>> Producer Config:
>>>>>>>>>>
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig
>>>>>>>>>> values:'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>>>>>>> [localhost:29092]'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory =
>>>>>>>>>> 33554432'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>>>>>>> default'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type =
>>>>>>>>>> none'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms
>>>>>>>>>> = 120000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence =
>>>>>>>>>> false'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes
>>>>>>>>>> = []'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer =
>>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms =
>>>>>>>>>> 60000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
>>>>>>>>>> = 5'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size =
>>>>>>>>>> 1048576'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms
>>>>>>>>>> = 300000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters =
>>>>>>>>>> []'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples
>>>>>>>>>> = 2'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>>>> = INFO'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class =
>>>>>>>>>> class org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes
>>>>>>>>>> = 32768'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms
>>>>>>>>>> = 50'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms
>>>>>>>>>> = 30000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms =
>>>>>>>>>> 100'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>>>> = /usr/bin/kinit'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>>>> = 60000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>>>> = 0.05'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>>>> = 0.8'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>>>> = 300'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>>>> = 60'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>>>> = 0.8'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>>>> = 0.05'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>>>> GSSAPI'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>>>>>>> PLAINTEXT'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>>>>>>> 131072'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>>>> = https'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>>>> = SunX509'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type =
>>>>>>>>>> JKS'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>>>> = PKIX'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>>>> = null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type
>>>>>>>>>> = JKS'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>> ttransaction.timeout.ms = 60000'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id =
>>>>>>>>>> null'
>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer =
>>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Apologies again for dumping almost everything here :-) Any
>>>>>>>>>> pointers on what might be the issue are appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Sumeet
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Also, can you try sending messages back to Kafka (or another
>>>>>>>>>>> distributed system like GCS) instead of just printing them? (Given
>>>>>>>>>>> that multi-language pipelines run SDK containers in Docker, you might
>>>>>>>>>>> not see prints in the original console, I think.)
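>>>>>>>>>>>
>>>>>>>>>>> For example, replacing the Print step with a Kafka write, roughly like
>>>>>>>>>>> this (the broker address and out_topic name are just placeholders):
>>>>>>>>>>>
>>>>>>>>>>> ~~~~~~
>>>>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>>>>>
>>>>>>>>>>>         ...
>>>>>>>>>>>         # instead of | 'Print' >> beam.Map(print)
>>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>>             producer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>>             topic='out_topic'))
>>>>>>>>>>> ~~~~~~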
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Cham
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Sumeet,
>>>>>>>>>>>>
>>>>>>>>>>>> It seems like your Kafka consumer uses the LATEST offset (which is
>>>>>>>>>>>> the default setting) as the start offset to read, which is 29. Do
>>>>>>>>>>>> you have more than 29 records to read at that point? If the pipeline
>>>>>>>>>>>> is only for testing purposes, I would recommend reading from the
>>>>>>>>>>>> earliest offset to see whether you get records. You can do so by
>>>>>>>>>>>> constructing your ReadFromKafka like:
>>>>>>>>>>>>
>>>>>>>>>>>> ReadFromKafka(
>>>>>>>>>>>>     consumer_config={
>>>>>>>>>>>>         'bootstrap.servers': 'localhost:29092',
>>>>>>>>>>>>         'auto.offset.reset': 'earliest'},
>>>>>>>>>>>>     topics=['test'])
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka
>>>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>>>             consumer_config={'bootstrap.servers':
>>>>>>>>>>>>> 'localhost:29092'},
>>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>>>>>>
>>>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any
>>>>>>>>>>>>> messages coming in. Kafka is running locally in a docker 
>>>>>>>>>>>>> container, and I'm
>>>>>>>>>>>>> able to use `kafkacat` from the host (outside the container) to 
>>>>>>>>>>>>> publish and
>>>>>>>>>>>>> subscribe to messages. So, I guess there are no issues on that 
>>>>>>>>>>>>> front.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get
>>>>>>>>>>>>> notified of new messages, as I see the offset changes in the Beam 
>>>>>>>>>>>>> logs as I
>>>>>>>>>>>>> publish data from `kafkacat`:
>>>>>>>>>>>>>
>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>   nanos: 534000000
>>>>>>>>>>>>> }
>>>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to 
>>>>>>>>>>>>> LATEST offset
>>>>>>>>>>>>> of partition test-0"
>>>>>>>>>>>>> log_location:
>>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>
>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>   nanos: 537000000
>>>>>>>>>>>>> }
>>>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting 
>>>>>>>>>>>>> offset for
>>>>>>>>>>>>> partition test-0 to offset 29."
>>>>>>>>>>>>> log_location:
>>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>>>>
>>>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>>>>> 1:foo
>>>>>>>>>>>>> 1:bar
>>>>>>>>>>>>>
>>>>>>>>>>>>> and I can confirm that its being received, again using
>>>>>>>>>>>>> `kafkacat`:
>>>>>>>>>>>>>
>>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value:
>>>>>>>>>>>>> %s\n'
>>>>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>>>>
>>>>>>>>>>>>> But despite this, I don't see the actual messages being printed
>>>>>>>>>>>>> by Beam as I expected. Any pointers to what's missing here are
>>>>>>>>>>>>> appreciated. I suspect this could be a decoding issue on the Beam
>>>>>>>>>>>>> pipeline side, but I could be wrong.
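>>>>>>>>>>>>>
>>>>>>>>>>>>> (For reference, by a decoding issue I mean that ReadFromKafka with
>>>>>>>>>>>>> the default deserializers emits key/value pairs as raw bytes, so an
>>>>>>>>>>>>> explicit decode step before the Print is what I had in mind. An
>>>>>>>>>>>>> untested sketch:)
>>>>>>>>>>>>>
>>>>>>>>>>>>> ~~~~~~
>>>>>>>>>>>>>         # decode the (key, value) byte tuples before printing
>>>>>>>>>>>>>         | 'Decode' >> beam.Map(
>>>>>>>>>>>>>             lambda kv: (kv[0].decode('utf-8') if kv[0] else None,
>>>>>>>>>>>>>                         kv[1].decode('utf-8')))
>>>>>>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>>>>>> ~~~~~~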
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>>
>>>>>>>>>>>>
