Youjun Yuan created FLINK-24187:
-----------------------------------
Summary: Could not commit s3 file after JM restart during state
initialization
Key: FLINK-24187
URL: https://issues.apache.org/jira/browse/FLINK-24187
Project: Flink
Issue Type: Bug
Components: FileSystems
Affects Versions: 1.12.1
Reporter: Youjun Yuan
We have a SQL job that consumes from Kafka and writes to a Hive table, with the data stored in S3.
One day the ZooKeeper leader failed over, which caused the Flink job to restart. However, the
job got stuck during state restore, with the following error:
{code:java}
java.io.IOException: Could not commit file from s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/.part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371.inprogress.400506e4-23ea-428c-b8eb-9ff196eeca64 to s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:104) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.commitAfterRecovery(HadoopRenameFileCommitter.java:83) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedPendingFile.commitAfterRecovery(HadoopPathBasedPartFileWriter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:120) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.table.filesystem.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:55) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.1.jar:1.12.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.1.jar:1.12.1]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.io.IOException: java.util.concurrent.CancellationException
	at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:171) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	... 22 more
Caused by: java.util.concurrent.CancellationException
	at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_242]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_242]
	at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:262) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:249) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:169) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
	at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
	... 22 more{code}
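The trace shows HadoopRenameFileCommitter.commitAfterRecovery failing on a rename that is retried during state restore. For illustration only, here is a minimal sketch of what an idempotent commit-after-recovery could look like, using java.nio.file on a local filesystem in place of Hadoop's FileSystem API; the class and method names are hypothetical, not Flink's actual code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of an idempotent rename-based commit. After a JM
// restart, the rename may already have succeeded before the failure, so a
// missing in-progress file with an existing target is treated as success
// instead of an error.
public class RecoverableRename {

    public static void commitAfterRecovery(Path inProgress, Path target) throws IOException {
        if (Files.notExists(inProgress)) {
            if (Files.exists(target)) {
                return; // already committed before the restart; nothing to do
            }
            throw new IOException(
                "Neither in-progress nor committed file exists: " + inProgress);
        }
        // Normal path: atomically-enough move the in-progress file into place.
        Files.move(inProgress, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Note that on S3 (EMRFS) a "rename" is a multipart copy plus delete, so it is neither atomic nor cheap; a cancellation mid-copy, as in the trace above, leaves the commit in an ambiguous state that a check like the sketch's existence test could resolve on the next restore attempt.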
--
This message was sent by Atlassian Jira
(v8.3.4#803005)