Yes, we did enable this in our pipeline.

On Wed, Apr 19, 2023 at 5:00 PM Pavel Solomin <[email protected]> wrote:
> Thank you
>
> Just to confirm: how did you configure Kafka offset commits? Did you have
> this flag enabled?
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
>
> On Thursday, 20 April 2023, Trevor Burke <[email protected]> wrote:
> > Hi Pavel,
> >
> > Thanks for the reply.
> >
> > No, the event loss is not consistent. While running our pipelines in
> > parallel (Beam vs. Spark), we see some days with no event loss and some
> > days with some, but it's always less than 0.05%.
> >
> > On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin <[email protected]> wrote:
> >>
> >> Hello Lydian,
> >> Do you always observe data loss? Or does it maybe happen only when you
> >> restart your pipeline from a Flink savepoint? If you lose data only
> >> between restarts, is your issue similar to
> >> https://github.com/apache/beam/issues/26041?
> >>
> >> Best Regards,
> >> Pavel Solomin
> >>
> >> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> >>
> >> On Tue, 18 Apr 2023 at 18:58, Lydian <[email protected]> wrote:
> >>>
> >>> Hi,
> >>>
> >>> We are using Beam (Python SDK + Flink Runner) to back up our streaming
> >>> data from Kafka to S3. To avoid hitting the S3 threshold, we use a
> >>> 1-minute fixed window to group messages. We've had a similar pipeline in
> >>> Spark that we want to replace with this new pipeline. However, the Beam
> >>> pipeline always seems to be missing events, which we think could be due
> >>> to late events (the number of missing events gets lower with a higher
> >>> allowed_lateness).
> >>>
> >>> We've tried the following approaches to avoid late events, but neither
> >>> of them works:
> >>> 1. Use the processing timestamp instead of event time. Ideally, if
> >>> windowing uses the processing timestamp, it shouldn't consider any
> >>> event late. But this doesn't seem to work at all.
> >>> 2. Set allowed_lateness to 12 hours. Since approach 1 doesn't seem to
> >>> work as expected, we've also configured allowed_lateness, but we still
> >>> have missing events compared to our old Spark pipeline.
> >>>
> >>> Here's the simplified code we have:
> >>> ```
> >>> from typing import Any
> >>>
> >>> import apache_beam as beam
> >>> from apache_beam.io.kafka import ReadFromKafka
> >>> from apache_beam.utils.timestamp import Duration
> >>>
> >>>
> >>> def add_timestamp(event: Any) -> Any:
> >>>     # Imports stay inside the function so it does not depend on
> >>>     # module-level names when it runs on workers.
> >>>     import time
> >>>
> >>>     from apache_beam import window
> >>>
> >>>     # Re-stamp the element with the current wall-clock (processing) time.
> >>>     return window.TimestampedValue(event, time.time())
> >>>
> >>>
> >>> # pipeline, consumer_config, fixed_window_size, allowed_lateness, s3_path
> >>> # and WriteBatchesToS3 are defined elsewhere.
> >>> (pipeline
> >>>  | "Kafka Read" >> ReadFromKafka(
> >>>      consumer_config=consumer_config, topics=["test-topic"])
> >>>  | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
> >>>  | "Window into Fixed Intervals" >> beam.WindowInto(
> >>>      beam.window.FixedWindows(fixed_window_size),
> >>>      allowed_lateness=Duration(seconds=allowed_lateness))
> >>>  | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path)))
> >>> ```
> >>>
> >>> I am wondering:
> >>> 1. Does the add_timestamp approach correctly mark elements to use
> >>> processing time for windowing? If so, why are there still late events,
> >>> given that we are using processing time and not event time?
> >>> 2. Are there any other approaches to avoid dropping late events besides
> >>> `allowed_lateness`? In Flink you can output late events as a side
> >>> output*; can we do something similar in Beam? Could someone provide a
> >>> code example?
> >>>
> >>> Could someone help us debug this? Thanks!
> >>>
> >>> ---
> >>> * Flink's documentation about late events as a side output:
> >>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
> >>>
> >>> Sincerely,
> >>> Lydian Lee
> >
> > --
> > Trevor Burke (he/him) | Software Engineer, Data Platform | 415.794.4111
>
> --
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>

--
Sincerely,
Lydian Lee
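The late-data questions in this thread are easier to reason about with a small model of the semantics. The following is a stdlib-only Python sketch, not Beam code: the names (`IntervalWindow`, `assign_fixed_window`, `classify`, `route`, `Outputs`) are hypothetical, timestamps and watermarks are whole seconds, and the watermark seen by each element is supplied by hand instead of being computed by a runner. It approximates how a fixed window is assigned and when an element counts as late. In Beam, WindowInto only assigns windows; expired data is dropped later, at a grouping step (GroupByKey or a combine), once the watermark passes the end of the window plus allowed_lateness. The `late_side_output` list only imitates Flink's late-data side output and is not a Beam API.

```python
# Stdlib-only model of fixed windows + allowed lateness; NOT Beam code.
# All names here are hypothetical and chosen for illustration.
from dataclasses import dataclass, field
from typing import Any, List, Tuple


@dataclass(frozen=True)
class IntervalWindow:
    start: int  # inclusive, in seconds
    end: int    # exclusive, in seconds


def assign_fixed_window(ts: int, size: int, offset: int = 0) -> IntervalWindow:
    # The window start is the largest value <= ts that is congruent to
    # `offset` modulo `size` (e.g. size=60 gives 1-minute windows).
    start = ts - (ts - offset) % size
    return IntervalWindow(start, start + size)


def classify(ts: int, watermark: int, size: int, allowed_lateness: int) -> str:
    """Label an element with timestamp `ts`, seen when the watermark is `watermark`."""
    window = assign_fixed_window(ts, size)
    if watermark < window.end:
        return "on_time"    # watermark has not yet passed the window end
    if watermark < window.end + allowed_lateness:
        return "late"       # behind the watermark, but within allowed lateness
    return "droppable"      # window has expired; a grouping step would drop it


@dataclass
class Outputs:
    main: List[Tuple[IntervalWindow, Any]] = field(default_factory=list)
    late_side_output: List[Tuple[int, Any]] = field(default_factory=list)


def route(events: List[Tuple[int, int, Any]], size: int,
          allowed_lateness: int) -> Outputs:
    """Split (timestamp, watermark_when_seen, payload) triples into outputs.

    Droppable elements are collected instead of discarded, imitating
    Flink's late-data side output.
    """
    out = Outputs()
    for ts, watermark, payload in events:
        if classify(ts, watermark, size, allowed_lateness) == "droppable":
            out.late_side_output.append((ts, payload))
        else:
            out.main.append((assign_fixed_window(ts, size), payload))
    return out


if __name__ == "__main__":
    events = [
        (125, 150, "a"),  # window [120, 180), watermark 150: on time
        (125, 200, "b"),  # watermark past 180 but before 180 + 60: late
        (125, 300, "c"),  # watermark past 180 + 60: droppable
    ]
    result = route(events, size=60, allowed_lateness=60)
    print("main:", result.main)
    print("late side output:", result.late_side_output)
```

With 1-minute windows and 60 seconds of allowed lateness, "a" and "b" both land in window [120, 180), "b" as a late element, while "c" arrives after the window has expired and ends up in the side list instead of vanishing. Raising allowed_lateness (for example to 12 * 3600, as tried in the thread) widens the "late" band, which is consistent with the observation that fewer events go missing with a higher value.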
