Robert Metzger created FLINK-19688:
--------------------------------------

             Summary: Flink job gets into restart loop caused by InterruptedExceptions
                 Key: FLINK-19688
                 URL: https://issues.apache.org/jira/browse/FLINK-19688
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Network, Runtime / Task
    Affects Versions: 1.12.0
            Reporter: Robert Metzger


I have a benchmarking test job that throws RuntimeExceptions from any operator 
at a configured, random interval. With short intervals, such as a mean failure 
interval of 60 s, the job gets into a state where it frequently fails 
with InterruptedExceptions.
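
For illustration, failure injection of this kind can be sketched roughly as 
follows. This is not the code from the linked repository: the class 
RandomFailureInjector, its method names, and the choice of exponentially 
distributed intervals are all assumptions made for the example.

```java
import java.util.Random;

/**
 * Hypothetical helper (not taken from the benchmark job): decides when an
 * operator should throw an injected RuntimeException.
 */
final class RandomFailureInjector {
    private final double meanIntervalMillis;
    private final Random random;
    private long nextFailureAtMillis;

    RandomFailureInjector(double meanIntervalMillis, long seed, long nowMillis) {
        if (meanIntervalMillis <= 0) {
            throw new IllegalArgumentException("mean interval must be positive");
        }
        this.meanIntervalMillis = meanIntervalMillis;
        this.random = new Random(seed);
        this.nextFailureAtMillis = nowMillis + sampleInterval();
    }

    /** Samples an interval from an exponential distribution with the configured mean. */
    private long sampleInterval() {
        // 1 - nextDouble() lies in (0, 1], so the logarithm is always finite.
        double u = 1.0 - random.nextDouble();
        return (long) Math.ceil(-meanIntervalMillis * Math.log(u));
    }

    /** Call once per record; throws once the scheduled failure time has been reached. */
    void maybeFail(long nowMillis) {
        if (nowMillis >= nextFailureAtMillis) {
            // Schedule the next failure before throwing, so a reused injector keeps working.
            nextFailureAtMillis = nowMillis + sampleInterval();
            throw new RuntimeException("Injected failure");
        }
    }

    long nextFailureAtMillis() {
        return nextFailureAtMillis;
    }
}
```

An operator would call maybeFail(System.currentTimeMillis()) for each record 
it processes. With a mean of 60 000 ms, each operator instance then fails 
about once per minute on average.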

The same job does not have this problem on Flink 1.11.2, at least not after 
running for 15 hours. On 1.12-SNAPSHOT, it happens within a few minutes.
This is the job: 
https://github.com/rmetzger/flip1-bench/blob/master/flip1-bench-jobs/src/main/java/com/ververica/TPCHQuery3.java

This is the exception:
{code}
2020-10-16 16:02:15,653 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - CHAIN GroupReduce (GroupReduce at main(TPCHQuery3.java:199)) -> 
Map (Map at appendMapper(KillerClientMapper.java:38)) (8/8)#1 
(06d656f696bf4ed98831938a7ac2359d_c1c4a56fea0536703d37867c057f0cc8_7_1) 
switched from RUNNING to FAILED.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
(GroupReduce at main(TPCHQuery3.java:199)) -> Map (Map at 
appendMapper(KillerClientMapper.java:38))' , caused an error: 
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
to an exception: Connection for partition 
060d457c4163472f65a4b741993c83f8#0@06d656f696bf4ed98831938a7ac2359d_0bcc9fbf9ac242d5aac540917d980e44_0_1
 not reachable.
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:370) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
Caused by: org.apache.flink.util.WrappingRuntimeException: 
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
to an exception: Connection for partition 
060d457c4163472f65a4b741993c83f8#0@06d656f696bf4ed98831938a7ac2359d_0bcc9fbf9ac242d5aac540917d980e44_0_1
 not reachable.
        at 
org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:253)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1122) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:475) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated 
due to an exception: Connection for partition 
060d457c4163472f65a4b741993c83f8#0@06d656f696bf4ed98831938a7ac2359d_0bcc9fbf9ac242d5aac540917d980e44_0_1
 not reachable.
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_222]
        at 
org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:250)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1122) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:475) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 4 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: Connection for 
partition 
060d457c4163472f65a4b741993c83f8#0@06d656f696bf4ed98831938a7ac2359d_0bcc9fbf9ac242d5aac540917d980e44_0_1
 not reachable.
        at 
org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:247)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
 ~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
 ~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_222]
        at 
org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:363)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:123)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:82) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
due to an exception: Connection for partition 
060d457c4163472f65a4b741993c83f8#0@06d656f696bf4ed98831938a7ac2359d_0bcc9fbf9ac242d5aac540917d980e44_0_1
 not reachable.
        at 
org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:83) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
Caused by: 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
 Connection for partition 
060d457c4163472f65a4b741993c83f8#0@06d656f696bf4ed98831938a7ac2359d_0bcc9fbf9ac242d5aac540917d980e44_0_1
 not reachable.
        at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:160)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:305)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:277)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:93)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:91)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:68) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:79) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
Caused by: java.io.IOException: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connecting to remote task manager '/192.168.2.172:52366' has failed. This might 
indicate that the remote task manager has been lost.
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:85)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:157)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:305)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:277)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:93)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:91)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:68) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:79) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connecting to remote task manager '/192.168.2.172:52366' has failed. This might 
indicate that the remote task manager has been lost.
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_222]
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_222]
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:157)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:305)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:277)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:93)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:91)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:68) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:79) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
Caused by: 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connecting to remote task manager '/192.168.2.172:52366' has failed. This might 
indicate that the remote task manager has been lost.
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:122)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:101)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.lambda$createPartitionRequestClient$1(PartitionRequestClientFactory.java:78)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.concurrent.FutureUtils.completeFromCallable(FutureUtils.java:88)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:78)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:157)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:305)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:277)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:93)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:91)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:68) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:79) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
Caused by: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method) ~[?:1.8.0_222]
        at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_222]
        at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:252)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:131)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:30)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:114)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:101)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.lambda$createPartitionRequestClient$1(PartitionRequestClientFactory.java:78)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.concurrent.FutureUtils.completeFromCallable(FutureUtils.java:88)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:78)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:67)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:157)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:305)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:277)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:93)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:91)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
 ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:68) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at 
org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:79) 
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
{code}
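
The InterruptedException sits at the very bottom of a long cause chain in the 
trace above. As a minimal sketch for triaging logs like this, the chain can be 
walked with plain JDK calls. The class RootCause and its method names are 
hypothetical and not part of Flink.

```java
/** Hypothetical triage helper (not part of Flink): finds the innermost cause of a failure. */
final class RootCause {
    private RootCause() {}

    /** Follows getCause() until the innermost throwable is reached. */
    static Throwable of(Throwable t) {
        Throwable current = t;
        // Stop on a null cause, and guard against a throwable that names itself as cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /** True if the innermost cause is an InterruptedException, as in the trace above. */
    static boolean causedByInterrupt(Throwable t) {
        return of(t) instanceof InterruptedException;
    }
}
```

For the failure shown above, RootCause.causedByInterrupt(...) would return 
true, which separates these failures from the injected RuntimeExceptions.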

I will attach all logs to this ticket.




