+Chamikara Jayalath <chamik...@google.com>

Hi Sameer,

Thanks for reaching out!

We will expose *commitOffsetsInFinalize* to the Python ReadFromKafka transform
when we have CustomWindow support in the Python SDK, which should be coming
soon.
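
Once it is exposed, the Python surface would most likely just mirror the Java
option. A minimal sketch with purely hypothetical naming (nothing below is a
released parameter; broker/topic/group names are placeholders):

# Hypothetical sketch only: 'commit_offset_in_finalize' is not in the released
# Python ReadFromKafka yet; the name simply mirrors Java's commitOffsetsInFinalize().
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
    records = p | 'Read' >> ReadFromKafka(
        consumer_config={
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'my_group',
            # auto-commit would stay off when offsets are committed on finalize
            'enable.auto.commit': 'false',
        },
        topics=['mongo_kafka_connect.requests'],
        commit_offset_in_finalize=True,  # hypothetical parameter
    )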

In terms of *TimestampPolicyFactory*, if you are using the built-in types,
like ProcessingTimePolicy
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
LogAppendTimePolicy
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
and withCreateTime
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
it's not hard to expose them to ReadFromKafka.
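
For illustration, exposing those built-in policies might look roughly like
this on the Python side (a sketch only; the parameter name and its values are
assumptions that mirror the Java factories, not a released API):

# Sketch only: 'timestamp_policy' and the string values below are assumed
# names mirroring ProcessingTimePolicy / LogAppendTimePolicy / withCreateTime.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
    records = p | 'Read' >> ReadFromKafka(
        consumer_config={'bootstrap.servers': 'localhost:9092',
                         'group.id': 'my_group'},
        topics=['mongo_kafka_connect.requests'],
        timestamp_policy='LogAppendTime',  # or 'ProcessingTime' / 'CreateTime'
    )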

Another interesting topic you mentioned in
https://github.com/apache/beam/pull/12572 is that you also want to retrieve
KafkaRecord from ReadFromKafka instead of raw bytes. That requires
KafkaRecord to have the same coder in the Python SDK as in the Java SDK. It
might be future work for cross-language (x-lang) Kafka.
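
Until then, what the Python ReadFromKafka hands back is just the key/value
bytes, so per-record handling (like your MongoKafkaMessageDeserializer) has to
happen in a downstream transform. A minimal sketch, assuming JSON-encoded
values and placeholder broker/topic names:

# Sketch: without record metadata, ReadFromKafka yields (key, value) byte
# pairs; deserialize the value bytes in a downstream Map/ParDo.
import json
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

def value_to_dict(kv):
    # kv is a (key_bytes, value_bytes) pair
    _key, value = kv
    return json.loads(value.decode('utf-8'))

with beam.Pipeline() as p:
    docs = (
        p
        | 'Read' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092',
                             'group.id': 'my_group'},
            topics=['mongo_kafka_connect.requests'])
        | 'Deserialize values' >> beam.Map(value_to_dict)
    )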

On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
sameer.bhadou...@gmail.com> wrote:

> Hello,
>
> I am using Beam's cross-language support for reading from Kafka, but it is
> missing some features available in the Java SDK that I would like to use.
> Specifically, I am interested in the Kafka Commit Transform
> <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This
> will also require being able to specify whether the metadata is needed as
> part of the KafkaIO transform creation. In addition, the `commitOffsetsInFinalize`
> and `TimestampPolicyFactory` parameters are also missing from the Python
> wrapper.
>
> My use case is as follows:
> I have Kafka topics that have data corresponding to the Mongo change
> streams produced by the Mongo Source Kafka connector. In my pipeline, I
> read these updated mongo documents, apply some transformations and stream
> them to BigQuery.
>
> Currently, I can only use `auto.commit` from the Kafka consumer config, and
> I believe the offsets are committed after the consumer in KafkaIO finishes
> reading the messages. If there are any errors in later stages of my
> pipeline, or if the pipeline is restarted and can't be drained gracefully, I
> will lose the already-acked messages. Hence, I want to commit the offsets
> only after they are successfully written to BigQuery.
>
> Here is a snippet of my pipeline code.
>
>> def run(self, pipeline: Pipeline):
>>    consumer_config = {
>>       'bootstrap.servers': self.bootstrap_servers,
>>       'auto.offset.reset': 'latest',
>>       # Ideally we want auto.commit disabled, but we need a way to acknowledge the messages manually.
>>       # 'enable.auto.commit': 'false',  # messages must be acked explicitly but prevents loss in case of failures
>>       'auto.commit.interval.ms': '60000',  # keep a high value since manual commits are not supported and messages will be lost if there is an error in the pipeline
>>       'group.id': 'dev_streaming_dr_beam_pipeline'
>>    }
>>
>>    streamed_dr_kvs_raw = (
>>          pipeline
>>          | 'Read from Kafka Stream' >> ReadFromKafka(
>>             consumer_config=consumer_config,
>>             topics=['mongo_kafka_connect.requests'],
>>             max_num_records=1,
>>          )
>>    )
>>
>>    dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>>
>>    filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = lambda elem: elem.mongo_document is not None
>>    filtered_dr_ds_with_record_ts = (
>>          dr_data_stream
>>          | 'Filter empty values' >> Filter(filter_record_fn)
>>          | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>>    )
>>
>>    # The lateness window defines how long the state is kept for older windows,
>>    # and saving state for a longer duration can create memory pressure.
>>    # Note: the state is saved on persistent disk but is optimistically fetched into local memory.
>>    batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>>       FixedWindows(30),  # 30 seconds
>>       trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>>       allowed_lateness=24 * 60 * 60,  # 24 hours for late data
>>       accumulation_mode=AccumulationMode.DISCARDING
>>    )
>>
>>    extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>>    de_duped_dr_records = (
>>          batched_dr_records
>>          | 'Group by Message Key' >> GroupBy('message_key')
>>          | 'Select Latest' >> Latest.PerKey()
>>          | 'Values' >> Values()
>>          | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>    )
>>
>>    dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>>    dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>>
>>    table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>>    dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>>       dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>>       dest_table_schema_vp=StaticValueProvider(str, table_schema),
>>       time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>>       dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>>    )
>>
>>
> Thanks!
>
> --
> Regards,
> Sameer Bhadouria.
>
