I opened https://github.com/apache/beam/pull/13779 for exposing the built-in timestamp policies and commitOffsetsInFinalize to ReadFromKafka.
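To illustrate, the exposed options could look roughly like this from the Python side. The keyword names and policy values below are assumptions mirroring the Java API (commitOffsetsInFinalize, TimestampPolicyFactory) and may differ from what the PR finally lands:

# Sketch only: parameter names are assumptions and may not match the PR.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as pipeline:
    records = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'localhost:9092',
                'group.id': 'my-consumer-group',  # a group id is needed for committing offsets
            },
            topics=['my_topic'],
            commit_offset_in_finalize=True,  # assumed name for commitOffsetsInFinalize
            # assumed to accept only the built-in policies:
            # 'ProcessingTime', 'LogAppendTime' or 'CreateTime'
            timestamp_policy='LogAppendTime',
        )
    )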
On Fri, Jan 15, 2021 at 3:09 PM Chamikara Jayalath <[email protected]> wrote:

> On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang <[email protected]> wrote:
>
>> Re Cham,
>>
>>> Looking at https://github.com/apache/beam/pull/12572, it seems like you
>>> just need support for FixedWindows, right? This should work now, I believe,
>>> since Dataflow Python multi-language pipelines use portable job submission
>>> by default.
>>
>> The problematic part is not FixedWindows but Reshuffle. In the Java SDK,
>> Reshuffle is expanded into
>> Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
>> which is rejected by the Python SDK.
>
> Ah, ok. It should not be rejected by the Python SDK anymore, but Reshuffle
> ('IdentityWindowFn') will indeed fail when executing the pipeline with
> Dataflow Runner v2 currently.
>
>> I can put up a simple PR to expose the built-in timestamp policies. Brian,
>> would you like to look into how much work would be needed to output
>> KafkaRecord to the Python SDK?
>>
>> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath <[email protected]> wrote:
>>
>>> Thanks for bringing this up, Sameer.
>>>
>>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> +Chamikara Jayalath <[email protected]>
>>>>
>>>> Hi Sameer,
>>>>
>>>> Thanks for reaching out!
>>>>
>>>> We will expose *commitOffsetsInFinalize* to the Python ReadFromKafka
>>>> transform when we have custom window support in the Python SDK, which
>>>> should be coming soon.
>>>
>>> Looking at https://github.com/apache/beam/pull/12572, it seems like you
>>> just need support for FixedWindows, right? This should work now, I believe,
>>> since Dataflow Python multi-language pipelines use portable job submission
>>> by default.
>>>
>>>> In terms of *TimestampPolicyFactory*, if you are using the built-in
>>>> types, like ProcessingTimePolicy
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
>>>> LogAppendTimePolicy
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
>>>> and withCreateTime
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
>>>> it's not hard to expose them to ReadFromKafka.
>>>
>>> Yeah, exposing the existing policies should not be too hard, but defining
>>> new policies (or any other option that requires second-order functions)
>>> requires cross-language UDF support, which is not available yet.
>>>
>>>> Another interesting topic you mentioned in
>>>> https://github.com/apache/beam/pull/12572 is that you also want to
>>>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
>>>> KafkaRecord to have the same coder in the Python SDK as in the Java SDK.
>>>> It might be future work for x-lang Kafka.
>>>
>>> This is because the current ReadFromKafka transform wraps the
>>> Read.withoutMetadata() transform [1].
>>> I think the current Beam Schema support in Python should be adequate to
>>> expand this and support a PCollection<Row> that represents a
>>> PCollection<KafkaRecord> [2] in Python. +Brian Hulette
>>> <[email protected]> to confirm.
>>>
>>> [1] https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
>>> [2] https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>>>
>>> Thanks,
>>> Cham
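A purely hypothetical illustration of that PCollection<Row> idea from the Python side; the with_metadata flag and the record fields below do not exist in the current API and are only meant to show the shape such an extension could take:

# Hypothetical sketch: assumes a ReadFromKafka option that keeps the Kafka
# metadata and returns schema'd rows mirroring the KafkaRecord fields.
# Neither the with_metadata flag nor the field names exist today.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as pipeline:
    record_info = (
        pipeline
        | 'Read with metadata' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092'},
            topics=['mongo_kafka_connect.requests'],
            with_metadata=True,  # hypothetical flag
        )
        | 'Project record fields' >> beam.Map(
            lambda record: (record.topic, record.partition, record.offset, record.value))
    )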
>>>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am using Beam's cross-language support for reading from Kafka, but it
>>>>> is missing some features available in the Java SDK that I would like to
>>>>> use. Specifically, I am interested in the Kafka Commit Transform
>>>>> <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This
>>>>> will also require being able to specify whether the metadata is needed as
>>>>> part of the KafkaIO transform creation. In addition, the
>>>>> `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are also
>>>>> missing from the Python wrapper.
>>>>>
>>>>> My use case is as follows:
>>>>> I have Kafka topics with data corresponding to the Mongo change streams
>>>>> produced by the Mongo Source Kafka connector. In my pipeline, I read these
>>>>> updated Mongo documents, apply some transformations, and stream them to
>>>>> BigQuery.
>>>>>
>>>>> Currently, I can only use `auto.commit` from the Kafka consumer config,
>>>>> and I believe the messages are acked (offsets committed) after the consumer
>>>>> in KafkaIO finishes reading them. If there are any errors in later stages
>>>>> of my pipeline, or if the pipeline is restarted and can't be drained
>>>>> gracefully, I will lose the already-acked messages. Hence, I want to commit
>>>>> the offsets only after they are successfully written to BigQuery.
>>>>>
>>>>> Here is a snippet of my pipeline code.
>>>>>
>>>>>> def run(self, pipeline: Pipeline):
>>>>>>     consumer_config = {
>>>>>>         'bootstrap.servers': self.bootstrap_servers,
>>>>>>         'auto.offset.reset': 'latest',
>>>>>>         # Ideally we want auto.commit disabled, but we need a way to acknowledge the messages manually.
>>>>>>         # 'enable.auto.commit': 'false',  # messages must be acked explicitly but prevents loss in case of failures
>>>>>>         'auto.commit.interval.ms': '60000',  # keep a high value since manual commits are not supported and messages will be lost if there is an error in the pipeline
>>>>>>         'group.id': 'dev_streaming_dr_beam_pipeline'
>>>>>>     }
>>>>>>
>>>>>>     streamed_dr_kvs_raw = (
>>>>>>         pipeline
>>>>>>         | 'Read from Kafka Stream' >> ReadFromKafka(
>>>>>>             consumer_config=consumer_config,
>>>>>>             topics=['mongo_kafka_connect.requests'],
>>>>>>             max_num_records=1,
>>>>>>         )
>>>>>>     )
>>>>>>
>>>>>>     dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>>>>>>
>>>>>>     filter_record_fn: Callable[[MongoKafkaMessage], bool] = lambda elem: elem.mongo_document is not None
>>>>>>     filtered_dr_ds_with_record_ts = (
>>>>>>         dr_data_stream
>>>>>>         | 'Filter empty values' >> Filter(filter_record_fn)
>>>>>>         | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>>>>>>     )
>>>>>>
>>>>>>     # The lateness window defines how long the state is kept for older windows,
>>>>>>     # and saving state for a longer duration can create memory pressure. Note that the
>>>>>>     # state is saved on persistent disk but is optimistically fetched into local memory.
>>>>>>     batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>>>>>>         FixedWindows(30),  # 30 seconds
>>>>>>         trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>>>>>>         allowed_lateness=24 * 60 * 60,  # 24 hours for late data
>>>>>>         accumulation_mode=AccumulationMode.DISCARDING
>>>>>>     )
>>>>>>
>>>>>>     extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>>>>>>     de_duped_dr_records = (
>>>>>>         batched_dr_records
>>>>>>         | 'Group by Message Key' >> GroupBy('message_key')
>>>>>>         | 'Select Latest' >> Latest.PerKey()
>>>>>>         | 'Values' >> Values()
>>>>>>         | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>>>>>     )
>>>>>>
>>>>>>     dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>>>>>>     dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>>>>>>
>>>>>>     table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>>>>>>     dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>>>>>>         dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>>>>>>         dest_table_schema_vp=StaticValueProvider(str, table_schema),
>>>>>>         time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>>>>>>         dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>>>>>>     )
>>>>>
>>>>> Thanks!
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Sameer Bhadouria.
>>>>
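For the use case above, once these options are exposed, the read step in the quoted snippet could change roughly as follows (a drop-in tweak to the run method above). The keyword names are assumptions, and note that committing on finalize ties the commit to the runner finalizing the read's output, not strictly to the BigQuery write:

    # Sketch of how the read step might change; keyword names are assumptions.
    # Offsets would be committed when the runner finalizes the read's output
    # rather than on the consumer's auto-commit timer.
    consumer_config = {
        'bootstrap.servers': self.bootstrap_servers,
        'auto.offset.reset': 'latest',
        'enable.auto.commit': 'false',  # stop relying on the auto-commit timer
        'group.id': 'dev_streaming_dr_beam_pipeline',  # still required for committing offsets
    }

    streamed_dr_kvs_raw = (
        pipeline
        | 'Read from Kafka Stream' >> ReadFromKafka(
            consumer_config=consumer_config,
            topics=['mongo_kafka_connect.requests'],
            commit_offset_in_finalize=True,   # assumed name for commitOffsetsInFinalize
            timestamp_policy='LogAppendTime', # assumed name for the built-in LogAppendTime policy
        )
    )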
