Hi Pavel,

Thanks for the reply.

No, the event losses are not consistent. While we've been running our
pipelines in parallel (Beam vs. Spark), we see some days with no event
loss and some days with a small amount, but it's always less than 0.05%.



On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin <[email protected]> wrote:

> Hello Lydian,
>
> Do you always observe data loss? Or does it happen only when you
> restart your pipeline from a Flink savepoint? If you lose data only between
> restarts, is your issue similar to
> https://github.com/apache/beam/issues/26041 ?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Tue, 18 Apr 2023 at 18:58, Lydian <[email protected]> wrote:
>
>> Hi,
>>
>> We are using Beam (Python SDK + Flink Runner) to back up our streaming
>> data from Kafka to S3. To avoid hitting S3 request-rate limits, we use a
>> 1-minute fixed window to group messages. We have a similar pipeline in
>> Spark that we want to replace with this new one. However, the Beam pipeline
>> always seems to be missing some events, which we suspect is due to late
>> events (the number of missing events drops when we use a higher
>> allowed_lateness).
>>
>> We've tried the following approaches to avoid late events, but neither of
>> them works:
>> 1. Use the processing timestamp instead of event time. Ideally, if
>> windowing uses the processing timestamp, no event should ever be considered
>> late. But this doesn't seem to work at all.
>> 2. Configure allowed_lateness to 12 hours. Given that approach 1 did not
>> work as expected, we also configured allowed_lateness. But the pipeline
>> still has missing events compared to our old Spark pipeline.
>>
>> Here's the simplified code we have:
>> ```
>> from typing import Any
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>>
>>
>> def add_timestamp(event: Any) -> Any:
>>     import time
>>
>>     from apache_beam import window
>>
>>     # Stamp each event with the current processing time so that
>>     # windowing uses processing time rather than the Kafka event time.
>>     return window.TimestampedValue(event, time.time())
>>
>>
>> (pipeline
>>     | "Kafka Read" >> ReadFromKafka(
>>         consumer_config=consumer_config, topics=["test-topic"])
>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>         beam.window.FixedWindows(fixed_window_size),
>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>>     )
>>     | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path))
>> )
>> ```
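>>
>> One variant we're also considering is making the late firing explicit via a
>> trigger and accumulation mode. A rough sketch (untested; `events` stands for
>> the PCollection coming out of the timestamping step, and `fixed_window_size`
>> / `allowed_lateness` are the same values as above):
>> ```
>> import apache_beam as beam
>> from apache_beam.transforms import trigger, window
>> from apache_beam.utils.timestamp import Duration
>>
>> # Fire once when the watermark passes the end of the window, then fire
>> # again for each late element arriving within allowed_lateness.
>> # DISCARDING keeps already-emitted elements out of the late panes.
>> windowed = events | beam.WindowInto(
>>     window.FixedWindows(fixed_window_size),
>>     trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
>>     allowed_lateness=Duration(seconds=allowed_lateness),
>>     accumulation_mode=trigger.AccumulationMode.DISCARDING,
>> )
>> ```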
>>
>> I am wondering:
>> 1. Does the add_timestamp approach correctly mark events so that windowing
>> uses processing time? If so, why are there still late events, given that we
>> are using processing time rather than event time?
>> 2. Are there any other approaches to avoid dropping late events besides
>> `allowed_lateness`? In Flink you can emit late events as a side output*;
>> can we do something similar in Beam? Could someone provide a code example?
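>>
>> For question 2, here is a rough sketch of what we had in mind (hypothetical
>> and untested; `grouped` stands for the output of a GroupByKey following the
>> windowing step, and note it can only see panes that fire within
>> allowed_lateness, since anything later is dropped before user code runs):
>> ```
>> import apache_beam as beam
>> from apache_beam.utils.windowed_value import PaneInfoTiming
>>
>> class SplitLatePanes(beam.DoFn):
>>     """Route elements from late-firing panes to a separate output,
>>     roughly mimicking Flink's late-data side output."""
>>     def process(self, element, pane_info=beam.DoFn.PaneInfoParam):
>>         if pane_info.timing == PaneInfoTiming.LATE:
>>             yield beam.pvalue.TaggedOutput('late', element)
>>         else:
>>             yield element
>>
>> results = grouped | beam.ParDo(SplitLatePanes()).with_outputs(
>>     'late', main='on_time')
>> # results.on_time -> panes fired at or before the watermark
>> # results.late    -> panes fired by late data within allowed_lateness
>> ```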
>>
>> Could someone help us debug this? Thanks!
>>
>> ---
>> * Flink's documentation on getting late data as a side output:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>>
>>
>> Sincerely,
>> Lydian Lee
>>
>>

-- 
Trevor Burke (he/him)  |  Software Engineer, Data Platform  |  415.794.4111
