Thanks! Appreciate the quick response!!

-Sameer.

On 2021/01/20 19:41:39, Boyuan Zhang <boyu...@google.com> wrote: 
> I opened https://github.com/apache/beam/pull/13779 for exposing the built-in
> timestamp policies and commitOffsetsInFinalize to ReadFromKafka.
> 
> On Fri, Jan 15, 2021 at 3:09 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
> 
> >
> >
> > On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang <boyu...@google.com> wrote:
> >
> >> Re Cham,
> >>
> >>> Looking at https://github.com/apache/beam/pull/12572, it seems like you
> >>> just need support for FixedWindows, right? This should work now, I believe,
> >>> since Dataflow Python multi-language pipelines use portable job submission
> >>> by default.
> >>
> >> The problematic part is not the FixedWindows but the Reshuffle. In the Java
> >> SDK, Reshuffle expands into
> >> Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder())),
> >> which is rejected by the Python SDK.
> >>
> >
> > Ah, OK. It should not be rejected by the Python SDK anymore, but Reshuffle
> > ('IdentityWindowFn') will indeed currently fail when executing the pipeline
> > with Dataflow Runner v2.
> >
> >
> >> I can put up a simple PR to expose the built-in timestamp policies. Brian,
> >> would you like to look into how much work would be needed to output
> >> KafkaRecord to the Python SDK?
> >>
> >> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath <chamik...@google.com>
> >> wrote:
> >>
> >>> Thanks for bringing this up, Sameer.
> >>>
> >>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <boyu...@google.com> wrote:
> >>>
> >>>> +Chamikara Jayalath <chamik...@google.com>
> >>>>
> >>>> Hi Sameer,
> >>>>
> >>>> Thanks for reaching out!
> >>>>
> >>>> We will expose *commitOffsetsInFinalize* to the Python ReadFromKafka
> >>>> transform when we have custom window support in the Python SDK, which
> >>>> should be coming soon.
> >>>>
> >>>
> >>> Looking at https://github.com/apache/beam/pull/12572, it seems like you
> >>> just need support for FixedWindows, right? This should work now, I believe,
> >>> since Dataflow Python multi-language pipelines use portable job submission
> >>> by default.
> >>>
> >>>
> >>>>
> >>>> In terms of *TimestampPolicyFactory*, if you are using the built-in
> >>>> types, like ProcessingTimePolicy
> >>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
> >>>> LogAppendTimePolicy
> >>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
> >>>> and withCreateTime
> >>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
> >>>> it's not hard to expose them to ReadFromKafka.
> >>>>
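> >>>> To make this concrete, once exposed, the Python side could look roughly
> >>>> like the sketch below (illustrative only; the timestamp_policy parameter
> >>>> and its values are assumptions about how the wrapper might name things,
> >>>> not an existing ReadFromKafka option):
> >>>>
> >>>>     import apache_beam as beam
> >>>>     from apache_beam.io.kafka import ReadFromKafka
> >>>>
> >>>>     with beam.Pipeline() as p:
> >>>>         records = p | ReadFromKafka(
> >>>>             consumer_config={'bootstrap.servers': 'localhost:9092',
> >>>>                              'group.id': 'my_group'},
> >>>>             topics=['my_topic'],
> >>>>             # Select one of the Java built-in policies by name; anything
> >>>>             # custom would need cross-language UDF support.
> >>>>             timestamp_policy='LogAppendTime',
> >>>>         )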
> >>>
> >>> Yeah, exposing the existing policies should not be too hard, but defining
> >>> new policies (or any other option that requires second-order functions)
> >>> requires cross-language UDF support, which is not available yet.
> >>>
> >>>
> >>>>
> >>>> Another interesting topic you mentioned in
> >>>> https://github.com/apache/beam/pull/12572 is that you also want to
> >>>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
> >>>> KafkaRecord to have the same coder in the Python SDK as in the Java SDK.
> >>>> It might be future work for x-lang Kafka.
> >>>>
> >>>
> >>> This is because the current ReadFromKafka transform exposes the
> >>> Read.withoutMetadata() transform [1].
> >>> I think the current Beam Schema support in Python should be adequate to
> >>> expand this and support a PCollection<Row> that represents a
> >>> PCollection<KafkaRecord> [2] in Python. +Brian Hulette
> >>> <bhule...@google.com> to confirm.
> >>>
> >>> [1]
> >>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
> >>> [2]
> >>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
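> >>>
> >>> To sketch what that could look like on the Python side (illustrative
> >>> only; the with_metadata flag and the row field names are assumptions
> >>> about how a schema'd KafkaRecord might surface, not an existing option):
> >>>
> >>>     import apache_beam as beam
> >>>     from apache_beam.io.kafka import ReadFromKafka
> >>>
> >>>     with beam.Pipeline() as p:
> >>>         records = p | ReadFromKafka(
> >>>             consumer_config={'bootstrap.servers': 'localhost:9092',
> >>>                              'group.id': 'my_group'},
> >>>             topics=['my_topic'],
> >>>             with_metadata=True,  # hypothetical: emit Rows mirroring KafkaRecord
> >>>         )
> >>>         # Each element would then be a schema'd Row whose fields mirror
> >>>         # KafkaRecord: topic, partition, offset, timestamp, key, value.
> >>>         offsets = records | beam.Map(lambda r: (r.topic, r.partition, r.offset))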
> >>>
> >>> Thanks,
> >>> Cham
> >>>
> >>>
> >>>>
> >>>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
> >>>> sameer.bhadou...@gmail.com> wrote:
> >>>>
> >>>>> Hello,
> >>>>>
> >>>>> I am using Beam's cross-language support for reading from Kafka, but it
> >>>>> is missing some features available in the Java SDK that I would like to
> >>>>> use. Specifically, I am interested in the Kafka Commit Transform
> >>>>> <https://github.com/apache/beam/pull/12572> feature in the Java SDK.
> >>>>> This will also require being able to specify whether the metadata is
> >>>>> needed as part of the KafkaIO transform creation. In addition, the
> >>>>> `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are
> >>>>> also missing from the Python wrapper.
> >>>>>
> >>>>> My use case is as follows:
> >>>>> I have Kafka topics that contain data corresponding to the Mongo change
> >>>>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
> >>>>> read these updated Mongo documents, apply some transformations, and
> >>>>> stream them to BigQuery.
> >>>>>
> >>>>> Currently, I can only use `auto.commit` from the Kafka consumer config,
> >>>>> and I believe the messages are acked/offsets committed after the
> >>>>> consumer in KafkaIO finishes reading them. If there are any errors in
> >>>>> later stages of my pipeline, or if the pipeline is restarted and can't
> >>>>> be drained gracefully, I will lose the already-acked messages. Hence, I
> >>>>> want to commit the offsets only after they are successfully written to
> >>>>> BigQuery.
> >>>>>
> >>>>> Here is a snippet of my pipeline code.
> >>>>>
> >>>>>> def run(self, pipeline: Pipeline):
> >>>>>>    consumer_config = {
> >>>>>>       'bootstrap.servers': self.bootstrap_servers,
> >>>>>>       'auto.offset.reset': 'latest',
> >>>>>>       # Ideally we want auto.commit disabled, but we need a way to
> >>>>>>       # acknowledge the messages manually
> >>>>>>       # 'enable.auto.commit': 'false',  # messages must be acked explicitly
> >>>>>>       # but prevents loss in case of failures
> >>>>>>       'auto.commit.interval.ms': '60000',  # keep a high value since manual
> >>>>>>       # commits are not supported and messages will be lost if there is an
> >>>>>>       # error in the pipeline
> >>>>>>       'group.id': 'dev_streaming_dr_beam_pipeline'
> >>>>>>    }
> >>>>>>
> >>>>>>    streamed_dr_kvs_raw = (
> >>>>>>          pipeline
> >>>>>>          | 'Read from Kafka Stream' >>
> >>>>>>          ReadFromKafka(
> >>>>>>             consumer_config=consumer_config,
> >>>>>>             topics=['mongo_kafka_connect.requests'],
> >>>>>>             max_num_records=1,
> >>>>>>          )
> >>>>>>    )
> >>>>>>
> >>>>>>    dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
> >>>>>>
> >>>>>>    filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = lambda elem: elem.mongo_document is not None
> >>>>>>    filtered_dr_ds_with_record_ts = (
> >>>>>>          dr_data_stream
> >>>>>>          | 'Filter empty values' >> Filter(filter_record_fn)
> >>>>>>          | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
> >>>>>>    )
> >>>>>>
> >>>>>>    # The lateness window defines how long the state is kept for older windows
> >>>>>>    # and saving state for a longer duration can create memory pressure. Note,
> >>>>>>    # the state is saved in persistent disk but is optimistically fetched in
> >>>>>>    # the local memory.
> >>>>>>    batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
> >>>>>>       FixedWindows(30),  # 30 seconds
> >>>>>>       trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
> >>>>>>       allowed_lateness=24 * 60 * 60,  # 24 hours for late data
> >>>>>>       accumulation_mode=AccumulationMode.DISCARDING
> >>>>>>    )
> >>>>>>
> >>>>>>    extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
> >>>>>>    de_duped_dr_records = (
> >>>>>>          batched_dr_records
> >>>>>>          | 'Group by Message Key' >> GroupBy('message_key')
> >>>>>>          | 'Select Latest' >> Latest.PerKey()
> >>>>>>          | 'Values' >> Values()
> >>>>>>          | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
> >>>>>>    )
> >>>>>>
> >>>>>>    dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
> >>>>>>    dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
> >>>>>>
> >>>>>>    table_schema = self.deal_request_schema_util.get_big_query_table_schema()
> >>>>>>    dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
> >>>>>>       dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
> >>>>>>       dest_table_schema_vp=StaticValueProvider(str, table_schema),
> >>>>>>       time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
> >>>>>>       dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
> >>>>>>    )
> >>>>>>
> >>>>>>
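> >>>>> For reference, here is roughly what I would like the read step to look
> >>>>> like once offset finalization is exposed (a sketch only; the
> >>>>> commit_offset_in_finalize parameter name below is just a guess based on
> >>>>> the Java commitOffsetsInFinalize option, and the rest reuses the
> >>>>> variables from the snippet above):
> >>>>>
> >>>>>> consumer_config = {
> >>>>>>    'bootstrap.servers': self.bootstrap_servers,
> >>>>>>    'auto.offset.reset': 'latest',
> >>>>>>    # no auto-commit; offsets would only be committed when bundles are finalized
> >>>>>>    'enable.auto.commit': 'false',
> >>>>>>    'group.id': 'dev_streaming_dr_beam_pipeline'
> >>>>>> }
> >>>>>>
> >>>>>> streamed_dr_kvs_raw = (
> >>>>>>       pipeline
> >>>>>>       | 'Read from Kafka Stream' >>
> >>>>>>       ReadFromKafka(
> >>>>>>          consumer_config=consumer_config,
> >>>>>>          topics=['mongo_kafka_connect.requests'],
> >>>>>>          # hypothetical mirror of Java's commitOffsetsInFinalize
> >>>>>>          commit_offset_in_finalize=True,
> >>>>>>       )
> >>>>>> )
> >>>>>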
> >>>>> Thanks!
> >>>>>
> >>>>> --
> >>>>> Regards,
> >>>>> Sameer Bhadouria.
> >>>>>
> >>>>
> 
