Hi, I’m having trouble setting up a pipeline that continuously reads from S3, 
does a small transformation and then combines per key:

pipeline
        .apply(FileIO.match()
                .filepattern(options.getDirectory() + "/input/*")
                // Poll for new files every 10 seconds and never stop watching.
                .continuously(Duration.standardSeconds(10), Watch.Growth.never()))
        .apply("Read", FileIO.readMatches())
        .apply(TextIO.readFiles())
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))))
        .apply(ParDo.of(new SplitFn()))
        .apply(Combine.perKey(new CombinePerKeyFn()));

That works in the direct runner, but in Flink the watermark isn’t advancing.
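For anyone less familiar with why a stuck watermark means no output at all: with the default trigger, each 30-second window’s Combine.perKey result is emitted only after the watermark passes the window’s maximum timestamp (its end minus 1 ms). The plain-Java sketch below models that behavior without Beam. Everything in it is a hypothetical illustration: the class and method names are made up, it assumes SplitFn emits (key, value) pairs that CombinePerKeyFn sums, and Long.MIN_VALUE is only a stand-in for a watermark that never advanced.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java model (no Beam) of FixedWindows + Combine.perKey under the
 * default trigger: a window's per-key results are emitted only once the watermark has
 * passed the window's max timestamp (end minus 1 ms).
 */
public final class WatermarkGatedCombineSketch {

    /** One element after SplitFn: event timestamp in ms, key, value (assumed shape). */
    static final class Element {
        final long timestampMillis;
        final String key;
        final long value;

        Element(long timestampMillis, String key, long value) {
            this.timestampMillis = timestampMillis;
            this.key = key;
            this.value = value;
        }
    }

    /** Start of the fixed window containing the timestamp (zero offset). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps this correct for timestamps before the epoch as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Sums values per (window, key), keeping only windows the watermark has passed. */
    static Map<Long, Map<String, Long>> firedResults(
            List<Element> elements, long sizeMillis, long watermarkMillis) {
        Map<Long, Map<String, Long>> fired = new TreeMap<>();
        for (Element e : elements) {
            long start = windowStart(e.timestampMillis, sizeMillis);
            long maxTimestamp = start + sizeMillis - 1;
            if (watermarkMillis > maxTimestamp) {
                fired.computeIfAbsent(start, w -> new TreeMap<>())
                     .merge(e.key, e.value, Long::sum);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Element> input = Arrays.asList(
                new Element(1_000, "test3", 1),
                new Element(29_999, "test3", 1),
                new Element(45_000, "test7", 1));
        long size = 30_000; // FixedWindows.of(Duration.standardSeconds(30))

        // Assumption: Long.MIN_VALUE stands in for a watermark that never advanced.
        System.out.println(firedResults(input, size, Long.MIN_VALUE)); // {}
        System.out.println(firedResults(input, size, 30_000));         // {0={test3=2}}
        System.out.println(firedResults(input, size, 60_000));         // {0={test3=2}, 30000={test7=1}}
    }
}
```

With the stalled watermark the result is empty, which is what you would expect downstream of the window if the watermark never moves.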

I asked someone else to look at it, and he replaced the file read with the 
following code so he didn’t have to create input files:

pipeline
        // Emit one element every 2 seconds with a random key "test0".."test9".
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(2)))
        .apply(MapElements.into(strings()).via(i -> "test" + ThreadLocalRandom.current().nextInt(10)));

Surprisingly, the pipeline then started working. GenerateSequence doesn’t use a 
splittable DoFn the way the file read does, so we tried a different splittable 
source, and it didn’t work in Flink either. Every variant we tried has worked 
with the direct runner.

It seems like something is wrong in the Flink runner’s splittable DoFn logic 
that keeps the watermark from advancing. I’m fairly new to Beam and Flink and am 
a little lost at the moment. Should I file a JIRA for this, since it looks like 
a defect? I plan to keep investigating, so any suggestions for where to start 
looking would be greatly appreciated.

I set up an example with both splittable and non-splittable sources (selected 
via pipeline options) to make it easier to troubleshoot:
https://github.com/dneth/file-streaming-example/blob/master/src/main/java/Example.java

Thanks


CONFIDENTIALITY NOTICE This message and any included attachments are from 
Cerner Corporation and are intended only for the addressee. The information 
contained in this message is confidential and may constitute inside or 
non-public information under international, federal, or state securities laws. 
Unauthorized forwarding, printing, copying, distribution, or use of such 
information is strictly prohibited and may be unlawful. If you are not the 
addressee, please promptly delete this message and notify the sender of the 
delivery error by e-mail or you may call Cerner's corporate offices in Kansas 
City, Missouri, U.S.A at (+1) (816)221-1024.