Hi Wangsan,

We struggled with this for many days as well. In the end we found a work-around. Well, work-around: it surely qualifies as one of the ugliest hacks I have ever contemplated. Since Flink interrupts the close immediately, our work-around is to continue closing on another thread! Here is an example in Scala:

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

class MyBucketingSink[A](basePath: String) extends BucketingSink[A](basePath) {

  override def close(): Unit = {
    //
    // Unfortunately, Flink closes very very very very eagerly. So eagerly in fact that it
    // will try to kill us by interrupting the current thread immediately. Let's try to
    // continue on a different thread :evil-grin:
    //

    // Scala cannot call super.close() from inside the anonymous Runnable below,
    // hence this indirection.
    def superClose(): Unit = super.close()

    new Thread(
      new Runnable {
        override def run(): Unit = {
          logger.info("Close invoked on MyBucketingSink on task " + getRuntimeContext.getTaskNameWithSubtasks)
          try {
            superClose()
          } catch {
            case e: Throwable =>
              logger.error(e)("Failed to close task " + getRuntimeContext.getTaskNameWithSubtasks)
          }
        }
      },
      "Closing task " + getRuntimeContext.getTaskNameWithSubtasks
    ).start()
  }
}

Obviously, if the close hangs, the entire job will hang and Flink will need to 
be fully restarted.
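
In case you would rather have close() still block until the files are flushed, a variant of the same hack is to join the closing thread "uninterruptibly": keep re-joining while TaskCanceler keeps interrupting, and restore the interrupt flag at the end. This is only a sketch (untested, and the class name is made up), but something like:

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

class MyBlockingBucketingSink[A](basePath: String) extends BucketingSink[A](basePath) {

  override def close(): Unit = {
    // Same indirection as above: Scala cannot call super.close() from
    // inside the anonymous Runnable.
    def superClose(): Unit = super.close()

    val closer = new Thread(
      new Runnable {
        override def run(): Unit = superClose()
      },
      "Closing task " + getRuntimeContext.getTaskNameWithSubtasks
    )
    closer.start()

    // Join uninterruptibly: TaskCanceler interrupts us periodically, so keep
    // re-joining until the background close is done, then restore the
    // interrupt flag for whoever checks it next.
    var interrupted = false
    while (closer.isAlive) {
      try {
        closer.join()
      } catch {
        case _: InterruptedException => interrupted = true
      }
    }
    if (interrupted) Thread.currentThread().interrupt()
  }
}

The trade-off is that the task thread then stays alive until the close finishes, so a close that truly hangs keeps the task in the cancelling state until Flink gives up on it.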
Please let us know if you see any other problems with this approach.

Kind regards,
    Erik.



> Op 27 sep. 2017, om 07:33 heeft wangsan <wamg...@163.com> het volgende 
> geschreven:
> 
> After digging into the source code, we found that when a Flink job is canceled, 
> a TaskCanceler thread is created.
> 
> The TaskCanceler thread calls cancel() on the invokable and periodically 
> interrupts the
> task thread until it has terminated.
> 
> try {
>   invokable.cancel();
> } catch (Throwable t) {
>   logger.error("Error while canceling the task {}.", taskName, t);
> }
> 
> // ......
> 
> executer.interrupt();
> try {
>   executer.join(interruptInterval);
> } catch (InterruptedException e) {
>   // we can ignore this
> }
> 
> // ......
> Notice that TaskCanceler first sends an interrupt signal to the task thread, 
> and only then calls the join method. Since the task thread is at that point 
> trying to close the DFSOutputStream, which is waiting for an ack, an 
> InterruptedException is thrown in the task thread.
> 
> synchronized (dataQueue) {
>   while (!streamerClosed) {
>     checkClosed();
>     if (lastAckedSeqno >= seqno) {
>       break;
>     }
>     try {
>       dataQueue.wait(1000); // when we receive an ack, we notify on
>       // dataQueue
>     } catch (InterruptedException ie) {
>       throw new InterruptedIOException(
>         "Interrupted while waiting for data to be acknowledged by pipeline");
>     }
>   }
> }
> I am confused about why TaskCanceler calls executer.interrupt() before 
> executer.join(interruptInterval). Can anyone help?
> 
> 
> 
> 
> 
> 
> Hi,
> 
> We are currently using BucketingSink to save data into HDFS in Parquet 
> format. But when the Flink job is cancelled, we always get an exception in 
> BucketingSink's close method. The detailed exception info is as below:
> [ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
> java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
>       at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
>       at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
>       at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
>       at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
>       at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>       at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>       at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
>       at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
>       at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
>       .......
>       at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:745)
> 
> It seems that the DFSOutputStream has not been closed before the task thread 
> is forcibly terminated. We found a similar problem in 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html, 
> but setting "akka.ask.timeout" to a larger value does not work for us. So 
> how can we make sure the stream is safely closed when cancelling a job?
> 
> Best,
> wangsan
> 
> 
> 
> 
> 
