Vishal Palla created FLINK-33769:
------------------------------------

             Summary: ExternalSorter hits "java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted" when using custom serializer
                 Key: FLINK-33769
                 URL: https://issues.apache.org/jira/browse/FLINK-33769
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Task
    Affects Versions: 1.17.2
            Reporter: Vishal Palla

The [NormalizedKeySorter library|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java] is used to sort records in memory. It internally uses a [SimpleCollectingOutputView|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java], instantiated with a fixed chunk of managed memory, to store the records. When the SimpleCollectingOutputView runs out of memory segments, it [throws an EOFException|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java#L76], which [should be caught by the sorter in the write method|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java#L298] and turned into a {{false}} return value indicating that the sort buffer is full (per the javadoc).

The issue is that the EOFException thrown by the SimpleCollectingOutputView is first caught by the record serializer, which offers no guarantee that it will propagate the exception upwards unchanged. Kryo and Thrift serializers, for example, wrap the caught exception in their own exception classes and rethrow it. The sorter does not catch the wrapped exception, so the job crashes.

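The failure mode described above can be reproduced in isolation. The following is a minimal sketch that does not use Flink's or Kryo's actual classes: {{BoundedOutput}}, {{RecordSerializer}}, {{writeStrict}}, {{writeTolerant}} and {{hasEofCause}} are hypothetical names introduced only for illustration. {{writeStrict}} imitates a sorter that catches only a direct EOFException, and {{writeTolerant}} shows one possible way to recognise a wrapped EOFException by walking the cause chain. This is an assumption about what a fix could look like, not the change adopted by the project.

```java
import java.io.EOFException;
import java.io.IOException;

public class SortBufferEofSketch {

    /** Hypothetical stand-in for an output view backed by a fixed amount of memory. */
    public static final class BoundedOutput {
        private int remaining;

        public BoundedOutput(int capacityBytes) {
            this.remaining = capacityBytes;
        }

        public void write(int numBytes) throws EOFException {
            if (numBytes > remaining) {
                throw new EOFException("Can't collect further: memorySource depleted");
            }
            remaining -= numBytes;
        }
    }

    /** Hypothetical serializer abstraction; a record is reduced to its size in bytes. */
    public interface RecordSerializer {
        void serialize(int recordSize, BoundedOutput out) throws IOException;
    }

    /** Lets the EOFException propagate unchanged. */
    public static final RecordSerializer PLAIN = (size, out) -> out.write(size);

    /** Wraps the EOFException in a runtime exception, as the report says Kryo and Thrift do. */
    public static final RecordSerializer WRAPPING = (size, out) -> {
        try {
            out.write(size);
        } catch (EOFException e) {
            throw new RuntimeException(e);
        }
    };

    /** Catches only a direct EOFException; a wrapped one escapes to the caller. */
    public static boolean writeStrict(RecordSerializer s, int size, BoundedOutput out)
            throws IOException {
        try {
            s.serialize(size, out);
            return true;
        } catch (EOFException e) {
            return false; // buffer full
        }
    }

    /** Also treats an exception whose cause chain contains an EOFException as "buffer full". */
    public static boolean writeTolerant(RecordSerializer s, int size, BoundedOutput out)
            throws IOException {
        try {
            s.serialize(size, out);
            return true;
        } catch (IOException | RuntimeException e) {
            if (hasEofCause(e)) {
                return false; // buffer full, possibly reported through a wrapper
            }
            throw e; // unrelated failure: rethrow unchanged
        }
    }

    /** Returns true if t or any of its causes is an EOFException. */
    public static boolean hasEofCause(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof EOFException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("plain, strict:      " + writeStrict(PLAIN, 8, new BoundedOutput(4)));
        try {
            writeStrict(WRAPPING, 8, new BoundedOutput(4));
        } catch (RuntimeException e) {
            System.out.println("wrapping, strict:   crashed with " + e);
        }
        System.out.println("wrapping, tolerant: " + writeTolerant(WRAPPING, 8, new BoundedOutput(4)));
    }
}
```

One caveat with the tolerant variant: an EOFException found in the cause chain might originate somewhere other than the depleted sort buffer, so a real fix would likely need a more specific signal than the exception type alone.
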
Example stacktrace -
{{java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:487)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
	at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:262)
	at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1222)
	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
	... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:259)
	... 9 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
	at org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:256)
	at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
	at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
	at org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:397)
	at org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
	at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
	at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: Can't collect further: memorySource depleted
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
	at com.esotericsoftware.kryo.io.OutputChunked.flush(OutputChunked.java:45)
	at com.esotericsoftware.kryo.io.OutputChunked.endChunks(OutputChunked.java:82)
	at com.twitter.beam.coder.scala.ChillCoder.encode(ChillCoder.scala:101)
	at com.twitter.eventwrangler.core.attribution.AttributionEventCoder.encode(AttributionEvent.scala:40)
	at com.twitter.eventwrangler.core.attribution.AttributionEventCoder.encode(AttributionEvent.scala:22)
	at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
	at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:74)
	at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:607)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:598)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:558)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:110)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)
	at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:297)
	at org.apache.flink.runtime.operators.sort.SorterInputGateway.writeRecord(SorterInputGateway.java:77)
	at org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:69)
	at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73)
Caused by: java.io.EOFException: Can't collect further: memorySource depleted
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:76)
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:139)
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:205)
	at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:44)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
	... 20 more}}

--
This message was sent by Atlassian Jira (v8.20.10#820010)