Thanks for the quick response on this. Looking forward to these features in 
future.

Regarding `TimestampPolicyFactory` I wanted to use the predefined `CreateTime` 
policy instead of the default `processingTime`.

Cheers,
-Sameer.

On 2021/01/15 21:18:25, Boyuan Zhang <boyu...@google.com> wrote: 
> +Chamikara Jayalath <chamik...@google.com>
> 
> Hi Sameer,
> 
> Thanks for reaching out!
> 
> We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform when
> we have CustomWindow support in python SDK, which should be coming soon.
> 
> In terms of *TimestampPolicyFactory*,if you are using the built-in types,
> like ProcessingTimePolicy
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
> LogAppendTimePolicy
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
> and withCreateTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
> it's not hard to expose them to ReadFromKafka.
> 
> Another interesting topic you have mentioned in
> https://github.com/apache/beam/pull/12572 is that you also want to retrieve
> KafkaRecord from ReadFromKafka instead of bytes. That requires the
> KafkaRecord has the same coder in Python SDK as the coder in Java SDK. It
> might be a future work for x-lang Kafka.
> 
> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
> sameer.bhadou...@gmail.com> wrote:
> 
> > Hello,
> >
> > I am using Beam's Cross language support for reading from Kafka but it is
> > missing some features available in the java sdk that I would like to use.
> > Specifically, I am interested in the Kafka Commit Transform
> > <https://github.com/apache/beam/pull/12572> feature in java sdk. This
> > will also require being able to specify if the metadata is needed or not as
> > part of the KafkaIO transform creation. In addition, the 
> > `commitOffsetsInFinalize`
> > and `TimestampPolicyFactory`parameters are also missing from the python
> > wrapper.
> >
> > My use case is as follows:
> > I have Kafka topics that have data corresponding to the Mongo change
> > streams produced by the Mongo Source Kafka connector. In my pipeline, I
> > read these updated mongo documents, apply some transformations and stream
> > them to BigQuery.
> >
> > Currently, I can only use the `auto.commit` from Kafka consumer config and
> > I believe the message is acked/offset committed after the consumer in
> > KafkaIO finishes reading them. If there are any errors in later stages of
> > my pipeline or if the pipeline is restarted and it can't be drained
> > gracefully, I will lose the already acked messages. Hence, I want to commit
> > the offsets only after they are successfully written to BigQuery.
> >
> > Here is a snippet of my pipeline code.
> >
> >> def run(self, pipeline: Pipeline):
> >>    consumer_config = {
> >>       'bootstrap.servers': self.bootstrap_servers,
> >>       'auto.offset.reset': 'latest',
> >>       # Ideally we want auto.commit disabled, but we need a way to 
> >> acknowledge the messages manually
> >>       # 'enable.auto.commit': 'false',  # messages must be acked 
> >> explicitly but prevents loss in case of failures
> >>       'auto.commit.interval.ms': '60000',  # keep a high value since 
> >> manual commits are not supported and messages will be lost if there is an 
> >> error in the pipeline
> >>       'group.id': 'dev_streaming_dr_beam_pipeline'
> >>    }
> >>
> >>    streamed_dr_kvs_raw = (
> >>          pipeline
> >>          | 'Read from Kafka Stream' >>
> >>          ReadFromKafka(
> >>             consumer_config=consumer_config,
> >>             topics=['mongo_kafka_connect.requests'],
> >>             max_num_records=1,
> >>          )
> >>    )
> >>
> >>    dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> 
> >> ParDo(MongoKafkaMessageDeserializer())
> >>
> >>    filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = 
> >> lambda elem: elem.mongo_document is not None
> >>    filtered_dr_ds_with_record_ts = (
> >>          dr_data_stream
> >>          | 'Filter empty values' >> Filter(filter_record_fn)
> >>          | 'Extract Timestamp' >> 
> >> ParDo(MongoKafkaMessageTimestampExtractor())
> >>    )
> >>
> >>    # The lateness window defines how long the state is kept for older 
> >> windows
> >>    # and saving state for a longer duration can create memory pressure. 
> >> Note, the state is
> >>    # saved in persistent disk but is optimistically fetched in the local 
> >> memory.
> >>    batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> 
> >> WindowInto(
> >>       FixedWindows(30),  # 30 seconds
> >>       trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any 
> >> time late data arrives after 1 min
> >>       allowed_lateness=24 * 60 * 60, # 24 hours for late data
> >>       accumulation_mode=AccumulationMode.DISCARDING
> >>    )
> >>
> >>    extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = 
> >> lambda elems: elems[0].mongo_document
> >>    de_duped_dr_records = (
> >>          batched_dr_records
> >>          | 'Group by Message Key' >> GroupBy('message_key')
> >>          | 'Select Latest' >> Latest.PerKey()
> >>          | 'Values' >> Values()
> >>          | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
> >>    )
> >>
> >>    dr_with_features = de_duped_dr_records | 'Extract Features' >> 
> >> ParDo(DealRequestEnricher())
> >>    dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> 
> >> ParDo(DealRequestFeaturesToBQTableRow())
> >>
> >>    table_schema = 
> >> self.deal_request_schema_util.get_big_query_table_schema()
> >>    dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
> >>       dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
> >>       dest_table_schema_vp=StaticValueProvider(str, table_schema),
> >>       time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 
> >> 'field': 'created_at'}),
> >>       dead_letter_table_spec_vp=StaticValueProvider(str, 
> >> self.dead_letter_table_spec)
> >>    )
> >>
> >>
> > Thanks!
> >
> > --
> > Regards,
> > Sameer Bhadouria.
> >
> 

Reply via email to