Hello,

I am using Beam's cross-language support for reading from Kafka, but it is
missing some features available in the Java SDK that I would like to use.
Specifically, I am interested in the Kafka Commit Transform
<https://github.com/apache/beam/pull/12572> feature in the Java SDK. Using it
will also require being able to specify whether the metadata is needed as
part of the KafkaIO transform creation. In addition, the
`commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are missing
from the Python wrapper.
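
To make the request concrete, here is roughly the shape of the read call I am
hoping for. This is a hypothetical sketch only: the `commit_offset_in_finalize`
and `timestamp_policy` keyword arguments are placeholder names mirroring the
Java SDK's `commitOffsetsInFinalize()` and `withTimestampPolicyFactory(...)`,
and the Python `ReadFromKafka` wrapper does not accept them today.

> # Hypothetical sketch only: the last two keyword arguments are placeholders
> # mirroring the Java SDK and are not accepted by the Python wrapper today.
> from apache_beam.io.kafka import ReadFromKafka
>
> reader = ReadFromKafka(
>    consumer_config={
>       'bootstrap.servers': 'broker-1:9092',  # placeholder broker address
>       'enable.auto.commit': 'false',  # let Beam commit offsets instead of the client timer
>       'group.id': 'dev_streaming_dr_beam_pipeline',
>    },
>    topics=['mongo_kafka_connect.requests'],
>    commit_offset_in_finalize=True,    # placeholder for commitOffsetsInFinalize()
>    timestamp_policy='LogAppendTime',  # placeholder for a TimestampPolicyFactory option
> )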

My use case is as follows:
I have Kafka topics that contain data corresponding to the MongoDB change
streams produced by the Mongo Source Kafka connector. In my pipeline, I read
these updated Mongo documents, apply some transformations, and stream them to
BigQuery.

Currently, I can only use `auto.commit` in the Kafka consumer config, and I
believe the offsets are committed once the consumer in KafkaIO finishes
reading the messages. If there are any errors in later stages of my pipeline,
or if the pipeline is restarted and cannot be drained gracefully, I will lose
the already-acked messages. Hence, I want to commit the offsets only after
the records are successfully written to BigQuery.
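
For reference, the only commit-related controls reachable through the Python
wrapper today are the Kafka client's own settings, and those commit on a
wall-clock timer rather than on successful processing (a minimal sketch; the
full snippet below uses the same keys):

> # What is reachable today: the client's timer-based auto commit. Offsets
> # advance on the interval below even if a later pipeline stage fails.
> consumer_config = {
>    'bootstrap.servers': 'broker-1:9092',  # placeholder broker address
>    'group.id': 'dev_streaming_dr_beam_pipeline',
>    'enable.auto.commit': 'true',        # disabling it leaves no way to commit at all
>    'auto.commit.interval.ms': '60000',  # time-based, not tied to pipeline success
> }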

Here is a snippet of my pipeline code.

> def run(self, pipeline: Pipeline):
>    consumer_config = {
>       'bootstrap.servers': self.bootstrap_servers,
>       'auto.offset.reset': 'latest',
>       # Ideally we want auto.commit disabled, but we need a way to acknowledge
>       # the messages manually. Disabling it would prevent loss in case of
>       # failures, but messages must be acked explicitly:
>       # 'enable.auto.commit': 'false',
>       # Keep a high value since manual commits are not supported and messages
>       # will be lost if there is an error in the pipeline.
>       'auto.commit.interval.ms': '60000',
>       'group.id': 'dev_streaming_dr_beam_pipeline'
>    }
>
>    streamed_dr_kvs_raw = (
>          pipeline
>          | 'Read from Kafka Stream' >>
>          ReadFromKafka(
>             consumer_config=consumer_config,
>             topics=['mongo_kafka_connect.requests'],
>             max_num_records=1,
>          )
>    )
>
>    dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>
>    filter_record_fn: Callable[[MongoKafkaMessage], bool] = lambda elem: elem.mongo_document is not None
>    filtered_dr_ds_with_record_ts = (
>          dr_data_stream
>          | 'Filter empty values' >> Filter(filter_record_fn)
>          | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>    )
>
>    # The lateness window defines how long the state is kept for older windows,
>    # and saving state for a longer duration can create memory pressure. Note,
>    # the state is saved on persistent disk but is optimistically fetched into
>    # local memory.
>    batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>       FixedWindows(30),  # 30 seconds
>       trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>       allowed_lateness=24 * 60 * 60,  # 24 hours for late data
>       accumulation_mode=AccumulationMode.DISCARDING
>    )
>
>    extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>    de_duped_dr_records = (
>          batched_dr_records
>          | 'Group by Message Key' >> GroupBy('message_key')
>          | 'Select Latest' >> Latest.PerKey()
>          | 'Values' >> Values()
>          | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>    )
>
>    dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>    dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>
>    table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>    dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>       dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>       dest_table_schema_vp=StaticValueProvider(str, table_schema),
>       time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>       dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>    )
>
>
Thanks!

--
Regards,
Sameer Bhadouria.
