Hi all,

We are modifying our Flink job to use iterations (
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/iterations/).
The job runs fine and produces the expected output, and checkpoints are
created successfully at regular intervals. However, when we trigger a
savepoint for the job, the savepoint gets stuck. In the Flink UI, some
operators are fully acknowledged (100%), while the others show 0%
acknowledgements. Below is the thread information for some of the operators
with 0% acknowledgements. The main change in this Flink job is the adoption
of an iteration stream. Without the iteration, both savepoints and
checkpoints are created successfully. Has anyone encountered a similar
issue, or does anyone know how to fix this? Any comments are appreciated.

Best wishes,
Chen-Che Huang

{
  "threadName": "OutputFlusher for xxx-operator",
  "stringifiedThreadInfo": "\"OutputFlusher for lookup-user-id-v0.6.0\" daemon prio=5 Id=278 TIMED_WAITING\n\tat java.base@11.0.13/java.lang.Thread.sleep(Native Method)\n\tat app//org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.run(RecordWriter.java:240)\n\n"
},
{
  "threadName": "Channel state writer IterationSink-13 (3/12)#0",
  "stringifiedThreadInfo": "\"Channel state writer IterationSink-13 (3/12)#0\" daemon prio=5 Id=263 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6a6e7f8d\n\tat java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method)\n\t-  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6a6e7f8d\n\tat java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(Unknown Source)\n\tat java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)\n\tat java.base@11.0.13/java.util.concurrent.LinkedBlockingDeque.takeFirst(Unknown Source)\n\tat java.base@11.0.13/java.util.concurrent.LinkedBlockingDeque.take(Unknown Source)\n\tat app//org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)\n\tat app//org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)\n\tat app//org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$952/0x0000000800a7a840.run(Unknown Source)\n\t...\n\n"
},
{
  "threadName": "IterationSink-13 (4/12)#0",
  "stringifiedThreadInfo": "\"IterationSink-13 (4/12)#0\" prio=5 Id=264 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4fff6178\n\tat java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method)\n\t-  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4fff6178\n\tat java.base@11.0.13/java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source)\n\tat java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)\n\tat app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)\n\tat app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)\n\tat app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)\n\tat app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)\n\tat app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)\n\t...\n\n"
}
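
The escaped stack traces above are hard to read as they stand. For anyone who
wants to inspect them, here is a minimal Python sketch that unfolds each
"stringifiedThreadInfo" field into a state line plus a list of frames. It
assumes the entries are wrapped in a JSON array; the function name
summarize and the SAMPLE constant are hypothetical helpers for illustration,
not part of Flink, and SAMPLE holds only the first entry from the dump.

```python
import json


def summarize(entries):
    """Return (thread name, header line, stack frames) for each entry."""
    summary = []
    for entry in entries:
        lines = entry["stringifiedThreadInfo"].splitlines()
        # The first line carries the thread name, id, and state.
        header = lines[0] if lines else ""
        # Stack frames are the lines that start with "at " once trimmed.
        frames = [ln.strip()[3:] for ln in lines[1:] if ln.strip().startswith("at ")]
        summary.append((entry["threadName"], header, frames))
    return summary


# Assumption: the dump entries are wrapped in a JSON array. Only the first
# entry from the email is reproduced here to keep the sketch short.
SAMPLE = r'''
[
  {
    "threadName": "OutputFlusher for xxx-operator",
    "stringifiedThreadInfo": "\"OutputFlusher for lookup-user-id-v0.6.0\" daemon prio=5 Id=278 TIMED_WAITING\n\tat java.base@11.0.13/java.lang.Thread.sleep(Native Method)\n\tat app//org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.run(RecordWriter.java:240)\n\n"
  }
]
'''

if __name__ == "__main__":
    for name, header, frames in summarize(json.loads(SAMPLE)):
        print(name)
        print("  " + header)
        for frame in frames:
            print("    at " + frame)
```

Printing the header line first makes the thread state (WAITING or
TIMED_WAITING) visible at a glance, which is usually the first thing to
compare across the operators that never acknowledge the savepoint.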
