Hi there,

I'm using a custom writer with an hourly rolling bucketing sink, and I'm seeing two issues.

First, when I write the same files to S3, all of them get committed; however, when I write the same data to HDFS, the files remain in the .pending state. This could be related to the second problem below.

Second issue: my custom writer writes Avro records to Parquet. It extends StreamWriterBase and looks something like this:


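  import org.apache.avro.Schema
  import org.apache.flink.streaming.connectors.fs.{StreamWriterBase, Writer}
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.parquet.avro.AvroParquetWriter
  import org.apache.parquet.avro.AvroReadSupport.ADD_LIST_ELEMENT_RECORDS
  import org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE
  import org.apache.parquet.hadoop.ParquetWriter
  import org.apache.parquet.hadoop.metadata.CompressionCodecName

  class AvroParquetSinkWriter[T](schema: String) extends StreamWriterBase[T] {

    // The codec is not shown in the original snippet; SNAPPY is an assumed value.
    private val compressionCodecName = CompressionCodecName.SNAPPY

    @transient private var writer: ParquetWriter[T] = _

    // Opens a ParquetWriter directly on `path`. super.open(fs, path) is not
    // called, so StreamWriterBase's own output stream is never created.
    override def open(fs: FileSystem, path: Path): Unit = {
      val conf = new Configuration()
      conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false)
      conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false)
      writer = AvroParquetWriter
        .builder[T](path)
        .withSchema(new Schema.Parser().parse(schema))
        .withCompressionCodec(compressionCodecName)
        .withConf(conf)
        .build()
    }

    override def write(element: T): Unit = writer.write(element)

    override def duplicate(): Writer[T] = new AvroParquetSinkWriter[T](schema)

    override def close(): Unit = writer.close()

    override def getPos: Long = writer.getDataSize

    // Delegates to StreamWriterBase.flush(), which flushes the base class's
    // stream rather than the ParquetWriter opened above.
    override def flush(): Long = super.flush()
  }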


While recovering from a checkpoint, I noticed that the writer fails to flush, even though I have overridden flush() above. The issue seems to be that my writer doesn't hold a handle to the stream writer's output stream, which is why the flush call on the stream writer fails. I'm not sure whether the .pending state from the first issue is also related to this.
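The suspected cause can be reproduced with a small self-contained model. `StreamOwningWriterBase` and `SelfManagedWriter` are hypothetical stand-ins, assuming (as appears to be the case for `StreamWriterBase`) that the base class keeps its own output stream, creates it only in its own `open()`, and rejects `flush()` with an `IllegalStateException` when that stream was never created.

```scala
import java.io.ByteArrayOutputStream

// Hypothetical stand-in for a stream-based writer base class such as
// StreamWriterBase: it owns an output stream that only its own open() creates.
abstract class StreamOwningWriterBase {
  private var outStream: ByteArrayOutputStream = null

  def open(path: String): Unit = {
    outStream = new ByteArrayOutputStream()
  }

  // flush() works on the base class's stream; if that stream was never
  // opened, it is rejected (the exception message is chosen for illustration).
  def flush(): Long = {
    if (outStream == null) throw new IllegalStateException("Writer is not open")
    outStream.flush()
    outStream.size().toLong
  }
}

// Mirrors the shape of the custom writer: open() sets up its own target and
// never calls super.open(), while flush() simply delegates to super.flush().
class SelfManagedWriter extends StreamOwningWriterBase {
  // Placeholder standing in for the ParquetWriter the real writer opens.
  private var ownTarget: Option[String] = None

  override def open(path: String): Unit = ownTarget = Some(path)

  def isOpen: Boolean = ownTarget.isDefined

  override def flush(): Long = super.flush()
}
```

Here `flush()` fails even though `open()` succeeded, because the object that was opened (the placeholder standing in for the `ParquetWriter`) is not the stream the base class flushes. An instance of the base class that goes through its own `open()` flushes without error.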


--------------------------------------------------
11:52:04.082 [pool-13-thread-1] INFO  o.a.flink.runtime.taskmanager.Task - Source: eo_open- kafka source (1/1) (d926613dfcb5ac993a362e9b985e40d6) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator Source:- kafka source (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_73]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Source: eo_open- kafka source (1/1).
    ... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_73]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) [na:1.8.0_73]
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) ~[flink-core-1.3.2.jar:1.3.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
    ... 5 common frames omitted
    Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
        ... 5 common frames omitted
    Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://xxx:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
        ... 7 common frames omitted
    Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
        at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:288)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:521)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
        at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1185)
        ... 5 common frames omitted
    Caused by: org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/XXXX:50010]
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
        at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1341)


        -------------------------------------
