What WindowFn are you using?

On Wed, Oct 23, 2019 at 11:36 AM Koprivica,Preston Blake <preston.b.kopriv...@cerner.com> wrote:
> Hi guys,
>
> I'm currently working on a simple system where the intention is to ingest
> data from a real-time stream (in this case Amazon SQS) and write the
> output incrementally to a durable filesystem (i.e., S3). It's easy to
> think of this as a low-fi journaling system. We need to make sure that
> data written to the source queue eventually makes it to S3. We are using
> FileIO windowed writes with a custom naming policy to partition the files
> by their event time. Because SQS can't guarantee order, we have to allow
> late messages. Moreover, we need a further guarantee that a message is
> written in a timely manner; we're thinking within some constant multiple
> of the windowing duration. As a first pass, we were thinking of a
> processing-time-based trigger that fires on a regular interval. For
> context, here's an example of the pipeline:
>
> ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message -> Write to Avro
>
>   pipeline
>       .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
>       .apply(
>           Window.<Message>configure()
>               .discardingFiredPanes()
>               .triggering(
>                   Repeatedly.forever(
>                       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
>       .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
>       .setCoder(AvroCoder.of(recordClass))
>       .apply(
>           AvroIO.write(recordClass)
>               .withWindowedWrites()
>               .withTempDirectory(options.getTempDir())
>               .withNumShards(options.getShards())
>               .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));
>
> This all seemed fairly straightforward. I have not yet observed lost data
> with this pipeline, but I am seeing an issue with timeliness. Things seem
> to get hung up on finalizing the file output, but I have yet to pinpoint
> the cause. To highlight the issue, I can set up a test where I send a
> single message to the source queue.
> If nothing else happens, the data never makes it to its final output using
> the FlinkRunner (beam-2.15.0, flink-1.8). Has anyone seen this behavior
> before? Is the expectation of eventual consistency wrong?
>
> Thanks,
>
> -Preston
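A note for anyone reasoning about the timeliness expectation in the quoted message: the sketch below is a simplified, runner-independent model in plain Java (no Beam dependency) of how Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)) is expected to fire within a single window, plus the usual fixed-window start arithmetic an event-time fixed window assignment uses. The class and method names (ProcessingTimeTriggerModel, fireTimes, maxWaitMillis, windowStart) are hypothetical. It assumes sorted arrival times in milliseconds, treats an element arriving exactly at a pane's fire time as belonging to the next pane, and ignores watermarks, timer granularity, bundling, and the sink's file finalization.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a repeated processing-time trigger within
// ONE window. Not Beam code: no watermarks, timers, bundles, or sinks.
public class ProcessingTimeTriggerModel {

    // Start of the fixed event-time window containing timestampMillis (offset 0).
    // floorMod keeps the result correct for negative timestamps as well.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Processing times at which panes fire, given sorted arrival times.
    // A pane opens on its first element and fires delayMillis later;
    // with no elements, no pane opens and nothing fires.
    static List<Long> fireTimes(long[] arrivals, long delayMillis) {
        List<Long> fires = new ArrayList<>();
        boolean paneOpen = false;
        long fireAt = 0;
        for (long t : arrivals) {
            if (paneOpen && t >= fireAt) {
                fires.add(fireAt); // the open pane fired before this element arrived
                paneOpen = false;  // discarding mode: the next element starts a fresh pane
            }
            if (!paneOpen) {
                paneOpen = true;
                fireAt = t + delayMillis; // first element in pane starts the delay
            }
        }
        if (paneOpen) {
            fires.add(fireAt); // a lone trailing pane still fires after the delay
        }
        return fires;
    }

    // Longest wait between an element's arrival and the firing of its pane.
    static long maxWaitMillis(long[] arrivals, long delayMillis) {
        long worst = 0;
        boolean paneOpen = false;
        long fireAt = 0;
        for (long t : arrivals) {
            if (paneOpen && t >= fireAt) {
                paneOpen = false;
            }
            if (!paneOpen) {
                paneOpen = true;
                fireAt = t + delayMillis;
            }
            worst = Math.max(worst, fireAt - t);
        }
        return worst;
    }

    public static void main(String[] args) {
        long windowDur = 60_000; // assumed one-minute windowDur, for illustration only
        // Single-message test: one element arriving at t = 5s fires once, at 65s.
        System.out.println(fireTimes(new long[] {5_000}, windowDur)); // [65000]
        // A burst that straddles the first pane's deadline produces two panes.
        long[] burst = {0, 10_000, 59_999, 60_000, 70_000};
        System.out.println(fireTimes(burst, windowDur));     // [60000, 120000]
        System.out.println(maxWaitMillis(burst, windowDur)); // 60000
        System.out.println(windowStart(65_000, windowDur));  // 60000
    }
}
```

Under these assumptions the single-message test should yield exactly one firing, windowDur after the message arrives, and no element waits longer than windowDur for its pane to fire. The model says nothing about what happens after the trigger fires, for example when a windowed write finalizes its files on a particular runner.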