What WindowFn are you using?

On Wed, Oct 23, 2019 at 11:36 AM Koprivica,Preston Blake <
preston.b.kopriv...@cerner.com> wrote:

> Hi guys,
>
> I’m currently working on a simple system that ingests data from a realtime
> stream (in this case Amazon SQS) and writes the output incrementally to a
> durable filesystem (i.e., S3). It’s easy to think of this as a low-fi
> journaling system. We need to make sure that data written to the source
> queue eventually makes it to S3. We are using FileIO windowed writes with
> a custom naming policy to partition the files by their event time. Because
> SQS can’t guarantee ordering, we have to allow late messages. Moreover, we
> need a further guarantee that a message is written in a timely manner;
> we’re thinking some constant multiple of the windowing duration. As a first
> pass, we were considering a processing-time trigger that fires at a regular
> interval. For context, here’s an example of the pipeline:
>
> ReadSQS -> Window + Processing-Time Trigger -> Convert to Avro Message ->
> Write to Avro
>
>   pipeline
>       .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
>       .apply(
>           Window.<Message>configure()
>               .discardingFiredPanes()
>               .triggering(
>                   Repeatedly.forever(
>                       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
>       .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
>       .setCoder(AvroCoder.of(recordClass))
>       .apply(
>           AvroIO.write(recordClass)
>               .withWindowedWrites()
>               .withTempDirectory(options.getTempDir())
>               .withNumShards(options.getShards())
>               .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));
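The custom `WindowedFilenamePolicy` isn't shown in the thread. As a rough illustration of what partitioning output by event time involves, the plain-Java sketch below assigns an event timestamp to a fixed window (roughly the floor arithmetic Beam's `FixedWindows` uses with a zero offset) and renders a path for one shard of that window. It assumes fixed windows of `windowDur`, which the quoted snippet doesn't actually specify. `EventTimePartitioner`, the directory layout, and `s3://bucket/journal` are all hypothetical; a real Beam policy would implement `FileBasedSink.FilenamePolicy` and take the window from the `BoundedWindow` passed to it.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical event-time partitioning helper; not the thread's WindowedFilenamePolicy. */
public final class EventTimePartitioner {
    // Hypothetical layout: <prefix>/yyyy/MM/dd/HH/<start>_<end>-<shard>-of-<numShards>.<suffix>
    private static final DateTimeFormatter DIR =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("HHmmss").withZone(ZoneOffset.UTC);

    private EventTimePartitioner() {}

    /** Inclusive start (epoch millis) of the fixed window containing eventMillis. */
    public static long windowStart(long eventMillis, long windowMillis) {
        // floorMod (not %) keeps pre-epoch timestamps in the correct window.
        return eventMillis - Math.floorMod(eventMillis, windowMillis);
    }

    /** Builds a path for one shard of the fixed window that contains eventMillis. */
    public static String filename(String prefix, long eventMillis, long windowMillis,
                                  int shard, int numShards, String suffix) {
        Instant start = Instant.ofEpochMilli(windowStart(eventMillis, windowMillis));
        Instant end = start.plusMillis(windowMillis); // exclusive window end
        return String.format("%s/%s/%s_%s-%05d-of-%05d.%s",
                prefix, DIR.format(start), STAMP.format(start), STAMP.format(end),
                shard, numShards, suffix);
    }

    public static void main(String[] args) {
        long windowDur = 5 * 60 * 1000L; // five-minute windows, assumed for illustration
        long event = Instant.parse("2019-10-23T11:36:12Z").toEpochMilli();
        // prints s3://bucket/journal/2019/10/23/11/113500_114000-00000-of-00004.avro
        System.out.println(filename("s3://bucket/journal", event, windowDur, 0, 4, "avro"));
    }
}
```

Because the path depends only on the window, a late message for an old window lands under that window's directory rather than the current one, which is the property an event-time naming policy is after.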
>
> This all seemed fairly straightforward. I have not yet observed lost data
> with this pipeline, but I am seeing a timeliness issue. Things seem to get
> hung up on finalizing file output, but I have yet to pinpoint the cause. To
> highlight the issue, I can set up a test where I send a single message to
> the source queue. If no further messages arrive, that data never makes it
> to its final output using the FlinkRunner (Beam 2.15.0, Flink 1.8). Has
> anyone seen this behavior before? Is the expectation of eventual
> consistency wrong?
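Whether a lone message should eventually be emitted can be reasoned about with a deliberately simplified model of the trigger in the snippet, `Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))` with discarding panes: each pane's timer is set by its first element, and after the pane fires the trigger resets so the next element starts a new pane. The class below (`ProcessingTimeTriggerModel`, a hypothetical name) is plain Java, not Beam. It ignores watermarks, window expiry, how the runner schedules processing-time timers, and the finalize step of windowed writes, and it assumes an element arriving exactly at the firing time belongs to the next pane.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of a repeated processing-time trigger (not Beam code). */
public final class ProcessingTimeTriggerModel {

    /** One fired pane: the processing time it fired at and how many elements it held. */
    public static final class Pane {
        public final long fireMillis;
        public final int elementCount;

        Pane(long fireMillis, int elementCount) {
            this.fireMillis = fireMillis;
            this.elementCount = elementCount;
        }

        @Override
        public String toString() {
            return "Pane(fireAt=" + fireMillis + "ms, elements=" + elementCount + ")";
        }
    }

    private ProcessingTimeTriggerModel() {}

    /**
     * Groups processing-time arrivals (sorted ascending) into panes. Each pane fires at its
     * first element's arrival plus the delay; elements arriving strictly before that time
     * join the pane, and the next element after it opens a new pane (the "repeatedly" part).
     */
    public static List<Pane> panes(long[] sortedArrivalMillis, long delayMillis) {
        if (delayMillis <= 0) {
            throw new IllegalArgumentException("delayMillis must be positive");
        }
        List<Pane> fired = new ArrayList<>();
        int i = 0;
        while (i < sortedArrivalMillis.length) {
            long fireAt = sortedArrivalMillis[i] + delayMillis; // timer set by first element
            int count = 0;
            while (i < sortedArrivalMillis.length && sortedArrivalMillis[i] < fireAt) {
                count++;
                i++;
            }
            fired.add(new Pane(fireAt, count));
        }
        return fired;
    }

    public static void main(String[] args) {
        long windowDur = 5 * 60 * 1000L; // five minutes, chosen for illustration
        // A single message at t=0 and nothing afterwards.
        // prints [Pane(fireAt=300000ms, elements=1)]
        System.out.println(panes(new long[] {0L}, windowDur));
    }
}
```

In this model a single message produces exactly one pane, `windowDur` after it arrives, with no further input required. If a real pipeline never writes that message, the delay comes from something the model leaves out, such as timer scheduling in the runner or when the windowed write finalizes its files.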
>
> Thanks,
>
> -Preston