Hi there,

I'm using a custom writer with an hourly rolling bucket sink, and I'm seeing two issues.

First issue: when I write the files to S3, all of them get committed. However, when I write the same files to HDFS, they remain in the .pending state. This could be related to the second problem below.

Second issue: my custom writer writes Avro records to Parquet. It extends StreamWriterBase and looks something like this:

    @transient private var writer: ParquetWriter[T] = _

    override def open(fs: FileSystem, path: Path): Unit = {
      val conf = new Configuration()
      conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false)
      conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false)
      writer = AvroParquetWriter
        .builder[T](path)
        .withSchema(new Schema.Parser().parse(schema))
        .withCompressionCodec(compressionCodecName)
        .withConf(conf)
        .build()
    }

    override def write(element: T): Unit = writer.write(element)

    override def duplicate(): Writer[T] = new AvroParquetSinkWriter[T](schema)

    override def close(): Unit = writer.close()

    override def getPos: Long = writer.getDataSize

    override def flush(): Long = super.flush()

What I noticed is that, while recovering from a checkpoint, it fails to flush, even though I have overridden flush() as shown above. The issue seems to be that it doesn't have a handle to the stream writer, which is why it fails when flush is called on the stream writer. I'm not sure whether the .pending state from the first issue is related to this as well.

--------------------------------------------------
11:52:04.082 [pool-13-thread-1] INFO o.a.flink.runtime.taskmanager.Task - Source: eo_open- kafka source (1/1) (d926613dfcb5ac993a362e9b985e40d6) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator Source:- kafka source (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_73]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Source: eo_open- kafka source (1/1).
    ... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_73]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) [na:1.8.0_73]
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) ~[flink-core-1.3.2.jar:1.3.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
    ... 5 common frames omitted
Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
    ... 5 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://xxx:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
    ... 7 common frames omitted
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
    at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
    at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:288)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:521)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
    at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1185)
    ... 5 common frames omitted
Caused by: org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/XXXX:50010]
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
    at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1341)
-------------------------------------