Hello!

I am using Beam v2.30 with the Flink runner v1.11. My application reads from
an AWS Kinesis stream, applies windowing to the records, and then writes
window-sized files to AWS S3. The checkpointing strategy used is

From time to time, after restarting the application from a savepoint, I see
that some records never arrive in S3. I am not sure where the actual issue
lies: a window not being fully restored from the savepoint; a window
restored but never fired; a window fired but buffered forever somewhere in
the sink; or a simple misconfiguration.

I am looking for some guidance on the questions below. Perhaps someone
could point me to the relevant parts of the Beam codebase or documentation
covering the deeper implementation details.

- In this application, what is stateful and gets saved into a savepoint?
The window pane with its timers? The FileIO sink buffers? Both? What does
that state actually look like, and how can I inspect it?
- What happens to a window pane stored in a savepoint while the app stays
turned off? Is it considered late data and discarded when the app starts
again? If I expect my app to be down for, say, one hour per day, should I
configure windows accordingly?
- Are there flags I could pass to my application to debug this, e.g. to log
record counts when windows fire, record counts when IO buffers are flushed,
or additional info when the app starts from a savepoint?
- Any advice on how to configure my windowing properly, given the expected
app downtimes.
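For context, one thing I could add myself for debugging is a pass-through ParDo that counts and logs every record emitted by a fired pane. Below is a sketch using Beam's Metrics API; the class name and log format are just illustrative:

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical debugging transform: passes records through unchanged while
// counting them and logging the window and pane they were emitted from, so
// that gaps between fired panes and the files landing in S3 become visible
// in logs and runner metrics.
public class CountFiredRecordsFn<T> extends DoFn<T, T> {

    private static final Logger LOG =
            LoggerFactory.getLogger(CountFiredRecordsFn.class);

    private final Counter firedRecords = Metrics.counter("debug", "firedRecords");

    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        firedRecords.inc();
        PaneInfo pane = c.pane();
        LOG.info("window={} paneIndex={} timing={} record emitted",
                window, pane.getIndex(), pane.getTiming());
        c.output(c.element());
    }
}
```

I would place it between the windowing and the sink, e.g. `.apply("Counting fired records", ParDo.of(new CountFiredRecordsFn<MyRecord>()))`, and compare the counter against the number of records in the S3 files.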

I have tried two different window configurations so far; both seem to give
some data loss from time to time. It also looks like smaller windows cause
bigger data loss, but that was hard to reproduce.

.apply("Creating micro-batches", Window.<MyRecord>into(
        FixedWindows.of(Duration.standardSeconds(windowSizeSeconds))))


.apply("Creating micro-batches", Window.<MyRecord>into(
        FixedWindows.of(Duration.standardSeconds(windowSizeSeconds)))
        .triggering(Repeatedly.forever(
                AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(windowSizeSeconds)))))
        .withAllowedLateness(Duration.standardDays(1),
                Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
        .discardingFiredPanes())

The FileIO sink looks like the following:

FileIO.<DestinationSpec, MyRecord>writeDynamic()
        .by(Converters::toDestinationSpec)
        .withDestinationCoder(AvroCoder.of(DestinationSpec.class))
        .via(
                Contextful.fn(new GenericRecordConverter()),
                Contextful.fn((DestinationSpec s) -> AvroIO.sink(s.getSchema())))
        .to(outputPath)
        .withTempDirectory(tmpOutputPath)
        .withNumShards(writerShards)
        .withNaming(
                (DestinationSpec s) -> FileIO.Write.defaultNaming(
                        buildDestinationPathPrefix(jobId, outputPath, s), ".avro"))

Thank you in advance.

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>
