Thanks for the quick response on this. Looking forward to these features in the future.
Regarding `TimestampPolicyFactory`, I wanted to use the predefined `CreateTime` policy instead of the default `ProcessingTime` one.

Cheers,
-Sameer.

On 2021/01/15 21:18:25, Boyuan Zhang <boyu...@google.com> wrote:
> +Chamikara Jayalath <chamik...@google.com>
>
> Hi Sameer,
>
> Thanks for reaching out!
>
> We will expose *commitOffsetsInFinalize* to the Python ReadFromKafka
> transform when we have CustomWindow support in the Python SDK, which
> should be coming soon.
>
> In terms of *TimestampPolicyFactory*, if you are using the built-in types,
> like ProcessingTimePolicy
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
> LogAppendTimePolicy
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
> and withCreateTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
> it's not hard to expose them to ReadFromKafka.
>
> Another interesting topic you have mentioned in
> https://github.com/apache/beam/pull/12572 is that you also want to retrieve
> KafkaRecord from ReadFromKafka instead of bytes. That requires KafkaRecord
> to have the same coder in the Python SDK as in the Java SDK. It might be
> future work for x-lang Kafka.
>
> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
> sameer.bhadou...@gmail.com> wrote:
>
> > Hello,
> >
> > I am using Beam's cross-language support for reading from Kafka, but it
> > is missing some features available in the Java SDK that I would like to
> > use. Specifically, I am interested in the Kafka Commit Transform
> > <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This
> > will also require being able to specify whether the metadata is needed as
> > part of the KafkaIO transform creation. In addition, the
> > `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are
> > also missing from the Python wrapper.
> >
> > My use case is as follows:
> > I have Kafka topics that carry data corresponding to the Mongo change
> > streams produced by the Mongo Source Kafka connector. In my pipeline, I
> > read these updated Mongo documents, apply some transformations, and
> > stream them to BigQuery.
> >
> > Currently, I can only use `auto.commit` from the Kafka consumer config,
> > and I believe the offsets are committed after the consumer in KafkaIO
> > finishes reading the messages. If there are any errors in later stages of
> > my pipeline, or if the pipeline is restarted and can't be drained
> > gracefully, I will lose the already-acked messages. Hence, I want to
> > commit the offsets only after they are successfully written to BigQuery.
> >
> > Here is a snippet of my pipeline code.
> >
> >> def run(self, pipeline: Pipeline):
> >>     consumer_config = {
> >>         'bootstrap.servers': self.bootstrap_servers,
> >>         'auto.offset.reset': 'latest',
> >>         # Ideally we want auto.commit disabled, but we need a way to acknowledge the messages manually
> >>         # 'enable.auto.commit': 'false',  # messages must be acked explicitly but prevents loss in case of failures
> >>         'auto.commit.interval.ms': '60000',  # keep a high value since manual commits are not supported and messages will be lost if there is an error in the pipeline
> >>         'group.id': 'dev_streaming_dr_beam_pipeline'
> >>     }
> >>
> >>     streamed_dr_kvs_raw = (
> >>         pipeline
> >>         | 'Read from Kafka Stream' >> ReadFromKafka(
> >>             consumer_config=consumer_config,
> >>             topics=['mongo_kafka_connect.requests'],
> >>             max_num_records=1,
> >>         )
> >>     )
> >>
> >>     dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
> >>
> >>     filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = lambda elem: elem.mongo_document is not None
> >>     filtered_dr_ds_with_record_ts = (
> >>         dr_data_stream
> >>         | 'Filter empty values' >> Filter(filter_record_fn)
> >>         | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
> >>     )
> >>
> >>     # The lateness window defines how long the state is kept for older windows
> >>     # and saving state for a longer duration can create memory pressure. Note, the state is
> >>     # saved in persistent disk but is optimistically fetched in the local memory.
> >>     batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
> >>         FixedWindows(30),  # 30 seconds
> >>         trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
> >>         allowed_lateness=24 * 60 * 60,  # 24 hours for late data
> >>         accumulation_mode=AccumulationMode.DISCARDING
> >>     )
> >>
> >>     extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
> >>     de_duped_dr_records = (
> >>         batched_dr_records
> >>         | 'Group by Message Key' >> GroupBy('message_key')
> >>         | 'Select Latest' >> Latest.PerKey()
> >>         | 'Values' >> Values()
> >>         | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
> >>     )
> >>
> >>     dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
> >>     dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
> >>
> >>     table_schema = self.deal_request_schema_util.get_big_query_table_schema()
> >>     dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
> >>         dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
> >>         dest_table_schema_vp=StaticValueProvider(str, table_schema),
> >>         time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
> >>         dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
> >>     )
> >>
> >
> > Thanks!
> >
> > --
> > Regards,
> > Sameer Bhadouria.
> >
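For illustration, a minimal sketch of what the configuration requested in this thread could look like once commitOffsetsInFinalize and the built-in timestamp policies are plumbed through to the Python ReadFromKafka wrapper. The parameter names `commit_offset_in_finalize` and `timestamp_policy`, the 'CreateTime' policy value, and the broker/topic settings are assumptions made for this sketch, not the wrapper API as it exists in the thread above.

    # Sketch only: assumes a ReadFromKafka wrapper that exposes the two
    # missing parameters discussed above; names are assumptions.
    from apache_beam import Pipeline
    from apache_beam.io.kafka import ReadFromKafka

    consumer_config = {
        'bootstrap.servers': 'localhost:9092',  # placeholder broker address
        'auto.offset.reset': 'latest',
        'enable.auto.commit': 'false',          # rely on finalize-time commits instead of auto-commit
        'group.id': 'dev_streaming_dr_beam_pipeline',
    }

    with Pipeline() as pipeline:
        records = (
            pipeline
            | 'Read from Kafka Stream' >> ReadFromKafka(
                consumer_config=consumer_config,
                topics=['mongo_kafka_connect.requests'],
                # Assumed parameter names for the two requested features:
                commit_offset_in_finalize=True,  # commit offsets when the read's checkpoint is finalized
                timestamp_policy='CreateTime',   # stamp elements with the record create time, not processing time
            )
        )
        # ... deserialize, window, de-duplicate and write to BigQuery as in
        # the snippet above.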