Yes, we have checkpointing enabled in our cluster.

The pipeline runs fine when I restart it without any code changes. But after
adding a DoFn class (one that is not even used in the pipeline) and restarting,
we observed that the pipeline still reads from Kafka and performs the windowing
and Distinct steps, but does not write to S3.
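To make the "windowing and Distinct" steps concrete, here is a toy model in plain Java with no Beam dependency. It assumes the 10-minute window from the original message and uses Strings as stand-ins for GenericRecords. The class `WindowedDistinctModel` and its methods are hypothetical. The model only sketches two things: how a fixed window is chosen for a timestamp, and the fact that Distinct removes duplicates within a window but not across windows. It is not Beam's `FixedWindows` or `Distinct` implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Toy model (plain Java, not Beam code) of fixed windowing followed by a
 * per-window Distinct. Strings stand in for GenericRecords (an assumption).
 */
final class WindowedDistinctModel {

    /** Start of the fixed window of length {@code size} containing {@code ts} (zero offset). */
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        // floorMod keeps the remainder non-negative, even for pre-1970 timestamps
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    /** Groups (timestamp, record) pairs by window start; duplicates are dropped only within a window. */
    static Map<Instant, Set<String>> distinctPerWindow(
            List<Map.Entry<Instant, String>> events, Duration size) {
        Map<Instant, Set<String>> byWindow = new TreeMap<>();
        for (Map.Entry<Instant, String> event : events) {
            byWindow.computeIfAbsent(windowStart(event.getKey(), size), w -> new LinkedHashSet<>())
                    .add(event.getValue());
        }
        return byWindow;
    }

    public static void main(String[] args) {
        Duration tenMinutes = Duration.ofMinutes(10);
        List<Map.Entry<Instant, String>> events = List.of(
                Map.entry(Instant.parse("2021-02-18T20:31:00Z"), "record-1"),
                Map.entry(Instant.parse("2021-02-18T20:38:00Z"), "record-1"), // duplicate in the same window
                Map.entry(Instant.parse("2021-02-18T20:41:00Z"), "record-1")); // same value, next window
        distinctPerWindow(events, tenMinutes)
                .forEach((start, records) -> System.out.println(start + " -> " + records));
    }
}
```

In this model, `record-1` survives once in the window starting at 20:30 and once more in the window starting at 20:40. Deduplication is scoped to a window, so records that cross a window boundary are not merged.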

I checked in the Flink UI: the watermark for FileIO.Write does not progress, and
it stays at the value it had when the savepoint was taken.
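A frozen watermark would be consistent with this silent behavior. With `AfterWatermark.pastEndOfWindow()`, a window is emitted only once the watermark reaches its end. If the watermark never moves, windows keep being buffered and nothing is emitted or logged. The sketch below is a simplified, hypothetical model in plain Java (`WatermarkTriggerModel` is not a Beam or Flink class). It illustrates that mechanism under the assumption of 10-minute fixed windows and a made-up savepoint time. It does not diagnose the actual root cause.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Toy model (plain Java, not Beam or Flink code) of a watermark-driven trigger
 * such as AfterWatermark.pastEndOfWindow(): elements are buffered per fixed
 * window, and a window is emitted only once the watermark reaches its end.
 * If the watermark stays frozen, windows pile up and nothing is emitted.
 */
final class WatermarkTriggerModel {
    private final Duration windowSize;
    // window start -> buffered element count, ordered by window start
    private final TreeMap<Instant, Integer> buffered = new TreeMap<>();
    private Instant watermark = Instant.MIN;

    WatermarkTriggerModel(Duration windowSize) {
        this.windowSize = windowSize;
    }

    /** Buffers one element in the fixed window that contains its timestamp. */
    void add(Instant timestamp) {
        long sizeMs = windowSize.toMillis();
        long tsMs = timestamp.toEpochMilli();
        Instant start = Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
        buffered.merge(start, 1, Integer::sum);
    }

    /** Moves the watermark forward (never backward) and returns the window starts that fire. */
    List<Instant> advanceWatermark(Instant newWatermark) {
        if (newWatermark.isAfter(watermark)) {
            watermark = newWatermark;
        }
        List<Instant> fired = new ArrayList<>();
        // Windows are ordered by start, so stop at the first one that is still open.
        while (!buffered.isEmpty()) {
            Instant start = buffered.firstKey();
            if (watermark.isBefore(start.plus(windowSize))) {
                break;
            }
            buffered.pollFirstEntry();
            fired.add(start);
        }
        return fired;
    }

    int pendingWindows() {
        return buffered.size();
    }

    public static void main(String[] args) {
        WatermarkTriggerModel model = new WatermarkTriggerModel(Duration.ofMinutes(10));
        model.add(Instant.parse("2021-02-18T20:31:00Z"));
        model.add(Instant.parse("2021-02-18T20:45:00Z"));
        // Watermark frozen at a hypothetical savepoint time inside the first window.
        System.out.println(model.advanceWatermark(Instant.parse("2021-02-18T20:35:00Z"))); // []
        // Once the watermark reaches 20:40, the [20:30, 20:40) window fires.
        System.out.println(model.advanceWatermark(Instant.parse("2021-02-18T20:40:00Z")));
    }
}
```

The model deliberately has no error path. As long as the watermark stays put, windows simply stay pending. That mirrors the report above: records arrive, no exceptions are logged, and nothing leaves the grouping step.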

On 2021/02/18 20:36:45, Reuven Lax <re...@google.com> wrote: 
> Do you have checkpointing enabled in your Flink cluster?
> 
> On Thu, Feb 18, 2021 at 11:50 AM Tapan Upadhyay <tap...@gmail.com> wrote:
> 
> > Hi,
> >
> > I am currently working on a Beam pipeline (Flink runner) where we read
> > JSON events from Kafka and write the output to S3 in Parquet format.
> >
> > We write to S3 every 10 minutes.
> >
> > We have observed that our pipeline sometimes stops writing to S3 after
> > restarts (even for a minor, non-breaking code change). If we change the
> > Kafka offset and restart the pipeline, it starts writing to S3 again.
> >
> > While the S3 write is failing, the pipeline runs without any issues and
> > processes records up to the FileIO stage. It logs no errors or exceptions,
> > but silently fails to write to S3 at the FileIO stage.
> >
> > This is the stage that is not emitting any records:
> > FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
> > -> FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
> > -> FileIO.Write/WriteFiles/GatherTempFileResults/Add void key/AddKeys/Map/ParMultiDo(Anonymous)
> >
> > We have checked our windowing function by logging records after
> > windowing, and windowing works fine.
> >
> > This is our code snippet:
> >
> > parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
> >             FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
> >         .triggering(AfterWatermark.pastEndOfWindow())
> >         .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
> >         .discardingFiredPanes())
> >     .apply(Distinct.create())
> >     .apply(FileIO.<GenericRecord>write()
> >         .via(ParquetIO.sink(getOutput_schema()))
> >         .to(outputPath.isEmpty() ? outputPath() : outputPath)
> >         .withNumShards(1)
> >         .withNaming(new CustomFileNaming("snappy.parquet")));
> >
> > Any idea what could be wrong here, or are there any open bugs in Beam?
> >
> >
> > Regards,
> > Tapan Upadhyay
> >
> >
> 
