Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-25 Thread Sameer Bhadouria
Thanks! Appreciate the quick response!!

-Sameer.

On 2021/01/20 19:41:39, Boyuan Zhang  wrote: 
> I opened https://github.com/apache/beam/pull/13779 for exposing the built-in
> timestamp policies and commitOffsetsInFinalize to ReadFromKafka.
> 
> On Fri, Jan 15, 2021 at 3:09 PM Chamikara Jayalath 
> wrote:
> 
> >
> >
> > On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang  wrote:
> >
> >> Re Cham,
> >>
> >>> Looking at https://github.com/apache/beam/pull/12572 seems like you
> >>> just need support for FixedWindows, right ? This should work now I believe
> >>> since Dataflow Python multi-language pipelines use portable job submission
> >>> by default.
> >>
> >> The problematic part is not FixedWindows but Reshuffle. In the Java SDK,
> >> Reshuffle will be expanded into
> >> Window.<KV<K, V>>into(new
> >> IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
> >> which is rejected by the Python SDK.
> >>
> >
> > Ah, OK. It should not be rejected by the Python SDK anymore, but Reshuffle
> > ('IdentityWindowFn') will indeed fail when executing the pipeline with
> > Dataflow Runner v2 currently.
> >
> >
> >> I can have a simple PR to expose the built-in timestamp policies. Brian,
> >> would you like to look into how much work would be needed to output
> >> KafkaRecord to the Python SDK?
> >>
> >> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath 
> >> wrote:
> >>
> >>> Thanks for bringing this up Sameer.
> >>>
> >>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang  wrote:
> >>>
>  +Chamikara Jayalath 
> 
>  Hi Sameer,
> 
>  Thanks for reaching out!
> 
>  We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform
>  when we have CustomWindow support in python SDK, which should be coming
>  soon.
> 
> >>>
> >>> Looking at https://github.com/apache/beam/pull/12572 seems like you
> >>> just need support for FixedWindows, right ? This should work now I believe
> >>> since Dataflow Python multi-language pipelines use portable job submission
> >>> by default.
> >>>
> >>>
> 
>  In terms of *TimestampPolicyFactory*, if you are using the built-in
>  types, like ProcessingTimePolicy, LogAppendTimePolicy, and withCreateTime,
>  it's not hard to expose them to ReadFromKafka.
> 
> >>>
> >>> Yeah, exposing existing policies should not be too hard but defining new
> >>> policies (or any other option that requires second order functions)
> >>> requires cross-language UDF support which is not available yet.
> >>>
> >>>
> 
>  Another interesting topic you have mentioned in
>  https://github.com/apache/beam/pull/12572 is that you also want to
>  retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
>  KafkaRecord to have the same coder in the Python SDK as in the Java SDK.
>  It might be future work for x-lang Kafka.
> 
> >>>
> >>> This is because the current ReadFromKafka transform exposes the
> >>> Read.withoutMetadata() transform [1].
> >>> I think current Beam Schema support in Python should be adequate to
> >>> expand this and support a PCollection that represents a
> >>> PCollection<KafkaRecord> [2] in Python. +Brian Hulette
> >>>  to confirm.
> >>>
> >>> [1]
> >>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
> >>> [2]
> >>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
> >>>
> >>> Thanks,
> >>> Cham
> >>>
> >>>
> 
>  On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
>  sameer.bhadou...@gmail.com> wrote:
> 
> > Hello,
> >
> > I am using Beam's Cross language support for reading from Kafka but it
> > is missing some features available in the java sdk that I would like to
> > use. Specifically, I am interested in the Kafka Commit Transform
> >  feature in java sdk. This
> > will also require being able to specify if the metadata is needed or 
> > not as
> > part of the KafkaIO transform creation. In addition, the 
> > `commitOffsetsInFinalize`
> > and `TimestampPolicyFactory` parameters are also missing from the
> > python wrapper.
> >
> > My use case is as follows:
> > I have Kafka topics that have data corresponding to the Mongo change
> > streams produced by the Mongo Source Kafka connector. In my pipeline, I
> > read these updated mongo documents, apply some transformations and 
> > stream
> 

Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-20 Thread Boyuan Zhang
I opened https://github.com/apache/beam/pull/13779 for exposing the built-in
timestamp policies and commitOffsetsInFinalize to ReadFromKafka.
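
Once that lands, using these options from Python should look roughly like the
sketch below (illustrative only; the exact parameter names depend on how the PR
is merged):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as pipeline:
    records = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'localhost:9092',
                'group.id': 'my_group',
            },
            topics=['my_topic'],
            # Assumed names for the newly exposed options:
            commit_offset_in_finalize=True,    # commit offsets once bundles are finalized
            timestamp_policy='LogAppendTime',  # one of the built-in policies
        )
    )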

On Fri, Jan 15, 2021 at 3:09 PM Chamikara Jayalath 
wrote:

>
>
> On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang  wrote:
>
>> Re Cham,
>>
>>> Looking at https://github.com/apache/beam/pull/12572 seems like you
>>> just need support for FixedWindows, right ? This should work now I believe
>>> since Dataflow Python multi-language pipelines use portable job submission
>>> by default.
>>
>> The problematic part is not FixedWindows but Reshuffle. In the Java SDK,
>> Reshuffle will be expanded into
>> Window.<KV<K, V>>into(new
>> IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
>> which is rejected by the Python SDK.
>>
>
> Ah, OK. It should not be rejected by the Python SDK anymore, but Reshuffle
> ('IdentityWindowFn') will indeed fail when executing the pipeline with
> Dataflow Runner v2 currently.
>
>
>> I can have a simple PR to expose the built-in timestamp policies. Brian,
>> would you like to look into how much work would be needed to output
>> KafkaRecord to the Python SDK?
>>
>> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath 
>> wrote:
>>
>>> Thanks for bringing this up Sameer.
>>>
>>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang  wrote:
>>>
 +Chamikara Jayalath 

 Hi Sameer,

 Thanks for reaching out!

 We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform
 when we have CustomWindow support in python SDK, which should be coming
 soon.

>>>
>>> Looking at https://github.com/apache/beam/pull/12572 seems like you
>>> just need support for FixedWindows, right ? This should work now I believe
>>> since Dataflow Python multi-language pipelines use portable job submission
>>> by default.
>>>
>>>

 In terms of *TimestampPolicyFactory*, if you are using the built-in
 types, like ProcessingTimePolicy, LogAppendTimePolicy, and withCreateTime,
 it's not hard to expose them to ReadFromKafka.

>>>
>>> Yeah, exposing existing policies should not be too hard but defining new
>>> policies (or any other option that requires second order functions)
>>> requires cross-language UDF support which is not available yet.
>>>
>>>

 Another interesting topic you have mentioned in
 https://github.com/apache/beam/pull/12572 is that you also want to
 retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
 KafkaRecord to have the same coder in the Python SDK as in the Java SDK. It
 might be future work for x-lang Kafka.

>>>
>>> This is because the current ReadFromKafka transform exposes the
>>> Read.withoutMetadata() transform [1].
>>> I think current Beam Schema support in Python should be adequate to
>>> expand this and support a PCollection that represents a
>>> PCollection<KafkaRecord> [2] in Python. +Brian Hulette
>>>  to confirm.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
>>> [2]
>>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>>>
>>> Thanks,
>>> Cham
>>>
>>>

 On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
 sameer.bhadou...@gmail.com> wrote:

> Hello,
>
> I am using Beam's Cross language support for reading from Kafka but it
> is missing some features available in the java sdk that I would like to
> use. Specifically, I am interested in the Kafka Commit Transform
>  feature in java sdk. This
> will also require being able to specify if the metadata is needed or not 
> as
> part of the KafkaIO transform creation. In addition, the 
> `commitOffsetsInFinalize`
> and `TimestampPolicyFactory` parameters are also missing from the
> python wrapper.
>
> My use case is as follows:
> I have Kafka topics that have data corresponding to the Mongo change
> streams produced by the Mongo Source Kafka connector. In my pipeline, I
> read these updated mongo documents, apply some transformations and stream
> them to BigQuery.
>
> Currently, I can only use the `auto.commit` from Kafka consumer config
> and I believe the message is acked/offset committed after the consumer in
> KafkaIO finishes reading them. If there are any errors in later stages of
> my pipeline or if the pipeline is restarted and it can't be drained
> 

Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-15 Thread Chamikara Jayalath
On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang  wrote:

> Re Cham,
>
>> Looking at https://github.com/apache/beam/pull/12572 seems like you just
>> need support for FixedWindows, right ? This should work now I believe since
>> Dataflow Python multi-language pipelines use portable job submission by
>> default.
>
> The problematic part is not FixedWindows but Reshuffle. In the Java SDK,
> Reshuffle will be expanded into
> Window.<KV<K, V>>into(new
> IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
> which is rejected by the Python SDK.
>

Ah, OK. It should not be rejected by the Python SDK anymore, but Reshuffle
('IdentityWindowFn') will indeed fail when executing the pipeline with
Dataflow Runner v2 currently.


> I can have a simple PR to expose the built-in timestamp policies. Brian, would
> you like to look into how much work would be needed to output KafkaRecord to
> the Python SDK?
>
> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath 
> wrote:
>
>> Thanks for bringing this up Sameer.
>>
>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang  wrote:
>>
>>> +Chamikara Jayalath 
>>>
>>> Hi Sameer,
>>>
>>> Thanks for reaching out!
>>>
>>> We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform
>>> when we have CustomWindow support in python SDK, which should be coming
>>> soon.
>>>
>>
>> Looking at https://github.com/apache/beam/pull/12572 seems like you just
>> need support for FixedWindows, right ? This should work now I believe since
>> Dataflow Python multi-language pipelines use portable job submission by
>> default.
>>
>>
>>>
>>> In terms of *TimestampPolicyFactory*, if you are using the built-in
>>> types, like ProcessingTimePolicy, LogAppendTimePolicy, and withCreateTime,
>>> it's not hard to expose them to ReadFromKafka.
>>>
>>
>> Yeah, exposing existing policies should not be too hard but defining new
>> policies (or any other option that requires second order functions)
>> requires cross-language UDF support which is not available yet.
>>
>>
>>>
>>> Another interesting topic you have mentioned in
>>> https://github.com/apache/beam/pull/12572 is that you also want to
>>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
>>> KafkaRecord to have the same coder in the Python SDK as in the Java SDK. It
>>> might be future work for x-lang Kafka.
>>>
>>
>> This is because the current ReadFromKafka transform exposes the
>> Read.withoutMetadata() transform [1].
>> I think current Beam Schema support in Python should be adequate to
>> expand this and support a PCollection that represents a
>> PCollection<KafkaRecord> [2] in Python. +Brian Hulette
>>  to confirm.
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
>> [2]
>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>>
>> Thanks,
>> Cham
>>
>>
>>>
>>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
>>> sameer.bhadou...@gmail.com> wrote:
>>>
 Hello,

 I am using Beam's Cross language support for reading from Kafka but it
 is missing some features available in the java sdk that I would like to
 use. Specifically, I am interested in the Kafka Commit Transform
  feature in java sdk. This
 will also require being able to specify if the metadata is needed or not as
 part of the KafkaIO transform creation. In addition, the 
 `commitOffsetsInFinalize`
 and `TimestampPolicyFactory` parameters are also missing from the
 python wrapper.

 My use case is as follows:
 I have Kafka topics that have data corresponding to the Mongo change
 streams produced by the Mongo Source Kafka connector. In my pipeline, I
 read these updated mongo documents, apply some transformations and stream
 them to BigQuery.

 Currently, I can only use the `auto.commit` from Kafka consumer config
 and I believe the message is acked/offset committed after the consumer in
 KafkaIO finishes reading them. If there are any errors in later stages of
 my pipeline or if the pipeline is restarted and it can't be drained
 gracefully, I will lose the already acked messages. Hence, I want to commit
 the offsets only after they are successfully written to BigQuery.

 Here is a snippet of my pipeline code.

> def run(self, pipeline: Pipeline):
>consumer_config = {
>   'bootstrap.servers': self.bootstrap_servers,
>  

Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-15 Thread Boyuan Zhang
Re Cham,

> Looking at https://github.com/apache/beam/pull/12572 seems like you just
> need support for FixedWindows, right ? This should work now I believe since
> Dataflow Python multi-language pipelines use portable job submission by
> default.

The problematic part is not FixedWindows but Reshuffle. In the Java SDK,
Reshuffle will be expanded into
Window.<KV<K, V>>into(new
IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
which is rejected by the Python SDK.

I can have a simple PR to expose the built-in timestamp policies. Brian, would
you like to look into how much work would be needed to output KafkaRecord to
the Python SDK?

On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath 
wrote:

> Thanks for bringing this up Sameer.
>
> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang  wrote:
>
>> +Chamikara Jayalath 
>>
>> Hi Sameer,
>>
>> Thanks for reaching out!
>>
>> We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform
>> when we have CustomWindow support in python SDK, which should be coming
>> soon.
>>
>
> Looking at https://github.com/apache/beam/pull/12572 seems like you just
> need support for FixedWindows, right ? This should work now I believe since
> Dataflow Python multi-language pipelines use portable job submission by
> default.
>
>
>>
>> In terms of *TimestampPolicyFactory*, if you are using the built-in
>> types, like ProcessingTimePolicy, LogAppendTimePolicy, and withCreateTime,
>> it's not hard to expose them to ReadFromKafka.
>>
>
> Yeah, exposing existing policies should not be too hard but defining new
> policies (or any other option that requires second order functions)
> requires cross-language UDF support which is not available yet.
>
>
>>
>> Another interesting topic you have mentioned in
>> https://github.com/apache/beam/pull/12572 is that you also want to
>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
>> KafkaRecord to have the same coder in the Python SDK as in the Java SDK. It
>> might be future work for x-lang Kafka.
>>
>
> This is because the current ReadFromKafka transform exposes the
> Read.withoutMetadata() transform [1].
> I think current Beam Schema support in Python should be adequate to expand
> this and support a PCollection that represents a
> PCollection<KafkaRecord> [2] in Python. +Brian Hulette
>  to confirm.
>
> [1]
> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
> [2]
> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>
> Thanks,
> Cham
>
>
>>
>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
>> sameer.bhadou...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using Beam's Cross language support for reading from Kafka but it
>>> is missing some features available in the java sdk that I would like to
>>> use. Specifically, I am interested in the Kafka Commit Transform
>>>  feature in java sdk. This
>>> will also require being able to specify if the metadata is needed or not as
>>> part of the KafkaIO transform creation. In addition, the 
>>> `commitOffsetsInFinalize`
>>> and `TimestampPolicyFactory` parameters are also missing from the python
>>> wrapper.
>>>
>>> My use case is as follows:
>>> I have Kafka topics that have data corresponding to the Mongo change
>>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
>>> read these updated mongo documents, apply some transformations and stream
>>> them to BigQuery.
>>>
>>> Currently, I can only use the `auto.commit` from Kafka consumer config
>>> and I believe the message is acked/offset committed after the consumer in
>>> KafkaIO finishes reading them. If there are any errors in later stages of
>>> my pipeline or if the pipeline is restarted and it can't be drained
>>> gracefully, I will lose the already acked messages. Hence, I want to commit
>>> the offsets only after they are successfully written to BigQuery.
>>>
>>> Here is a snippet of my pipeline code.
>>>
 def run(self, pipeline: Pipeline):
consumer_config = {
   'bootstrap.servers': self.bootstrap_servers,
   'auto.offset.reset': 'latest',
   # Ideally we want auto.commit disabled, but we need a way to 
 acknowledge the messages manually
   # 'enable.auto.commit': 'false',  # messages must be acked 
 explicitly but prevents loss in case of failures
   'auto.commit.interval.ms': '6',  # keep a high value since 
 

Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-15 Thread Chamikara Jayalath
Thanks for bringing this up Sameer.

On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang  wrote:

> +Chamikara Jayalath 
>
> Hi Sameer,
>
> Thanks for reaching out!
>
> We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform
> when we have CustomWindow support in python SDK, which should be coming
> soon.
>

Looking at https://github.com/apache/beam/pull/12572 seems like you just
need support for FixedWindows, right ? This should work now I believe since
Dataflow Python multi-language pipelines use portable job submission by
default.


>
> In terms of *TimestampPolicyFactory*, if you are using the built-in types,
> like ProcessingTimePolicy, LogAppendTimePolicy, and withCreateTime,
> it's not hard to expose them to ReadFromKafka.
>

Yeah, exposing existing policies should not be too hard but defining new
policies (or any other option that requires second order functions)
requires cross-language UDF support which is not available yet.
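
To make the distinction concrete, here is a rough sketch (the timestamp_policy
parameter and its values are assumptions, not an existing API): a built-in
policy is just an identifier the Python wrapper can forward to the Java
expansion, whereas a custom policy would be a Python callback that the
Java-side KafkaIO cannot invoke without cross-language UDF support:

from apache_beam.io.kafka import ReadFromKafka

# Built-in policy: a plain constant is enough to configure the Java transform.
read = ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:9092'},
    topics=['my_topic'],
    timestamp_policy='CreateTime',  # assumed spelling of the built-in option
)

# A user-defined policy, by contrast, would need Java to call back into Python,
# so something like this is not expressible over x-lang today:
#   timestamp_policy=lambda record: my_event_time(record)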


>
> Another interesting topic you have mentioned in
> https://github.com/apache/beam/pull/12572 is that you also want to
> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
> KafkaRecord to have the same coder in the Python SDK as in the Java SDK. It
> might be future work for x-lang Kafka.
>

This is because the current ReadFromKafka transform exposes the
Read.withoutMetadata() transform [1].
I think current Beam Schema support in Python should be adequate to expand
this and support a PCollection that represents a
PCollection<KafkaRecord> [2] in Python. +Brian Hulette
 to confirm.

[1]
https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
[2]
https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
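
If that works out, the metadata could surface in Python as schema'd elements
whose attributes mirror KafkaRecord's fields. A speculative sketch of consuming
such records (the attribute names below are assumptions based on KafkaRecord,
not a committed API):

import apache_beam as beam

class LogKafkaMetadata(beam.DoFn):
    # Assumes a future ReadFromKafka (with metadata) yields schema'd elements.
    def process(self, record):
        # Hypothetical attribute names mirroring Java's KafkaRecord.
        print('%s/%d offset=%d timestamp=%d' %
              (record.topic, record.partition, record.offset, record.timestamp))
        yield record.kv  # pass the (key, value) pair downstream

It could then be applied with beam.ParDo(LogKafkaMetadata()) on the read output.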

Thanks,
Cham


>
> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
> sameer.bhadou...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using Beam's Cross language support for reading from Kafka but it is
>> missing some features available in the java sdk that I would like to use.
>> Specifically, I am interested in the Kafka Commit Transform
>>  feature in java sdk. This
>> will also require being able to specify if the metadata is needed or not as
>> part of the KafkaIO transform creation. In addition, the 
>> `commitOffsetsInFinalize`
>> and `TimestampPolicyFactory` parameters are also missing from the python
>> wrapper.
>>
>> My use case is as follows:
>> I have Kafka topics that have data corresponding to the Mongo change
>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
>> read these updated mongo documents, apply some transformations and stream
>> them to BigQuery.
>>
>> Currently, I can only use the `auto.commit` from Kafka consumer config
>> and I believe the message is acked/offset committed after the consumer in
>> KafkaIO finishes reading them. If there are any errors in later stages of
>> my pipeline or if the pipeline is restarted and it can't be drained
>> gracefully, I will lose the already acked messages. Hence, I want to commit
>> the offsets only after they are successfully written to BigQuery.
>>
>> Here is a snippet of my pipeline code.
>>
>>> def run(self, pipeline: Pipeline):
>>>consumer_config = {
>>>   'bootstrap.servers': self.bootstrap_servers,
>>>   'auto.offset.reset': 'latest',
>>>   # Ideally we want auto.commit disabled, but we need a way to 
>>> acknowledge the messages manually
>>>   # 'enable.auto.commit': 'false',  # messages must be acked explicitly 
>>> but prevents loss in case of failures
>>>   'auto.commit.interval.ms': '6',  # keep a high value since manual 
>>> commits are not supported and messages will be lost if there is an error in 
>>> the pipeline
>>>   'group.id': 'dev_streaming_dr_beam_pipeline'
>>>}
>>>
>>>streamed_dr_kvs_raw = (
>>>  pipeline
>>>  | 'Read from Kafka Stream' >>
>>>  ReadFromKafka(
>>> consumer_config=consumer_config,
>>> topics=['mongo_kafka_connect.requests'],
>>> max_num_records=1,
>>>  )
>>>)
>>>
>>>dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> 
>>> ParDo(MongoKafkaMessageDeserializer())
>>>
>>>filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = 
>>> lambda elem: elem.mongo_document is not None
>>>filtered_dr_ds_with_record_ts = (
>>>  dr_data_stream
>>>  | 'Filter empty values' 

Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-15 Thread Sameer Bhadouria
Thanks for the quick response on this. Looking forward to these features in
the future.

Regarding `TimestampPolicyFactory`, I wanted to use the predefined `CreateTime`
policy instead of the default `processingTime`.

Cheers,
-Sameer.

On 2021/01/15 21:18:25, Boyuan Zhang  wrote: 
> +Chamikara Jayalath 
> 
> Hi Sameer,
> 
> Thanks for reaching out!
> 
> We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform when
> we have CustomWindow support in python SDK, which should be coming soon.
> 
> In terms of *TimestampPolicyFactory*, if you are using the built-in types,
> like ProcessingTimePolicy, LogAppendTimePolicy, and withCreateTime,
> it's not hard to expose them to ReadFromKafka.
> 
> Another interesting topic you have mentioned in
> https://github.com/apache/beam/pull/12572 is that you also want to retrieve
> KafkaRecord from ReadFromKafka instead of bytes. That requires KafkaRecord to
> have the same coder in the Python SDK as in the Java SDK. It might be future
> work for x-lang Kafka.
> 
> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
> sameer.bhadou...@gmail.com> wrote:
> 
> > Hello,
> >
> > I am using Beam's Cross language support for reading from Kafka but it is
> > missing some features available in the java sdk that I would like to use.
> > Specifically, I am interested in the Kafka Commit Transform
> >  feature in java sdk. This
> > will also require being able to specify if the metadata is needed or not as
> > part of the KafkaIO transform creation. In addition, the 
> > `commitOffsetsInFinalize`
> > and `TimestampPolicyFactory` parameters are also missing from the python
> > wrapper.
> >
> > My use case is as follows:
> > I have Kafka topics that have data corresponding to the Mongo change
> > streams produced by the Mongo Source Kafka connector. In my pipeline, I
> > read these updated mongo documents, apply some transformations and stream
> > them to BigQuery.
> >
> > Currently, I can only use the `auto.commit` from Kafka consumer config and
> > I believe the message is acked/offset committed after the consumer in
> > KafkaIO finishes reading them. If there are any errors in later stages of
> > my pipeline or if the pipeline is restarted and it can't be drained
> > gracefully, I will lose the already acked messages. Hence, I want to commit
> > the offsets only after they are successfully written to BigQuery.
> >
> > Here is a snippet of my pipeline code.
> >
> >> def run(self, pipeline: Pipeline):
> >>consumer_config = {
> >>   'bootstrap.servers': self.bootstrap_servers,
> >>   'auto.offset.reset': 'latest',
> >>   # Ideally we want auto.commit disabled, but we need a way to 
> >> acknowledge the messages manually
> >>   # 'enable.auto.commit': 'false',  # messages must be acked 
> >> explicitly but prevents loss in case of failures
> >>   'auto.commit.interval.ms': '6',  # keep a high value since 
> >> manual commits are not supported and messages will be lost if there is an 
> >> error in the pipeline
> >>   'group.id': 'dev_streaming_dr_beam_pipeline'
> >>}
> >>
> >>streamed_dr_kvs_raw = (
> >>  pipeline
> >>  | 'Read from Kafka Stream' >>
> >>  ReadFromKafka(
> >> consumer_config=consumer_config,
> >> topics=['mongo_kafka_connect.requests'],
> >> max_num_records=1,
> >>  )
> >>)
> >>
> >>dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> 
> >> ParDo(MongoKafkaMessageDeserializer())
> >>
> >>filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = 
> >> lambda elem: elem.mongo_document is not None
> >>filtered_dr_ds_with_record_ts = (
> >>  dr_data_stream
> >>  | 'Filter empty values' >> Filter(filter_record_fn)
> >>  | 'Extract Timestamp' >> 
> >> ParDo(MongoKafkaMessageTimestampExtractor())
> >>)
> >>
> >># The lateness window defines how long the state is kept for older 
> >> windows
> >># and saving state for a longer duration can create memory pressure. 
> >> Note, the state is
> >># saved in persistent disk but is optimistically fetched in the local 
> >> memory.
> >>batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> 
> >> WindowInto(
> >>   FixedWindows(30),  # 30 seconds
> >>   trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any 
> >> time late data arrives after 1 min
> >>   allowed_lateness=24 * 60 * 60, # 24 hours for late data
> >>   

Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-15 Thread Boyuan Zhang
+Chamikara Jayalath 

Hi Sameer,

Thanks for reaching out!

We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform when
we have CustomWindow support in python SDK, which should be coming soon.

In terms of *TimestampPolicyFactory*, if you are using the built-in types,
like ProcessingTimePolicy, LogAppendTimePolicy, and withCreateTime,
it's not hard to expose them to ReadFromKafka.

Another interesting topic you have mentioned in
https://github.com/apache/beam/pull/12572 is that you also want to retrieve
KafkaRecord from ReadFromKafka instead of bytes. That requires KafkaRecord to
have the same coder in the Python SDK as in the Java SDK. It might be future
work for x-lang Kafka.

On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
sameer.bhadou...@gmail.com> wrote:

> Hello,
>
> I am using Beam's Cross language support for reading from Kafka but it is
> missing some features available in the java sdk that I would like to use.
> Specifically, I am interested in the Kafka Commit Transform
>  feature in java sdk. This
> will also require being able to specify if the metadata is needed or not as
> part of the KafkaIO transform creation. In addition, the 
> `commitOffsetsInFinalize`
> and `TimestampPolicyFactory` parameters are also missing from the python
> wrapper.
>
> My use case is as follows:
> I have Kafka topics that have data corresponding to the Mongo change
> streams produced by the Mongo Source Kafka connector. In my pipeline, I
> read these updated mongo documents, apply some transformations and stream
> them to BigQuery.
>
> Currently, I can only use the `auto.commit` from Kafka consumer config and
> I believe the message is acked/offset committed after the consumer in
> KafkaIO finishes reading them. If there are any errors in later stages of
> my pipeline or if the pipeline is restarted and it can't be drained
> gracefully, I will lose the already acked messages. Hence, I want to commit
> the offsets only after they are successfully written to BigQuery.
>
> Here is a snippet of my pipeline code.
>
>> def run(self, pipeline: Pipeline):
>>consumer_config = {
>>   'bootstrap.servers': self.bootstrap_servers,
>>   'auto.offset.reset': 'latest',
>>   # Ideally we want auto.commit disabled, but we need a way to 
>> acknowledge the messages manually
>>   # 'enable.auto.commit': 'false',  # messages must be acked explicitly 
>> but prevents loss in case of failures
>>   'auto.commit.interval.ms': '6',  # keep a high value since manual 
>> commits are not supported and messages will be lost if there is an error in 
>> the pipeline
>>   'group.id': 'dev_streaming_dr_beam_pipeline'
>>}
>>
>>streamed_dr_kvs_raw = (
>>  pipeline
>>  | 'Read from Kafka Stream' >>
>>  ReadFromKafka(
>> consumer_config=consumer_config,
>> topics=['mongo_kafka_connect.requests'],
>> max_num_records=1,
>>  )
>>)
>>
>>dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> 
>> ParDo(MongoKafkaMessageDeserializer())
>>
>>filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = 
>> lambda elem: elem.mongo_document is not None
>>filtered_dr_ds_with_record_ts = (
>>  dr_data_stream
>>  | 'Filter empty values' >> Filter(filter_record_fn)
>>  | 'Extract Timestamp' >> 
>> ParDo(MongoKafkaMessageTimestampExtractor())
>>)
>>
>># The lateness window defines how long the state is kept for older windows
>># and saving state for a longer duration can create memory pressure. 
>> Note, the state is
>># saved in persistent disk but is optimistically fetched in the local 
>> memory.
>>batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> 
>> WindowInto(
>>   FixedWindows(30),  # 30 seconds
>>   trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time 
>> late data arrives after 1 min
>>   allowed_lateness=24 * 60 * 60, # 24 hours for late data
>>   accumulation_mode=AccumulationMode.DISCARDING
>>)
>>
>>extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda 
>> elems: elems[0].mongo_document
>>de_duped_dr_records = (
>>  batched_dr_records
>>  | 'Group by Message Key' >> GroupBy('message_key')
>>  | 'Select Latest' >> Latest.PerKey()
>>  | 'Values' >> Values()
>>  | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>)
>>
>>dr_with_features =