Re Cham,

> Looking at https://github.com/apache/beam/pull/12572 seems like you just need support for FixedWindows, right? This should work now I believe since Dataflow Python multi-language pipelines use portable job submission by default.

The problematic part is not FixedWindows but Reshuffle. In the Java SDK, Reshuffle is expanded into Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder())), which is rejected by the Python SDK.

I can put up a simple PR to expose the built-in timestamp policies. Brian, could you help identify how much work would be needed to output KafkaRecord to the Python SDK? (I have appended a very rough sketch of what a schema'd KafkaRecord could look like on the Python side at the end of this mail.)
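For the built-in policies, the exposure on the Python wrapper could be as small as a single string parameter that maps to the corresponding Java TimestampPolicyFactory. A rough sketch of the intended usage follows; the `timestamp_policy` parameter name and its string values are placeholders of mine, not an agreed API:

    # Hypothetical usage once the built-in Java timestamp policies are exposed
    # on ReadFromKafka. `timestamp_policy` and its values are placeholders.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
        records = (
            p
            | 'Read from Kafka' >> ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': 'localhost:9092',
                    'group.id': 'my_consumer_group',
                },
                topics=['my_topic'],
                # Placeholder: would select a Java-side built-in policy,
                # e.g. ProcessingTime, LogAppendTime or CreateTime.
                timestamp_policy='LogAppendTime',
            )
        )
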
On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath <[email protected]> wrote:

> Thanks for bringing this up Sameer.
>
> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <[email protected]> wrote:
>
>> +Chamikara Jayalath <[email protected]>
>>
>> Hi Sameer,
>>
>> Thanks for reaching out!
>>
>> We will expose *commitOffsetsInFinalize* to the py ReadFromKafka transform when we have CustomWindow support in the Python SDK, which should be coming soon.
>
> Looking at https://github.com/apache/beam/pull/12572 seems like you just need support for FixedWindows, right? This should work now I believe since Dataflow Python multi-language pipelines use portable job submission by default.
>
>> In terms of *TimestampPolicyFactory*, if you are using the built-in types, like ProcessingTimePolicy <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>, LogAppendTimePolicy <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70> and withCreateTime <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>, it's not hard to expose them to ReadFromKafka.
>
> Yeah, exposing existing policies should not be too hard but defining new policies (or any other option that requires second-order functions) requires cross-language UDF support, which is not available yet.
>
>> Another interesting topic you have mentioned in https://github.com/apache/beam/pull/12572 is that you also want to retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires the KafkaRecord to have the same coder in the Python SDK as the coder in the Java SDK. It might be future work for x-lang Kafka.
>
> This is because the current ReadFromKafka transform exposes the Read.withoutMetadata() transform [1].
> I think current Beam Schema support in Python should be adequate to expand this and support a PCollection<Row> that represents a PCollection<KafkaRecord> [2] in Python. +Brian Hulette <[email protected]> to confirm.
>
> [1] https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
> [2] https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>
> Thanks,
> Cham
>
>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> I am using Beam's cross-language support for reading from Kafka, but it is missing some features available in the Java SDK that I would like to use. Specifically, I am interested in the Kafka Commit Transform <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This will also require being able to specify whether the metadata is needed or not as part of the KafkaIO transform creation.
>>> In addition, the `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are also missing from the Python wrapper.
>>>
>>> My use case is as follows:
>>> I have Kafka topics that have data corresponding to the Mongo change streams produced by the Mongo Source Kafka connector. In my pipeline, I read these updated mongo documents, apply some transformations and stream them to BigQuery.
>>>
>>> Currently, I can only use the `auto.commit` from the Kafka consumer config, and I believe the message is acked/offset committed after the consumer in KafkaIO finishes reading them. If there are any errors in later stages of my pipeline, or if the pipeline is restarted and can't be drained gracefully, I will lose the already acked messages. Hence, I want to commit the offsets only after they are successfully written to BigQuery.
>>>
>>> Here is a snippet of my pipeline code.
>>>
>>>> def run(self, pipeline: Pipeline):
>>>>     consumer_config = {
>>>>         'bootstrap.servers': self.bootstrap_servers,
>>>>         'auto.offset.reset': 'latest',
>>>>         # Ideally we want auto.commit disabled, but we need a way to acknowledge the messages manually
>>>>         # 'enable.auto.commit': 'false',  # messages must be acked explicitly but prevents loss in case of failures
>>>>         'auto.commit.interval.ms': '60000',  # keep a high value since manual commits are not supported and messages will be lost if there is an error in the pipeline
>>>>         'group.id': 'dev_streaming_dr_beam_pipeline'
>>>>     }
>>>>
>>>>     streamed_dr_kvs_raw = (
>>>>         pipeline
>>>>         | 'Read from Kafka Stream' >> ReadFromKafka(
>>>>             consumer_config=consumer_config,
>>>>             topics=['mongo_kafka_connect.requests'],
>>>>             max_num_records=1,
>>>>         )
>>>>     )
>>>>
>>>>     dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>>>>
>>>>     filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = lambda elem: elem.mongo_document is not None
>>>>     filtered_dr_ds_with_record_ts = (
>>>>         dr_data_stream
>>>>         | 'Filter empty values' >> Filter(filter_record_fn)
>>>>         | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>>>>     )
>>>>
>>>>     # The lateness window defines how long the state is kept for older windows
>>>>     # and saving state for a longer duration can create memory pressure. Note, the state is
>>>>     # saved in persistent disk but is optimistically fetched in the local memory.
>>>>     batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>>>>         FixedWindows(30),  # 30 seconds
>>>>         trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>>>>         allowed_lateness=24 * 60 * 60,  # 24 hours for late data
>>>>         accumulation_mode=AccumulationMode.DISCARDING
>>>>     )
>>>>
>>>>     extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>>>>     de_duped_dr_records = (
>>>>         batched_dr_records
>>>>         | 'Group by Message Key' >> GroupBy('message_key')
>>>>         | 'Select Latest' >> Latest.PerKey()
>>>>         | 'Values' >> Values()
>>>>         | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>>>     )
>>>>
>>>>     dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>>>>     dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>>>>
>>>>     table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>>>>     dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>>>>         dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>>>>         dest_table_schema_vp=StaticValueProvider(str, table_schema),
>>>>         time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>>>>         dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>>>>     )
>>>
>>> Thanks!
>>>
>>> --
>>> Regards,
>>> Sameer Bhadouria.
>>
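As mentioned above, here is a very rough sketch of what a schema'd KafkaRecord could look like on the Python side. The field names below simply mirror the Java KafkaRecord class; the actual Row schema that a metadata-preserving ReadFromKafka would produce is not defined anywhere yet, so please treat this purely as an illustration of how Beam's existing schema/RowCoder support in Python could carry it:

    # Hypothetical Python-side schema mirroring the Java KafkaRecord fields
    # (headers omitted). Not an existing Beam API, only an illustration.
    import typing
    import apache_beam as beam

    class KafkaRecordRow(typing.NamedTuple):
        topic: str
        partition: int
        offset: int
        timestamp: int  # epoch millis
        timestamp_type: str
        key: typing.Optional[bytes]
        value: bytes

    beam.coders.registry.register_coder(KafkaRecordRow, beam.coders.RowCoder)

    # A downstream transform could then access metadata by field name, e.g.
    #     records | beam.Map(lambda r: (r.topic, r.partition, r.offset))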
