Yes, we have checkpointing enabled in our cluster. The pipeline restarts fine when I make no code changes, but after adding a DoFn class (one that is not even used in the pipeline), the pipeline still reads from Kafka and runs the windowing and Distinct steps, yet does not write to S3.

I have also checked in the Flink UI that the watermark for the FileIO.Write stage does not progress; it is stuck at the watermark from when the savepoint was taken.
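Since the watermark shown is the one from the savepoint, it looks like the on-time trigger (AfterWatermark.pastEndOfWindow()) may simply never fire after restore. As a diagnostic we are considering adding speculative processing-time firings to the same window. This is only a sketch of that idea, not a fix; the one-minute delay is an arbitrary value we picked:

    parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
            FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
        // Same window and on-time trigger as before, plus early firings on
        // processing time, so panes are emitted even while the event-time
        // watermark stays stuck at the savepoint value.
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1)))) // arbitrary delay
        .withAllowedLateness(Duration.ZERO,
            Window.ClosingBehavior.FIRE_ALWAYS)
        .discardingFiredPanes())

If files show up on S3 from the early panes while the on-time pane still never fires, that would confirm the stalled watermark is what blocks FileIO.Write.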
On 2021/02/18 20:36:45, Reuven Lax <re...@google.com> wrote:
> Do you have checkpointing enabled in your Flink cluster?
>
> On Thu, Feb 18, 2021 at 11:50 AM Tapan Upadhyay <tap...@gmail.com> wrote:
>
> > Hi,
> >
> > I am currently working on a Beam pipeline (Flink runner) where we read
> > JSON events from Kafka and write the output in Parquet format to S3.
> >
> > We write to S3 every 10 minutes.
> >
> > We have observed that our pipeline sometimes stops writing to S3 after
> > restarts (even for a non-breaking minor code change). If we change the
> > Kafka offset and restart the pipeline, it starts writing to S3 again.
> >
> > While the S3 write fails, the pipeline runs without any issues and
> > processes records up to the FileIO stage. It logs no errors or
> > exceptions but silently fails to write to S3 at the FileIO stage.
> >
> > This is the stage where no records are sent out:
> >
> > FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ->
> > FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles) ->
> > FileIO.Write/WriteFiles/GatherTempFileResults/Add void key/AddKeys/Map/ParMultiDo(Anonymous)
> >
> > We have verified our windowing function by logging records after
> > windowing; windowing works fine.
> >
> > This is our code snippet:
> >
> > parquetRecord
> >     .apply("Batch Events", Window.<GenericRecord>into(
> >             FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
> >         .triggering(AfterWatermark.pastEndOfWindow())
> >         .withAllowedLateness(Duration.ZERO,
> >             Window.ClosingBehavior.FIRE_ALWAYS)
> >         .discardingFiredPanes())
> >     .apply(Distinct.create())
> >     .apply(FileIO.<GenericRecord>write()
> >         .via(ParquetIO.sink(getOutput_schema()))
> >         .to(outputPath.isEmpty() ? outputPath() : outputPath)
> >         .withNumShards(1)
> >         .withNaming(new CustomFileNaming("snappy.parquet")));
> >
> > Any idea what could be wrong here or any open bugs in Beam?
> >
> > Regards,
> > Tapan Upadhyay
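One more thing I noticed while re-reading our snippet quoted above: only the windowing step has an explicit name, so Distinct and FileIO.Write get auto-generated transform names. As far as I understand, the Flink runner identifies operator state in a savepoint by names derived from the transform names, and auto-generated names can shift when the pipeline shape changes (for example after adding a DoFn class). A sketch with every step named explicitly (the names themselves are just illustrative):

    parquetRecord
        .apply("Batch Events", Window.<GenericRecord>into(
                FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO,
                Window.ClosingBehavior.FIRE_ALWAYS)
            .discardingFiredPanes())
        // Explicit names should keep the Flink operator mapping stable
        // across code changes; unnamed transforms get generated names.
        .apply("Distinct Events", Distinct.create())
        .apply("Write Parquet To S3", FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(getOutput_schema()))
            .to(outputPath.isEmpty() ? outputPath() : outputPath)
            .withNumShards(1)
            .withNaming(new CustomFileNaming("snappy.parquet")));

We will test whether a savepoint restore survives the same code change once all transforms are named.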