Hi Luke,

I am also looking at the `WatermarkEstimators.Manual` option in parallel. We are now getting data past our fixed window, but the aggregation is not what we expect. The doc says setWatermark will "set timestamp before or at the timestamps of all future elements produced by the associated DoFn". If I output each element with a timestamp as shown below, could you please clarify how we should set the watermark for this manual watermark estimator?

    receiver.outputWithTimestamp(ossRecord, Instant.now());

Thanks,
Praveen

On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:

> Is the watermark advancing[1, 2] for the SDF such that the windows can
> close allowing for the Count transform to produce output?
>
> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>
> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <gaurav.na...@oracle.com>
> wrote:
>
>> Hi everyone!
>>
>> We are developing a new IO connector using the SDF API, and testing it
>> with the following simple counting pipeline:
>>
>> p.apply(MyIO.read()
>>         .withStream(inputStream)
>>         .withStreamPartitions(Arrays.asList(0))
>>         .withConsumerConfig(config)
>>     ) // gets a PCollection<KV<String, String>>
>>
>>     .apply(Values.<String>create()) // PCollection<String>
>>
>>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>         .withAllowedLateness(Duration.standardDays(1))
>>         .accumulatingFiredPanes())
>>
>>     .apply(Count.<String>perElement())
>>
>>     // write PCollection<KV<String, Long>> to stream
>>     .apply(MyIO.write()
>>         .withStream(outputStream)
>>         .withConsumerConfig(config));
>>
>> Without the window transform, we can read from the stream and write to
>> it, however, I don’t see output after the Window transform. Could you
>> please help pin down the issue?
>>
>> Thank you,
>>
>> Gaurav
>>
>

-- 
Thanks,
Praveen K Viswanathan
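
The contract quoted at the top of this message (the watermark must be at or before the timestamps of all future elements) can be illustrated with a small toy model. This is only a sketch and not the Beam API: `ManualWatermarkSketch`, `windowEnd`, and `canClose` are hypothetical names, it uses `java.time` rather than the Joda-Time types Beam uses, and it ignores allowed lateness and triggers. It assumes elements are stamped with non-decreasing wall-clock times, so the timestamp just emitted is a safe lower bound for everything that follows. Under that assumption, the real DoFn would presumably call `setWatermark` on its `ManualWatermarkEstimator` with the same instant it passed to `outputWithTimestamp`.

```java
import java.time.Duration;
import java.time.Instant;

// Toy model of a manually advanced watermark. NOT the Beam API; all names are hypothetical.
class ManualWatermarkSketch {
    private Instant watermark;

    ManualWatermarkSketch(Instant initialWatermark) {
        this.watermark = initialWatermark;
    }

    // Assumption of this sketch: a watermark only moves forward, so backward moves are rejected.
    void setWatermark(Instant candidate) {
        if (candidate.isBefore(watermark)) {
            throw new IllegalArgumentException("watermark must not move backwards");
        }
        watermark = candidate;
    }

    Instant currentWatermark() {
        return watermark;
    }

    // End (exclusive) of the fixed window of the given size that contains the timestamp.
    static Instant windowEnd(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long start = Math.floorDiv(timestamp.toEpochMilli(), sizeMillis) * sizeMillis;
        return Instant.ofEpochMilli(start + sizeMillis);
    }

    // In this simplified model a window can close once the watermark reaches its end.
    boolean canClose(Instant windowEnd) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        ManualWatermarkSketch estimator = new ManualWatermarkSketch(Instant.EPOCH);
        Instant firstWindowEnd = windowEnd(Instant.ofEpochSecond(1), size);

        // Fixed instants stand in for successive Instant.now() readings.
        long[] emitSeconds = {1, 4, 12};
        for (long seconds : emitSeconds) {
            Instant ts = Instant.ofEpochSecond(seconds);
            // In the real DoFn: receiver.outputWithTimestamp(record, ts);
            // Later elements are assumed to carry timestamps >= ts,
            // so ts itself is a valid watermark.
            estimator.setWatermark(ts);
            System.out.println("emitted at " + ts
                + ", watermark " + estimator.currentWatermark()
                + ", first window can close: " + estimator.canClose(firstWindowEnd));
        }
    }
}
```

If the watermark is never advanced, `canClose` never becomes true in this model, which corresponds to the symptom in the quoted thread: elements are read, but the windowed `Count` emits nothing.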