After digging into the source code, we found that when a Flink job is canceled, a
TaskCanceler thread is created.

The TaskCanceler thread calls cancel() on the invokable and periodically 
interrupts the
task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}

// ......

executer.interrupt();

try {
  executer.join(interruptInterval);
} catch (InterruptedException e) {
  // we can ignore this
}

// ......

Notice that TaskCanceler first sends an interrupt signal to the task thread and
then calls join on it. Since the task thread is at that moment trying to close the
DFSOutputStream, which is waiting for acks from the pipeline, an
InterruptedException is thrown in the task thread:

synchronized (dataQueue) {
  while (!streamerClosed) {
    checkClosed();
    if (lastAckedSeqno >= seqno) {
      break;
    }
    try {
      dataQueue.wait(1000); // when we receive an ack, we notify on
                            // dataQueue
    } catch (InterruptedException ie) {
      throw new InterruptedIOException(
          "Interrupted while waiting for data to be acknowledged by pipeline");
    }
  }
}
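
To make that interaction concrete, here is a minimal, self-contained sketch. It is
not Flink or HDFS code, just the same pattern with made-up names: one thread blocks
in wait() while "closing", another thread interrupts it and then joins, and the
interrupted wait surfaces as an InterruptedIOException, exactly like the stack
trace below.

import java.io.InterruptedIOException;

public class InterruptOnCloseDemo {

  private static final Object dataQueue = new Object();

  // Stand-in for DFSOutputStream#waitForAckedSeqno: block until notified.
  static void waitForAcks() throws InterruptedIOException {
    synchronized (dataQueue) {
      try {
        dataQueue.wait(); // in HDFS this loops until lastAckedSeqno >= seqno
      } catch (InterruptedException ie) {
        throw new InterruptedIOException(
            "Interrupted while waiting for data to be acknowledged by pipeline");
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Thread task = new Thread(() -> {
      try {
        waitForAcks();
      } catch (InterruptedIOException e) {
        // this is what surfaces in the sink's close() path
        System.out.println("task thread: " + e.getMessage());
      }
    }, "task-thread");
    task.start();

    Thread.sleep(100);  // let the task thread reach wait()
    task.interrupt();   // like TaskCanceler: interrupt first ...
    task.join(30000);   // ... then join(interruptInterval)
  }
}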

I am confused about why TaskCanceler calls executer.interrupt() before
executer.join(interruptInterval). Can anyone help?
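
As a side note, if I am reading Task.java correctly, the interruptInterval used
above is taken from the task cancellation settings in flink-conf.yaml, e.g. (the
value below is just the default, for illustration):

task.cancellation.interval: 30000   # ms between repeated interrupt attempts

Please correct me if that is not the right knob.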






Hi,


We are currently using BucketingSink to save data into HDFS in Parquet format (a
simplified sketch of the setup follows the stack trace below). But when the Flink
job was cancelled, we always got an exception in BucketingSink's close() method.
The detailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
        .......
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
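
For context, the sink setup looks roughly like the sketch below. MyEvent,
ParquetSinkWriter, and the paths are simplified placeholders rather than our real
code; only the BucketingSink calls matter here:

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// ParquetSinkWriter stands for our Writer<MyEvent> implementation wrapping
// org.apache.parquet.hadoop.ParquetWriter, so its close() ends up in
// ParquetWriter#close -> DFSOutputStream#close (the path in the trace above).
BucketingSink<MyEvent> sink = new BucketingSink<>("hdfs:///data/events");
sink.setBucketer(new DateTimeBucketer<MyEvent>("yyyy-MM-dd--HH"));
sink.setWriter(new ParquetSinkWriter());
sink.setBatchSize(1024L * 1024L * 128L); // roll part files at ~128 MB

events.addSink(sink); // events is our DataStream<MyEvent>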


It seems that the DFSOutputStream has not been closed before the task thread is
forcibly terminated. We found a similar problem in
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
, but setting "akka.ask.timeout" to a larger value does not work for us. So how
can we make sure the stream is safely closed when cancelling a job?
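
For reference, the change we tried was raising the ask timeout, e.g. in
flink-conf.yaml (the value is only an example):

akka.ask.timeout: 120 s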


Best,
wangsan




