IIUC, Splittable DoFn (the source framework) currently does not work on
portable runners in streaming mode, due to the issue Boyuan mentioned.

On Tue, Mar 16, 2021 at 8:35 PM Sumeet Malhotra <[email protected]>
wrote:

> Thanks Cham. In the python version, I do specify the streaming option as
> follows (not on the command line though):
>
> pipeline_options = PipelineOptions(pipeline_args, save_main_session=True,
> streaming=True)
>
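
For reference, a minimal sketch of how command-line flags would reach
PipelineOptions in a script like this one, using only the standard library.
It assumes the script splits its arguments with argparse's parse_known_args;
the split_args helper is a hypothetical name, and Beam itself is not imported
here.

```python
import argparse


def split_args(argv):
    """Split script-specific flags from flags meant for the pipeline.

    Hypothetical helper for illustration only. Flags the parser does not
    know about are returned untouched, in their original order.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers', required=True)
    parser.add_argument('--in_topic', required=True)
    parser.add_argument('--out_topic', required=True)
    return parser.parse_known_args(argv)


known_args, pipeline_args = split_args([
    '--bootstrap_servers=localhost:29092',
    '--in_topic=in_topic',
    '--out_topic=out_topic',
    '--runner=FlinkRunner',
    '--streaming',
])

# Everything argparse did not recognise is left over for the pipeline.
print(pipeline_args)
```

The leftover list is what would be handed to PipelineOptions, so a streaming
flag given on the command line arrives there unchanged. Whether that alone is
enough on a portable runner is a separate question.
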
> Regarding running portable pipelines, just to confirm, what you are saying
> is that currently the only way to execute this is in Java then until the
> issue you created is resolved?
>

Yes. I think the Java pipeline worked because it did not go through portable
Spark/Flink, whereas a cross-language transform requires portable execution.

Thanks,
Cham


>
> Thanks,
> Sumeet
>
>
> On Wed, Mar 17, 2021 at 5:38 AM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Sumeet,
>>
>> After double-checking the current support status, the root cause is that
>> when you use cross-language pipelines, your pipeline actually runs in
>> the portable way [1]. Currently we do not support processing unbounded
>> sources on Flink over portable execution well. I have filed
>> https://issues.apache.org/jira/browse/BEAM-11998 to track the progress.
>>
>> [1] https://s.apache.org/beam-fn-api
>>
>>
>> On Tue, Mar 16, 2021 at 10:13 AM Boyuan Zhang <[email protected]> wrote:
>>
>>> One more question: did you launch your pipeline with the streaming=True
>>> pipeline option? I think you need to use --streaming=True for an unbounded
>>> source to work properly.
>>>
>>> On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> Hi Sumeet,
>>>>
>>>> Which Beam version are you using for your pipeline?
>>>>
>>>> On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>>
>>>>> I don't believe Fn API DirectRunner supports streaming yet (I might be
>>>>> wrong). I can confirm that this works for Dataflow.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thanks Cham! But I don't think this is Flink specific. I have
>>>>>> observed similar behaviour with DirectRunner as well BTW.
>>>>>>
>>>>>> ..Sumeet
>>>>>>
>>>>>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> I'm not too familiar with Flink but it seems like, for streaming
>>>>>>> pipelines, messages from Kafka/SDF read do not get pushed to subsequent
>>>>>>> steps for some reason.
>>>>>>> * X-lang bounded read with Flink seems to be fine.
>>>>>>> * X-lang Kafka sink with Flink seems to be fine.
>>>>>>>
>>>>>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for
>>>>>>> tracking.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Cham,
>>>>>>>>
>>>>>>>> Do you have pointers on what might be going on? Or something else I
>>>>>>>> can try? I had posted the same on StackOverflow [1]; it seems that I'm
>>>>>>>> not the only one seeing this issue at the moment.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sumeet
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Took me some time to set up the Java test (using Java after more
>>>>>>>>> than a decade!), but yes, a similar pipeline with KafkaIO and Flink
>>>>>>>>> seems to work fine.
>>>>>>>>>
>>>>>>>>> Here's the relevant Java code. The only difference from the Python
>>>>>>>>> version is that I had to extract the KV from the KafkaRecord object
>>>>>>>>> and construct a PCollection<KV> explicitly before writing to the
>>>>>>>>> output topic.
>>>>>>>>>
>>>>>>>>> ~~~~~~~~
>>>>>>>>> package org.apache.beam.kafka.test;
>>>>>>>>>
>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>>>>>>> import org.apache.beam.sdk.options.Default;
>>>>>>>>> import org.apache.beam.sdk.options.Description;
>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>> import org.apache.beam.sdk.transforms.*;
>>>>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>>>>
>>>>>>>>> public class KafkaTest {
>>>>>>>>>
>>>>>>>>>   // Default bootstrap kafka servers
>>>>>>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092";
>>>>>>>>>   // Default input kafka topic name
>>>>>>>>>   static final String INPUT_TOPIC = "in_topic";
>>>>>>>>>   // Default output kafka topic name
>>>>>>>>>   static final String OUTPUT_TOPIC = "out_topic";
>>>>>>>>>
>>>>>>>>>   /** Specific pipeline options. */
>>>>>>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>>>>>>     @Description("Kafka bootstrap servers")
>>>>>>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>>>>>>     String getBootstrap();
>>>>>>>>>
>>>>>>>>>     void setBootstrap(String value);
>>>>>>>>>
>>>>>>>>>     @Description("Kafka input topic name")
>>>>>>>>>     @Default.String(INPUT_TOPIC)
>>>>>>>>>     String getInputTopic();
>>>>>>>>>
>>>>>>>>>     void setInputTopic(String value);
>>>>>>>>>
>>>>>>>>>     @Description("Kafka output topic name")
>>>>>>>>>     @Default.String(OUTPUT_TOPIC)
>>>>>>>>>     String getOutputTopic();
>>>>>>>>>
>>>>>>>>>     void setOutputTopic(String value);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   public static final void main(String[] args) throws Exception {
>>>>>>>>>     final KafkaTestOptions options =
>>>>>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>>>>>>
>>>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>     pipeline
>>>>>>>>>         .apply(
>>>>>>>>>             "ReadFromKafka",
>>>>>>>>>             KafkaIO.<String, String>read()
>>>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>>>                 .withTopic(options.getInputTopic())
>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>         .apply(
>>>>>>>>>             "PrepareForWriting",
>>>>>>>>>             ParDo.of(
>>>>>>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>>>>>>                   @ProcessElement
>>>>>>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>>>>>>                     c.output(
>>>>>>>>>                         KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
>>>>>>>>>                   }
>>>>>>>>>                 }))
>>>>>>>>>         .apply(
>>>>>>>>>             "WriteToKafka",
>>>>>>>>>             KafkaIO.<String, String>write()
>>>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>>>                 .withTopic(options.getOutputTopic())
>>>>>>>>>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>>>>>>>>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>>>>>>>>
>>>>>>>>>     pipeline.run();
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>> ~~~~~~~~~
>>>>>>>>>
>>>>>>>>> I'm firing the Java version as follows:
>>>>>>>>>
>>>>>>>>> $ mvn exec:java \
>>>>>>>>>     -Dexec.mainClass=org.apache.beam.tutorial.analytic.KafkaTest \
>>>>>>>>>     -Pflink-runner \
>>>>>>>>>     -Dexec.args="--runner=FlinkRunner"
>>>>>>>>>
>>>>>>>>> And I can see in real time, that as I publish records to the
>>>>>>>>> in_topic, the out_topic is able to receive them on a continuous basis.
>>>>>>>>>
>>>>>>>>> I hope this helps narrow down the issue.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Sumeet
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Are you able to run a similar Java streaming pipeline using
>>>>>>>>>> KafkaIO and Flink ? (without x-lang)
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Cham
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Cham!
>>>>>>>>>>>
>>>>>>>>>>> So finally I was able to get partial success. Since I had
>>>>>>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set
>>>>>>>>>>> max_num_records=3 to see if it can read all existing records, as
>>>>>>>>>>> follows:
>>>>>>>>>>>
>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>     _ = (
>>>>>>>>>>>         pipeline
>>>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>>             consumer_config={
>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>>                 'auto.offset.reset': 'earliest'},
>>>>>>>>>>>             topics=[in_topic],
>>>>>>>>>>>             max_num_records=3)
>>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>>             producer_config={
>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers},
>>>>>>>>>>>             topic=out_topic))
>>>>>>>>>>>
>>>>>>>>>>> I was able to see all 3 records being read, and written
>>>>>>>>>>> successfully to the out_topic as well. So, it appears that there
>>>>>>>>>>> might be some issue with reading unbounded Kafka streams here? Or
>>>>>>>>>>> is there any setting that I might be missing?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Sumeet
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey Cham!
>>>>>>>>>>>>
>>>>>>>>>>>> Appreciate the response. I tried out your suggestions (details
>>>>>>>>>>>> below), but I still don't see any data being consumed or written
>>>>>>>>>>>> back to Kafka (as per your suggestion). I'm also providing
>>>>>>>>>>>> additional details/context that might help narrow down the issue.
>>>>>>>>>>>> Apologies for being a bit verbose from here on!
>>>>>>>>>>>>
>>>>>>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>>>>>>
>>>>>>>>>>>> ~~~~~~
>>>>>>>>>>>> import logging
>>>>>>>>>>>>
>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>>>
>>>>>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>>>>>>   pipeline_options = PipelineOptions(
>>>>>>>>>>>>       pipeline_args, save_main_session=True, streaming=True)
>>>>>>>>>>>>
>>>>>>>>>>>>   logging.info(
>>>>>>>>>>>>       'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>>>>>>>>>       str(bootstrap_servers), in_topic, out_topic)
>>>>>>>>>>>>
>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>>>             consumer_config={
>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>>>>>>             },
>>>>>>>>>>>>             topics=[in_topic])
>>>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>>>             producer_config={
>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>>>>>>             },
>>>>>>>>>>>>             topic=out_topic))
>>>>>>>>>>>>
>>>>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>>>>>>   import argparse
>>>>>>>>>>>>
>>>>>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>>>       '--bootstrap_servers',
>>>>>>>>>>>>       dest='bootstrap_servers',
>>>>>>>>>>>>       required=True,
>>>>>>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>>>       '--in_topic',
>>>>>>>>>>>>       dest='in_topic',
>>>>>>>>>>>>       required=True,
>>>>>>>>>>>>       help='Kafka topic to read data from')
>>>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>>>       '--out_topic',
>>>>>>>>>>>>       dest='out_topic',
>>>>>>>>>>>>       required=True,
>>>>>>>>>>>>       help='Kafka topic to write data to')
>>>>>>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>>>>>>
>>>>>>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>>>>>>>>>       known_args.out_topic, pipeline_args)
>>>>>>>>>>>> ~~~~~
>>>>>>>>>>>>
>>>>>>>>>>>> I'm firing this pipeline as follows:
>>>>>>>>>>>>
>>>>>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092 \
>>>>>>>>>>>>     --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>>>>>>
>>>>>>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>>>>>>
>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>>>>>>> v1
>>>>>>>>>>>> v2
>>>>>>>>>>>> v3
>>>>>>>>>>>>
>>>>>>>>>>>> Now, when I execute the pipeline, I see that it starts to read
>>>>>>>>>>>> records from offset 0, but then seeks to the latest offset 3
>>>>>>>>>>>> without processing the records. I don't see any data written to
>>>>>>>>>>>> out_topic. I filtered the logs a bit, and this is what I'm seeing:
>>>>>>>>>>>>
>>>>>>>>>>>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 in_topic=in_topic out_topic=out_topic
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to split 0 (total 1): in_topic-0'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition in_topic-0 to offset 0.'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from in_topic-0 starting at offset 0'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to partition(s): in_topic-0'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset of partition in_topic-0'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for partition in_topic-0 to offset 3.'
>>>>>>>>>>>>
>>>>>>>>>>>> Additionally, the logs also emit complete consumer and producer
>>>>>>>>>>>> configs. I'm dumping them here, in case that helps:
>>>>>>>>>>>>
>>>>>>>>>>>> Consumer Config:
>>>>>>>>>>>>
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig values:'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics = true'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms = 5000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = earliest'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = [localhost:29092]'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms = 540000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms = 60000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = false'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics = true'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = 52428800'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = 500'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = Reader-0_offset_consumer_1947524890_none'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms = 3000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close = true'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = read_uncommitted'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes = 1048576'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms = 300000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = 300000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = INFO'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms = 30000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 65536'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms = 1000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = /usr/bin/kinit'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin = 60000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter = 0.05'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor = 0.8'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds = 300'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds = 60'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor = 0.8'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter = 0.05'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = PLAINTEXT'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = 10000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm = https'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm = SunX509'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm = PKIX'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>>>>
>>>>>>>>>>>> Producer Config:
>>>>>>>>>>>>
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig values:'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = [localhost:29092]'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = 33554432'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms = 540000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms = 120000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = false'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection = 5'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = 1048576'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = 300000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = INFO'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms = 30000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 32768'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms = 1000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = /usr/bin/kinit'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin = 60000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter = 0.05'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor = 0.8'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds = 300'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds = 60'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor = 0.8'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter = 0.05'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = PLAINTEXT'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm = https'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm = SunX509'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm = PKIX'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms = 60000'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null'
>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Apologies again for dumping almost everything here :-) Any
>>>>>>>>>>>> pointers on what might be the issue are appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Also, can you try sending messages back to Kafka (or another
>>>>>>>>>>>>> distributed system like GCS) instead of just printing them?
>>>>>>>>>>>>> (Given that multi-language pipelines run SDK containers in Docker,
>>>>>>>>>>>>> you might not see prints in the original console, I think.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Sumeet,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It seems like your Kafka consumer uses the LATEST offset (which is
>>>>>>>>>>>>>> the default setting) as the start offset to read, which is 29. Do
>>>>>>>>>>>>>> you have more than 29 records to read at that point? If the
>>>>>>>>>>>>>> pipeline is only for testing purposes, I would recommend reading
>>>>>>>>>>>>>> from the earliest offset to see whether you get records. You can
>>>>>>>>>>>>>> do so by constructing your ReadFromKafka like:
>>>>>>>>>>>>>> ReadFromKafka(
>>>>>>>>>>>>>>             consumer_config={
>>>>>>>>>>>>>>                 'bootstrap.servers': 'localhost:29092',
>>>>>>>>>>>>>>                 'auto.offset.reset': 'earliest'},
>>>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>>>
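
A toy model of the auto.offset.reset behaviour described above may help here.
It is not Kafka client code: starting_offset, records_visible and their
parameters are hypothetical names for illustration, and the model only covers
the case where the reset policy is applied because no committed offset exists.

```python
def starting_offset(log_start, log_end, committed=None,
                    auto_offset_reset='latest'):
    """Toy model: offset at which a consumer begins reading a partition.

    log_start and log_end are the first offset in the partition and the
    offset one past the last record. Hypothetical helper, not a Kafka API.
    """
    if committed is not None:
        # A committed offset takes precedence over the reset policy.
        return committed
    if auto_offset_reset == 'earliest':
        return log_start
    if auto_offset_reset == 'latest':
        return log_end
    raise ValueError('unsupported auto.offset.reset: ' + auto_offset_reset)


def records_visible(log_start, log_end, **kwargs):
    """Number of already-published records the consumer would read."""
    return log_end - starting_offset(log_start, log_end, **kwargs)


# A partition that already holds 29 records (offsets 0..28).
print(records_visible(0, 29))
print(records_visible(0, 29, auto_offset_reset='earliest'))
```

With the default policy the consumer starts at offset 29 and sees none of the
existing records; with 'earliest' it starts at offset 0 and sees all 29.
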
>>>>>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka
>>>>>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>>>>>             consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any
>>>>>>>>>>>>>>> messages coming in. Kafka is running locally in a docker
>>>>>>>>>>>>>>> container, and I'm able to use `kafkacat` from the host (outside
>>>>>>>>>>>>>>> the container) to publish and subscribe to messages. So, I guess
>>>>>>>>>>>>>>> there are no issues on that front.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get
>>>>>>>>>>>>>>> notified of new messages, as I see the offset changes in the
>>>>>>>>>>>>>>> Beam logs as I publish data from `kafkacat`:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>>>   nanos: 534000000
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
>>>>>>>>>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>>>   nanos: 537000000
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
>>>>>>>>>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>>>>>>> 1:foo
>>>>>>>>>>>>>>> 1:bar
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and I can confirm that it's being received, again using
>>>>>>>>>>>>>>> `kafkacat`:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>>>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> But despite this, I don't see the actual messages being
>>>>>>>>>>>>>>> printed by Beam as I expected. Any pointers to what's missing
>>>>>>>>>>>>>>> here are appreciated. I suspect this could be a decoding issue
>>>>>>>>>>>>>>> on the Beam pipeline side, but I could be wrong.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
