Re Cham,

> Looking at https://github.com/apache/beam/pull/12572 seems like you just
> need support for FixedWindows, right ? This should work now I believe since
> Dataflow Python multi-language pipelines use portable job submission by
> default.

The problematic part is not FixedWindows but Reshuffle. In the Java SDK,
Reshuffle is expanded into
Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
which is rejected by the Python SDK.

I can put together a simple PR to expose the built-in timestamp policies. Brian,
would you like to look into how much work would be needed to output KafkaRecord
to the Python SDK?
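
As a rough sketch of what the exposed option could look like (the parameter name
and string values below are illustrative, not an existing API yet; only the three
built-in policies would be supported, since custom policies need cross-language
UDFs):

   from apache_beam.io.kafka import ReadFromKafka

   ReadFromKafka(
       consumer_config={'bootstrap.servers': 'host:9092',  # placeholder values
                        'group.id': 'my_group'},
       topics=['my_topic'],                                 # placeholder topic
       # Hypothetical option mapping to the Java built-ins:
       # withProcessingTime() / withLogAppendTime() / withCreateTime()
       timestamp_policy='LogAppendTime',
   )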

On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath <[email protected]>
wrote:

> Thanks for bringing this up Sameer.
>
> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <[email protected]> wrote:
>
>> +Chamikara Jayalath <[email protected]>
>>
>> Hi Sameer,
>>
>> Thanks for reaching out!
>>
>> We will expose *commitOffsetsInFinalize* to the Python ReadFromKafka transform
>> when we have CustomWindow support in the Python SDK, which should be coming
>> soon.
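>>
>> Once exposed, the wrapper would likely just forward a flag to the Java
>> expansion. A minimal sketch, assuming a hypothetical parameter name (not an
>> existing option yet):
>>
>>    ReadFromKafka(
>>        consumer_config={'bootstrap.servers': 'host:9092',  # placeholder
>>                         'group.id': 'my_group',            # placeholder
>>                         'enable.auto.commit': 'false'},
>>        topics=['my_topic'],                                # placeholder
>>        # Hypothetical flag mapping to Java's commitOffsetsInFinalize()
>>        commit_offset_in_finalize=True,
>>    )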
>>
>
> Looking at https://github.com/apache/beam/pull/12572 seems like you just
> need support for FixedWindows, right ? This should work now I believe since
> Dataflow Python multi-language pipelines use portable job submission by
> default.
>
>
>>
>> In terms of *TimestampPolicyFactory*, if you are using the built-in
>> types, like ProcessingTimePolicy
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
>> LogAppendTimePolicy
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
>> and withCreateTime
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
>> it's not hard to expose them to ReadFromKafka.
>>
>
> Yeah, exposing the existing policies should not be too hard, but defining new
> policies (or any other option that requires second-order functions)
> requires cross-language UDF support, which is not available yet.
>
>
>>
>> Another interesting topic you have mentioned in
>> https://github.com/apache/beam/pull/12572 is that you also want to
>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
>> KafkaRecord to have the same coder in the Python SDK as in the Java SDK. It
>> might be future work for x-lang Kafka.
>>
>
> This is because the current ReadFromKafka transform uses
> Read.withoutMetadata() [1].
> I think the current Beam Schema support in Python should be adequate to expand
> this and support a PCollection<Row> that represents a
> PCollection<KafkaRecord> [2] in Python; a rough sketch follows the links below.
> +Brian Hulette <[email protected]> to confirm.
>
> [1]
> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
> [2]
> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
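>
> For illustration, if ReadFromKafka returned schema'd rows instead of raw
> key/value bytes, the Python side might consume them roughly like this (the
> field names are assumptions based on the Java KafkaRecord class, not an
> existing API):
>
>    records = pipeline | ReadFromKafka(
>        consumer_config={'bootstrap.servers': 'host:9092'},  # placeholder
>        topics=['my_topic'])                                 # placeholder
>    parsed = records | beam.Map(
>        # Hypothetical Row fields mirroring Java's KafkaRecord
>        lambda rec: (rec.topic, rec.partition, rec.offset, rec.key, rec.value))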
>
> Thanks,
> Cham
>
>
>>
>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
>> [email protected]> wrote:
>>
>>> Hello,
>>>
>>> I am using Beam's cross-language support for reading from Kafka, but it
>>> is missing some features available in the Java SDK that I would like to
>>> use. Specifically, I am interested in the Kafka Commit Transform
>>> <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This
>>> will also require being able to specify whether the metadata is needed as
>>> part of the KafkaIO transform creation. In addition, the
>>> `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are also
>>> missing from the Python wrapper.
>>>
>>> My use case is as follows:
>>> I have Kafka topics that have data corresponding to the Mongo change
>>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
>>> read these updated mongo documents, apply some transformations and stream
>>> them to BigQuery.
>>>
>>> Currently, I can only use `auto.commit` from the Kafka consumer config,
>>> and I believe the messages are acked/offsets committed after the consumer in
>>> KafkaIO finishes reading them. If there are any errors in later stages of
>>> my pipeline, or if the pipeline is restarted and can't be drained
>>> gracefully, I will lose the already-acked messages. Hence, I want to commit
>>> the offsets only after they are successfully written to BigQuery.
>>>
>>> Here is a snippet of my pipeline code.
>>>
>>>> def run(self, pipeline: Pipeline):
>>>>    consumer_config = {
>>>>       'bootstrap.servers': self.bootstrap_servers,
>>>>       'auto.offset.reset': 'latest',
>>>>       # Ideally we want auto.commit disabled, but we need a way to acknowledge the messages manually
>>>>       # 'enable.auto.commit': 'false',  # messages must be acked explicitly but prevents loss in case of failures
>>>>       'auto.commit.interval.ms': '60000',  # keep a high value since manual commits are not supported and messages will be lost if there is an error in the pipeline
>>>>       'group.id': 'dev_streaming_dr_beam_pipeline'
>>>>    }
>>>>
>>>>    streamed_dr_kvs_raw = (
>>>>          pipeline
>>>>          | 'Read from Kafka Stream' >>
>>>>          ReadFromKafka(
>>>>             consumer_config=consumer_config,
>>>>             topics=['mongo_kafka_connect.requests'],
>>>>             max_num_records=1,
>>>>          )
>>>>    )
>>>>
>>>>    dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>>>>
>>>>    filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = lambda elem: elem.mongo_document is not None
>>>>    filtered_dr_ds_with_record_ts = (
>>>>          dr_data_stream
>>>>          | 'Filter empty values' >> Filter(filter_record_fn)
>>>>          | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>>>>    )
>>>>
>>>>    # The lateness window defines how long the state is kept for older windows
>>>>    # and saving state for a longer duration can create memory pressure. Note, the state is
>>>>    # saved in persistent disk but is optimistically fetched in the local memory.
>>>>    batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>>>>       FixedWindows(30),  # 30 seconds
>>>>       trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>>>>       allowed_lateness=24 * 60 * 60,  # 24 hours for late data
>>>>       accumulation_mode=AccumulationMode.DISCARDING
>>>>    )
>>>>
>>>>    extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>>>>    de_duped_dr_records = (
>>>>          batched_dr_records
>>>>          | 'Group by Message Key' >> GroupBy('message_key')
>>>>          | 'Select Latest' >> Latest.PerKey()
>>>>          | 'Values' >> Values()
>>>>          | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>>>    )
>>>>
>>>>    dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>>>>    dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>>>>
>>>>    table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>>>>    dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>>>>       dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>>>>       dest_table_schema_vp=StaticValueProvider(str, table_schema),
>>>>       time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>>>>       dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>>>>    )
>>>>
>>> Thanks!
>>>
>>> --
>>> Regards,
>>> Sameer Bhadouria.
>>>
>>
