On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang <[email protected]> wrote:

> Re Cham,
>
>> Looking at https://github.com/apache/beam/pull/12572, it seems like you just
>> need support for FixedWindows, right? This should work now, I believe, since
>> Dataflow Python multi-language pipelines use portable job submission by
>> default.
>
> The problematic part is not FixedWindows but Reshuffle. In the Java SDK,
> Reshuffle is expanded into
> Window.<KV<K, V>>into(new
> IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
> which is rejected by the Python SDK.
>

Ah ok. It should no longer be rejected by the Python SDK, but Reshuffle
('IdentityWindowFn') will indeed currently fail when the pipeline is executed
using Dataflow Runner v2.


> I can put up a simple PR to expose the built-in timestamp policies. Brian,
> would you like to identify how much work would be needed to output
> KafkaRecord to the Python SDK?
>
> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> Thanks for bringing this up Sameer.
>>
>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> +Chamikara Jayalath <[email protected]>
>>>
>>> Hi Sameer,
>>>
>>> Thanks for reaching out!
>>>
>>> We will expose *commitOffsetsInFinalize* to the Python ReadFromKafka
>>> transform when we have CustomWindow support in the Python SDK, which
>>> should be coming soon.
>>>
>>
>> Looking at https://github.com/apache/beam/pull/12572, it seems like you just
>> need support for FixedWindows, right? This should work now, I believe, since
>> Dataflow Python multi-language pipelines use portable job submission by
>> default.
>>
>>
>>>
>>> In terms of *TimestampPolicyFactory*, if you are using the built-in
>>> types, like ProcessingTimePolicy
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
>>> LogAppendTimePolicy
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
>>> and withCreateTime
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
>>> it's not hard to expose them to ReadFromKafka.
>>>
>>
>> Yeah, exposing existing policies should not be too hard but defining new
>> policies (or any other option that requires second order functions)
>> requires cross-language UDF support which is not available yet.
>>
>>
>>>
>>> Another interesting topic you mentioned in
>>> https://github.com/apache/beam/pull/12572 is that you also want to
>>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
>>> KafkaRecord to have the same coder in the Python SDK as in the Java SDK.
>>> It might be future work for x-lang Kafka.
>>>
>>
>> This is because the current ReadFromKafka transform exposes the
>> Read.withoutMetadata() transform [1].
>> I think the current Beam Schema support in Python should be adequate to
>> expand this and support a PCollection<Row> that represents a
>> PCollection<KafkaRecord> [2] in Python. +Brian Hulette
>> <[email protected]> to confirm.
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
>> [2]
>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>>
>> Thanks,
>> Cham
>>
>>
>>>
>>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
>>> [email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am using Beam's Cross language support for reading from Kafka but it
>>>> is missing some features available in the java sdk that I would like to
>>>> use. Specifically, I am interested in the Kafka Commit Transform
>>>> <https://github.com/apache/beam/pull/12572> feature in java sdk. This
>>>> will also require being able to specify if the metadata is needed or not as
>>>> part of the KafkaIO transform creation. In addition, the
>>>> `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are
>>>> also missing from the python wrapper.
>>>>
>>>> My use case is as follows:
>>>> I have Kafka topics that have data corresponding to the Mongo change
>>>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
>>>> read these updated mongo documents, apply some transformations and stream
>>>> them to BigQuery.
>>>>
>>>> Currently, I can only use `auto.commit` from the Kafka consumer config,
>>>> and I believe messages are acked (offsets committed) after the consumer in
>>>> KafkaIO finishes reading them. If there are any errors in later stages of
>>>> my pipeline, or if the pipeline is restarted and can't be drained
>>>> gracefully, I will lose the already acked messages. Hence, I want to commit
>>>> the offsets only after they are successfully written to BigQuery.
>>>>
>>>> Here is a snippet of my pipeline code.
>>>>
>>>>> def run(self, pipeline: Pipeline):
>>>>>    consumer_config = {
>>>>>       'bootstrap.servers': self.bootstrap_servers,
>>>>>       'auto.offset.reset': 'latest',
>>>>>       # Ideally we want auto.commit disabled, but we need a way to acknowledge the messages manually
>>>>>       # 'enable.auto.commit': 'false',  # messages must be acked explicitly but prevents loss in case of failures
>>>>>       'auto.commit.interval.ms': '60000',  # keep a high value since manual commits are not supported and messages will be lost if there is an error in the pipeline
>>>>>       'group.id': 'dev_streaming_dr_beam_pipeline'
>>>>>    }
>>>>>
>>>>>    streamed_dr_kvs_raw = (
>>>>>          pipeline
>>>>>          | 'Read from Kafka Stream' >>
>>>>>          ReadFromKafka(
>>>>>             consumer_config=consumer_config,
>>>>>             topics=['mongo_kafka_connect.requests'],
>>>>>             max_num_records=1,
>>>>>          )
>>>>>    )
>>>>>
>>>>>    dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>>>>>
>>>>>    # Predicate for Filter: keep only messages that carry a mongo document
>>>>>    filter_record_fn: Callable[[MongoKafkaMessage], bool] = lambda elem: elem.mongo_document is not None
>>>>>    filtered_dr_ds_with_record_ts = (
>>>>>          dr_data_stream
>>>>>          | 'Filter empty values' >> Filter(filter_record_fn)
>>>>>          | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>>>>>    )
>>>>>
>>>>>    # The lateness window defines how long the state is kept for older windows
>>>>>    # and saving state for a longer duration can create memory pressure.
>>>>>    # Note, the state is saved in persistent disk but is optimistically
>>>>>    # fetched in the local memory.
>>>>>    batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>>>>>       FixedWindows(30),  # 30 seconds
>>>>>       trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>>>>>       allowed_lateness=24 * 60 * 60, # 24 hours for late data
>>>>>       accumulation_mode=AccumulationMode.DISCARDING
>>>>>    )
>>>>>
>>>>>    extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>>>>>    de_duped_dr_records = (
>>>>>          batched_dr_records
>>>>>          | 'Group by Message Key' >> GroupBy('message_key')
>>>>>          | 'Select Latest' >> Latest.PerKey()
>>>>>          | 'Values' >> Values()
>>>>>          | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>>>>    )
>>>>>
>>>>>    dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>>>>>    dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>>>>>
>>>>>    table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>>>>>    dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>>>>>       dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>>>>>       dest_table_schema_vp=StaticValueProvider(str, table_schema),
>>>>>       time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>>>>>       dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>>>>>    )
>>>>>
>>>>>
>>>> Thanks!
>>>>
>>>> --
>>>> Regards,
>>>> Sameer Bhadouria.
>>>>
>>>
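
For readers following the thread, Sameer's concern about offsets being
committed before the BigQuery write succeeds can be illustrated with a small
simulation. This is only a sketch under stated assumptions: it uses plain
Python and no Beam or Kafka APIs, and FakeBroker, run_once, flaky_writer and
simulate are hypothetical names invented for the illustration. It compares
committing an offset as soon as a message is read (auto-commit style) with
committing only after the sink write succeeds.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakeBroker:
    """Hypothetical in-memory stand-in for a single Kafka partition."""
    messages: List[str]
    committed: int = 0  # next offset a restarted consumer would read

    def poll(self) -> List[tuple]:
        # Return (offset, message) pairs that have not been committed yet.
        return list(enumerate(self.messages))[self.committed:]


def run_once(broker, sink, write, commit_before_write):
    """Process pending messages and stop at the first sink failure."""
    for offset, msg in broker.poll():
        if commit_before_write:
            broker.committed = offset + 1  # commit as soon as it is read
        try:
            write(sink, msg)
        except RuntimeError:
            return  # simulated pipeline crash
        if not commit_before_write:
            broker.committed = offset + 1  # commit only after the write


def flaky_writer():
    """Build a writer that fails the first time it sees message 'b'."""
    failed = set()

    def write(sink, msg):
        if msg == "b" and msg not in failed:
            failed.add(msg)
            raise RuntimeError("sink unavailable")
        sink.append(msg)

    return write


def simulate(commit_before_write):
    broker = FakeBroker(["a", "b", "c"])
    sink = []
    write = flaky_writer()
    run_once(broker, sink, write, commit_before_write)  # crashes on "b"
    run_once(broker, sink, write, commit_before_write)  # restart
    return sink


print(simulate(commit_before_write=True))   # "b" never reaches the sink
print(simulate(commit_before_write=False))  # "b" is re-read after restart
```

In the first run the offset for "b" is already committed when the write
fails, so the restarted consumer skips it. In the second run the failed
message is read again after the restart, which is the at-least-once behaviour
the thread is asking for from commitOffsetsInFinalize.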
