Thanks! Appreciate the quick response!! -Sameer.
On 2021/01/20 19:41:39, Boyuan Zhang <boyu...@google.com> wrote:
> I opened https://github.com/apache/beam/pull/13779 for exposing the built-in timestamp policies and commitOffsetsInFinalize to ReadFromKafka.
>
> On Fri, Jan 15, 2021 at 3:09 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
> > On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang <boyu...@google.com> wrote:
> >
> >> Re Cham,
> >>
> >>> Looking at https://github.com/apache/beam/pull/12572 seems like you just need support for FixedWindows, right? This should work now I believe, since Dataflow Python multi-language pipelines use portable job submission by default.
> >>
> >> The problem part is not the FixedWindows but the Reshuffle. In the Java SDK, Reshuffle will be expanded into
> >> Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
> >> which is rejected by the Python SDK.
> >
> > Ah ok. It should not be rejected by the Python SDK anymore, but Reshuffle ('IdentityWindowFn') indeed will fail when executing the pipeline using Dataflow Runner v2 currently.
> >
> >> I can have a simple PR to expose the built-in timestamp policies. Brian, would you like to identify how much work would be needed to output KafkaRecord to the Python SDK?
> >>
> >> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath <chamik...@google.com> wrote:
> >>
> >>> Thanks for bringing this up Sameer.
> >>>
> >>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <boyu...@google.com> wrote:
> >>>
> >>>> +Chamikara Jayalath <chamik...@google.com>
> >>>>
> >>>> Hi Sameer,
> >>>>
> >>>> Thanks for reaching out!
> >>>>
> >>>> We will expose *commitOffsetsInFinalize* to the py ReadFromKafka transform when we have CustomWindow support in the Python SDK, which should be coming soon.
> >>>
> >>> Looking at https://github.com/apache/beam/pull/12572 seems like you just need support for FixedWindows, right? This should work now I believe, since Dataflow Python multi-language pipelines use portable job submission by default.
> >>>
> >>>> In terms of *TimestampPolicyFactory*, if you are using the built-in types, like ProcessingTimePolicy <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>, LogAppendTimePolicy <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70> and withCreateTime <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>, it's not hard to expose them to ReadFromKafka.
> >>>
> >>> Yeah, exposing the existing policies should not be too hard, but defining new policies (or any other option that requires second-order functions) requires cross-language UDF support, which is not available yet.
> >>>
> >>>> Another interesting topic you have mentioned in https://github.com/apache/beam/pull/12572 is that you also want to retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires KafkaRecord to have the same coder in the Python SDK as in the Java SDK. It might be future work for x-lang Kafka.
> >>>
> >>> This is because the current ReadFromKafka transform exposes the Read.withoutMetaData() transform [1].
> >>> I think the current Beam Schema support in Python should be adequate to expand this and support a PCollection<Row> that represents a PCollection<KafkaRecord> [2] in Python. +Brian Hulette <bhule...@google.com> to confirm.
> >>>
> >>> [1] https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
> >>> [2] https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
> >>>
> >>> Thanks,
> >>> Cham
> >>>
> >>>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <sameer.bhadou...@gmail.com> wrote:
> >>>>
> >>>>> Hello,
> >>>>>
> >>>>> I am using Beam's cross-language support for reading from Kafka, but it is missing some features available in the Java SDK that I would like to use. Specifically, I am interested in the Kafka Commit Transform <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This will also require being able to specify whether the metadata is needed as part of the KafkaIO transform creation. In addition, the `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are also missing from the Python wrapper.
> >>>>>
> >>>>> My use case is as follows:
> >>>>> I have Kafka topics that have data corresponding to the Mongo change streams produced by the Mongo Source Kafka connector. In my pipeline, I read these updated Mongo documents, apply some transformations and stream them to BigQuery.
> >>>>>
> >>>>> Currently, I can only use `auto.commit` from the Kafka consumer config, and I believe the messages are acked/offsets committed after the consumer in KafkaIO finishes reading them. If there are any errors in later stages of my pipeline, or if the pipeline is restarted and can't be drained gracefully, I will lose the already-acked messages. Hence, I want to commit the offsets only after they are successfully written to BigQuery.
> >>>>>
> >>>>> Here is a snippet of my pipeline code.
> >>>>>
> >>>>>> def run(self, pipeline: Pipeline):
> >>>>>>     consumer_config = {
> >>>>>>         'bootstrap.servers': self.bootstrap_servers,
> >>>>>>         'auto.offset.reset': 'latest',
> >>>>>>         # Ideally we want auto.commit disabled, but we need a way to acknowledge the messages manually
> >>>>>>         # 'enable.auto.commit': 'false',  # messages must be acked explicitly but prevents loss in case of failures
> >>>>>>         'auto.commit.interval.ms': '60000',  # keep a high value since manual commits are not supported and messages will be lost if there is an error in the pipeline
> >>>>>>         'group.id': 'dev_streaming_dr_beam_pipeline'
> >>>>>>     }
> >>>>>>
> >>>>>>     streamed_dr_kvs_raw = (
> >>>>>>         pipeline
> >>>>>>         | 'Read from Kafka Stream' >> ReadFromKafka(
> >>>>>>             consumer_config=consumer_config,
> >>>>>>             topics=['mongo_kafka_connect.requests'],
> >>>>>>             max_num_records=1,
> >>>>>>         )
> >>>>>>     )
> >>>>>>
> >>>>>>     dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
> >>>>>>
> >>>>>>     filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = lambda elem: elem.mongo_document is not None
> >>>>>>     filtered_dr_ds_with_record_ts = (
> >>>>>>         dr_data_stream
> >>>>>>         | 'Filter empty values' >> Filter(filter_record_fn)
> >>>>>>         | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
> >>>>>>     )
> >>>>>>
> >>>>>>     # The lateness window defines how long the state is kept for older windows
> >>>>>>     # and saving state for a longer duration can create memory pressure. Note, the state is
> >>>>>>     # saved in persistent disk but is optimistically fetched in the local memory.
> >>>>>>     batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
> >>>>>>         FixedWindows(30),  # 30 seconds
> >>>>>>         trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
> >>>>>>         allowed_lateness=24 * 60 * 60,  # 24 hours for late data
> >>>>>>         accumulation_mode=AccumulationMode.DISCARDING
> >>>>>>     )
> >>>>>>
> >>>>>>     extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
> >>>>>>     de_duped_dr_records = (
> >>>>>>         batched_dr_records
> >>>>>>         | 'Group by Message Key' >> GroupBy('message_key')
> >>>>>>         | 'Select Latest' >> Latest.PerKey()
> >>>>>>         | 'Values' >> Values()
> >>>>>>         | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
> >>>>>>     )
> >>>>>>
> >>>>>>     dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
> >>>>>>     dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
> >>>>>>
> >>>>>>     table_schema = self.deal_request_schema_util.get_big_query_table_schema()
> >>>>>>     dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
> >>>>>>         dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
> >>>>>>         dest_table_schema_vp=StaticValueProvider(str, table_schema),
> >>>>>>         time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
> >>>>>>         dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
> >>>>>>     )
> >>>>>
> >>>>> Thanks!
> >>>>>
> >>>>> --
> >>>>> Regards,
> >>>>> Sameer Bhadouria.
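
For reference on the current behavior discussed above: with metadata dropped, the cross-language ReadFromKafka hands Python raw (key, value) byte pairs, which is why the pipeline above needs its own MongoKafkaMessageDeserializer. A minimal sketch of that kind of decoding step, assuming the value is a JSON-encoded Mongo change-stream event (that payload shape and the 'fullDocument' field are assumptions for illustration, not taken from the thread):

    import json
    from typing import Optional, Tuple

    import apache_beam as beam


    def decode_change_event(kv: Tuple[Optional[bytes], bytes]) -> dict:
        """Decodes one raw Kafka (key, value) pair into a change-stream dict."""
        _key, value = kv
        return json.loads(value.decode('utf-8'))


    def decode_records(raw_kvs):
        # `raw_kvs` is the PCollection of (key_bytes, value_bytes) tuples that
        # ReadFromKafka currently produces when metadata is not exposed.
        return (
            raw_kvs
            | 'Decode change events' >> beam.Map(decode_change_event)
            | 'Drop empty documents' >> beam.Filter(
                lambda doc: doc.get('fullDocument') is not None)
        )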
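
A minimal sketch of how the options under discussion might be passed to ReadFromKafka once https://github.com/apache/beam/pull/13779 exposes them. The parameter names `commit_offset_in_finalize` and `timestamp_policy`, and the policy name string, are assumptions modeled on the Java KafkaIO API, not a confirmed Python interface:

    # Sketch only: assumes ReadFromKafka grows options mirroring
    # KafkaIO.commitOffsetsInFinalize() and the built-in TimestampPolicyFactory
    # policies; the parameter names below are hypothetical.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka


    def read_requests(pipeline: beam.Pipeline):
        return (
            pipeline
            | 'Read from Kafka Stream' >> ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': 'localhost:9092',  # placeholder broker
                    'group.id': 'dev_streaming_dr_beam_pipeline',
                    # With finalize-time commits, auto.commit is no longer needed.
                    'enable.auto.commit': 'false',
                },
                topics=['mongo_kafka_connect.requests'],
                # Hypothetical: commit offsets only when a bundle is finalized,
                # i.e. after downstream processing of that bundle has completed.
                commit_offset_in_finalize=True,
                # Hypothetical: select one of the built-in policies
                # (processing time, log-append time, or create time).
                timestamp_policy='LogAppendTime',
            )
        )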
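
If KafkaRecord is eventually surfaced through Beam's schema/Row support as suggested in the thread, the record metadata could be read by field name on the Python side. A sketch under that assumption; the field names simply mirror KafkaRecord's Java fields and are hypothetical, not a confirmed API:

    # Sketch only: assumes a future ReadFromKafka variant that emits schema'd
    # rows mirroring KafkaRecord (topic, partition, offset, timestamp, key, value).
    import apache_beam as beam


    def summarize(record) -> str:
        # Row fields would be accessed as attributes once schema support applies.
        return (
            f'{record.topic}/{record.partition}@{record.offset} '
            f'ts={record.timestamp} key={record.key}'
        )


    def summarize_records(kafka_rows):
        # `kafka_rows` would be the PCollection of schema'd KafkaRecord rows.
        return kafka_rows | 'Summarize records' >> beam.Map(summarize)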