I'm not too familiar with Flink, but it seems that for streaming pipelines,
messages from the Kafka/SDF read do not get pushed to subsequent steps for
some reason.
* X-lang bounded read with Flink seems to be fine (see the sketch below).
* X-lang Kafka sink with Flink seems to be fine.
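For anyone skimming this thread, here's a minimal, untested sketch of that
difference, pieced together from Sumeet's pipelines quoted below (the topic
names, bootstrap server, and max_num_records value are the ones used later in
this thread):

~~~~~~~~
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--runner=FlinkRunner'], streaming=True)

with beam.Pipeline(options=options) as pipeline:
  _ = (
      pipeline
      # With max_num_records set, the x-lang read is bounded and records do
      # reach the downstream step. Dropping max_num_records makes the read
      # unbounded (streaming), and downstream steps receive nothing.
      | 'Read from kafka' >> ReadFromKafka(
          consumer_config={
              'bootstrap.servers': 'localhost:29092',
              'auto.offset.reset': 'earliest'},
          topics=['in_topic'],
          max_num_records=3)
      | 'Write to kafka' >> WriteToKafka(
          producer_config={'bootstrap.servers': 'localhost:29092'},
          topic='out_topic'))
~~~~~~~~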

Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.

Thanks,
Cham



On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <[email protected]>
wrote:

> Hi Cham,
>
> Do you have pointers on what might be going on? Or something else I can
> try? I posted the same question on StackOverflow [1]; it seems that I'm not
> the only one seeing this issue at the moment.
>
> Thanks,
> Sumeet
>
> [1]
> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>
>
> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <
> [email protected]> wrote:
>
>> It took me some time to set up the Java test (using Java after more than a
>> decade!), but yes, a similar pipeline with KafkaIO and Flink seems to work
>> fine.
>>
>> Here's the relevant Java code. The only difference from the Python
>> version is that I had to extract the KV from the KafkaRecord object and
>> construct a PCollection<KV> explicitly before writing to the output topic.
>>
>> ~~~~~~~~
>> package org.apache.beam.kafka.test;
>>
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>> import org.apache.beam.sdk.options.Default;
>> import org.apache.beam.sdk.options.Description;
>> import org.apache.beam.sdk.options.PipelineOptions;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.transforms.*;
>> import org.apache.beam.sdk.values.KV;
>> import org.apache.beam.sdk.values.PCollection;
>> import org.apache.kafka.common.serialization.StringDeserializer;
>>
>> public class KafkaTest {
>>
>>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default
>> bootstrap kafka servers
>>   static final String INPUT_TOPIC = "in_topic"; // Default input kafka
>> topic name
>>   static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka
>> topic name
>>
>>   /** Specific pipeline options. */
>>   public interface KafkaTestOptions extends PipelineOptions {
>>     @Description("Kafka bootstrap servers")
>>     @Default.String(BOOTSTRAP_SERVERS)
>>     String getBootstrap();
>>
>>     void setBootstrap(String value);
>>
>>     @Description("Kafka input topic name")
>>     @Default.String(INPUT_TOPIC)
>>     String getInputTopic();
>>
>>     void setInputTopic(String value);
>>
>>     @Description("Kafka output topic name")
>>     @Default.String(OUTPUT_TOPIC)
>>     String getOutputTopic();
>>
>>     void setOutputTopic(String value);
>>   }
>>
>>   public static final void main(String[] args) throws Exception {
>>     final KafkaTestOptions options =
>>
>> PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>
>>     Pipeline pipeline = Pipeline.create(options);
>>     pipeline
>>         .apply(
>>             "ReadFromKafka",
>>             KafkaIO.<String, String>read()
>>                 .withBootstrapServers(options.getBootstrap())
>>                 .withTopic(options.getInputTopic())
>>                 .withKeyDeserializer(StringDeserializer.class)
>>                 .withValueDeserializer(StringDeserializer.class))
>>         .apply(
>>             "PrepareForWriting",
>>             ParDo.of(
>>                 new DoFn<KafkaRecord<String, String>, KV<String,
>> String>>() {
>>                   @ProcessElement
>>                   public void processElement(ProcessContext c) throws
>> Exception {
>>                     c.output(KV.of(c.element().getKV().getKey(),
>> c.element().getKV().getValue()));
>>                   }
>>                 }))
>>         .apply(
>>             "WriteToKafka",
>>             KafkaIO.<String, String>write()
>>                 .withBootstrapServers(options.getBootstrap())
>>                 .withTopic(options.getOutputTopic())
>>
>> .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>
>> .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>
>>     pipeline.run();
>>   }
>> }
>> ~~~~~~~~~
>>
>> I'm firing the Java version as follows:
>>
>> $ mvn exec:java
>> -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest -Pflink-runner
>> -Dexec.args="--runner=FlinkRunner"
>>
>> And I can see, in real time, that as I publish records to the in_topic,
>> the out_topic receives them on a continuous basis.
>>
>> I hope this helps narrow down the issue.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> Are you able to run a similar Java streaming pipeline using KafkaIO and
>>> Flink? (without x-lang)
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>>> [email protected]> wrote:
>>>
>>>> Hi Cham!
>>>>
>>>> So finally I was able to get partial success. Since I had pre-populated
>>>> the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see
>>>> if it can read all existing records, as follows:
>>>>
>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>     _ = (
>>>>         pipeline
>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>             consumer_config={
>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>                 'auto.offset.reset': 'earliest'},
>>>>             topics=[in_topic],
>>>>             max_num_records=3)
>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>             producer_config={
>>>>                 'bootstrap.servers': bootstrap_servers},
>>>>             topic=out_topic))
>>>>
>>>> I was able to see all 3 records being read, and written successfully to
>>>> the out_topic as well. So, it appears that there might be some issue with
>>>> reading unbounded Kafka streams here? Or is there any setting that I might
>>>> be missing?
>>>>
>>>> Thanks,
>>>> Sumeet
>>>>
>>>>
>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>>> [email protected]> wrote:
>>>>
>>>>> Hey Cham!
>>>>>
>>>>> Appreciate the response. I tried out your suggestions (details below),
>>>>> but I still don't see any data being consumed or written back to Kafka (as
>>>>> per your suggestion). I'm also providing additional details/context that
>>>>> might help narrow down the issue. Apologies for being a bit verbose from
>>>>> here on!
>>>>>
>>>>> First, here's what my pipeline code looks like now:
>>>>>
>>>>> ~~~~~~
>>>>> import logging
>>>>> import apache_beam as beam
>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>
>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>   pipeline_options = PipelineOptions(pipeline_args,
>>>>> save_main_session=True, streaming=True)
>>>>>
>>>>>   logging.info('Starting data pipeline. bootstrap_servers=%s
>>>>> in_topic=%s out_topic=%s',
>>>>>       str(bootstrap_servers), in_topic, out_topic)
>>>>>
>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>     _ = (
>>>>>         pipeline
>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>             consumer_config={
>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>                 'auto.offset.reset': 'earliest'
>>>>>             },
>>>>>             topics=[in_topic])
>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>             producer_config={
>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>             },
>>>>>             topic=out_topic))
>>>>>
>>>>> if __name__ == '__main__':
>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>   import argparse
>>>>>
>>>>>   parser = argparse.ArgumentParser()
>>>>>   parser.add_argument(
>>>>>       '--bootstrap_servers',
>>>>>       dest='bootstrap_servers',
>>>>>       required=True,
>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>   parser.add_argument(
>>>>>       '--in_topic',
>>>>>       dest='in_topic',
>>>>>       required=True,
>>>>>       help='Kafka topic to read data from')
>>>>>   parser.add_argument(
>>>>>       '--out_topic',
>>>>>       dest='out_topic',
>>>>>       required=True,
>>>>>       help='Kafka topic to write data to')
>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>
>>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>> known_args.out_topic, pipeline_args)
>>>>> ~~~~~
>>>>>
>>>>> I'm firing this pipeline as follows:
>>>>>
>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092
>>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>
>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>
>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>> v1
>>>>> v2
>>>>> v3
>>>>>
>>>>> Now, when I execute the pipeline, I see that it starts to read records
>>>>> from offset 0, but then seeks to the latest offset 3 without processing
>>>>> the records. I don't see any data written to out_topic. I filtered out
>>>>> the logs a bit, and this is what I'm seeing:
>>>>>
>>>>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092
>>>>> in_topic=in_topic out_topic=out_topic
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned
>>>>> to split 0 (total 1): in_topic-0'
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition
>>>>> in_topic-0 to offset 0.'
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading
>>>>> from in_topic-0 starting at offset 0'
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to
>>>>> partition(s): in_topic-0'
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset
>>>>> of partition in_topic-0'
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for
>>>>> partition in_topic-0 to offset 3.'
>>>>>
>>>>> Additionally, the logs also emit complete consumer and producer
>>>>> configs. I'm dumping them here, in case that helps:
>>>>>
>>>>> Consumer Config:
>>>>>
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig
>>>>> values:'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics
>>>>> = true'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms
>>>>> = 5000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset =
>>>>> earliest'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>> [localhost:29092]'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>> default'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms
>>>>> = 540000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms =
>>>>> 60000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit =
>>>>> false'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics =
>>>>> true'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes =
>>>>> 52428800'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = 500'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
>>>>> Reader-0_offset_consumer_1947524890_none'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms =
>>>>> 3000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
>>>>> = true'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level =
>>>>> read_uncommitted'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = class
>>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes
>>>>> = 1048576'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms =
>>>>> 300000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>>>>> 300000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level =
>>>>> INFO'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms
>>>>> = 30000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes =
>>>>> 65536'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms
>>>>> = 1000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms =
>>>>> 50'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms =
>>>>> 30000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>> = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd =
>>>>> /usr/bin/kinit'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>> = 60000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name
>>>>> = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>> = 0.05'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>> = 0.8'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>> = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>> = 300'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>> = 60'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>> = 0.8'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>> = 0.05'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>> PLAINTEXT'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>> 131072'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms =
>>>>> 10000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
>>>>> [TLSv1.2, TLSv1.1, TLSv1]'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>> = https'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>> = SunX509'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location =
>>>>> null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password =
>>>>> null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>> = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>> = PKIX'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location =
>>>>> null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password =
>>>>> null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer =
>>>>> class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>
>>>>> Producer Config:
>>>>>
>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig
>>>>> values:'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>>>>> [localhost:29092]'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = 33554432'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup =
>>>>> default'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms
>>>>> = 540000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms =
>>>>> 120000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence =
>>>>> false'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class
>>>>> org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
>>>>> = 5'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size =
>>>>> 1048576'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>>>>> 300000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level =
>>>>> INFO'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms
>>>>> = 30000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = class
>>>>> org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes =
>>>>> 32768'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms
>>>>> = 1000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms =
>>>>> 50'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms =
>>>>> 30000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>> = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd =
>>>>> /usr/bin/kinit'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>> = 60000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name
>>>>> = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>> = 0.05'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>> = 0.8'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>> = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>> = 300'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>> = 60'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>> = 0.8'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>> = 0.05'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>>>>> PLAINTEXT'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes =
>>>>> 131072'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
>>>>> [TLSv1.2, TLSv1.1, TLSv1]'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>> = https'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>> = SunX509'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location =
>>>>> null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password =
>>>>> null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>> = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>> = PKIX'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location =
>>>>> null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password =
>>>>> null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms =
>>>>> 60000'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null'
>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class
>>>>> org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>
>>>>>
>>>>> Apologies again for dumping almost everything here :-) Any pointers on
>>>>> what might be the issue are appreciated.
>>>>>
>>>>> Thanks,
>>>>> Sumeet
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Also, can you try sending messages back to Kafka (or another
>>>>>> distributed system like GCS) instead of just printing them? (Given that
>>>>>> multi-language pipelines run SDK containers in Docker, you might not see
>>>>>> prints in the original console, I think.)
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Sumeet,
>>>>>>>
>>>>>>> It seems like your Kafka consumer uses the LATEST offset (which is the
>>>>>>> default setting) as the start offset to read, which is 29. Do you have
>>>>>>> more than 29 records to read at that point? If the pipeline is only for
>>>>>>> testing purposes, I would recommend reading from the earliest offset to
>>>>>>> see whether you get records. You can do so by constructing your
>>>>>>> ReadFromKafka like:
>>>>>>> ReadFromKafka(
>>>>>>>             consumer_config={'bootstrap.servers': 'localhost:29092',
>>>>>>> 'auto.offset.reset':'earliest'},
>>>>>>>             topics=['test'])
>>>>>>>
>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I'm trying out a simple example of reading data off a Kafka topic
>>>>>>>> into Apache Beam. Here's the relevant snippet:
>>>>>>>>
>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>     _ = (
>>>>>>>>         pipeline
>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>             consumer_config={'bootstrap.servers':
>>>>>>>> 'localhost:29092'},
>>>>>>>>             topics=['test'])
>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>
>>>>>>>> Using the above Beam pipeline snippet, I don't see any messages
>>>>>>>> coming in. Kafka is running locally in a docker container, and I'm 
>>>>>>>> able to
>>>>>>>> use `kafkacat` from the host (outside the container) to publish and
>>>>>>>> subscribe to messages. So, I guess there are no issues on that front.
>>>>>>>>
>>>>>>>> It appears that Beam is able to connect to Kafka and get notified
>>>>>>>> of new messages, as I see the offset changes in the Beam logs as I 
>>>>>>>> publish
>>>>>>>> data from `kafkacat`:
>>>>>>>>
>>>>>>>> INFO:root:severity: INFO
>>>>>>>> timestamp {
>>>>>>>>   seconds: 1612886861
>>>>>>>>   nanos: 534000000
>>>>>>>> }
>>>>>>>> message: "[Consumer
>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST 
>>>>>>>> offset
>>>>>>>> of partition test-0"
>>>>>>>> log_location:
>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>> thread: "22"
>>>>>>>>
>>>>>>>> INFO:root:severity: INFO
>>>>>>>> timestamp {
>>>>>>>>   seconds: 1612886861
>>>>>>>>   nanos: 537000000
>>>>>>>> }
>>>>>>>> message: "[Consumer
>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for
>>>>>>>> partition test-0 to offset 29."
>>>>>>>> log_location:
>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>> thread: "22"
>>>>>>>>
>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>
>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>> 1:foo
>>>>>>>> 1:bar
>>>>>>>>
>>>>>>>> and I can confirm that it's being received, again using `kafkacat`:
>>>>>>>>
>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>>>>> Key: 1 Value: foo
>>>>>>>> Key: 1 Value: bar
>>>>>>>>
>>>>>>>> But despite this, I don't see the actual message being printed by
>>>>>>>> Beam as I expected. Any pointers to what's missing here are appreciated.
>>>>>>>> I suspect this could be a decoding issue on the Beam pipeline side, but
>>>>>>>> I could be wrong.
>>>>>>>>
>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Sumeet
>>>>>>>>
>>>>>>>
