[jira] [Commented] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837126#comment-16837126 ] Andrey Zagrebin commented on FLINK-12379: - [~haf] if this is a duplicate of FLINK-12381, could you merge it with FLINK-12381 (description, environment..) and resolve this issue as duplicate? > Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint > > > Key: FLINK-12379 > URL: https://issues.apache.org/jira/browse/FLINK-12379 > Project: Flink > Issue Type: Bug > Components: FileSystems, Runtime / Coordination >Affects Versions: 1.8.0 > Environment: GCS + > > {code:java} > 1.8.0 > 1.8 > 2.11{code} > {code:java} > > > > > com.google.cloud.bigdataoss > gcs-connector > hadoop2-1.9.16 > > > org.apache.flink > flink-connector-filesystem_2.11 > ${flink.version} > > > org.apache.flink > flink-hadoop-fs > ${flink.version} > > > > org.apache.flink > flink-shaded-hadoop2 > ${hadoop.version}-${flink.version} > > {code} > > >Reporter: Henrik >Priority: Major > > When running one standalone-job w/ parallelism=1 + one taskmanager, you will > shortly get this crash > {code:java} > 2019-04-30 22:20:02,928 WARN org.apache.flink.runtime.jobmaster.JobMaster > - Error while processing checkpoint acknowledgement message > org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize > the pending checkpoint 5. > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > 'gs://example_bucket/flink/checkpoints//chk-5/_metadata' > already exists > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) > at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) > ... 8 more > Caused by: java.nio.file.FileAlreadyExistsException: Object > gs://example_bucket/flink/checkpoints//chk-5/_metadata > already exists. > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82) > ... 19 more > 2019-04-30 22:20:03,114 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 6 @ 1556662802928 for job .{code} > My guess at why; concurrent
[jira] [Commented] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835505#comment-16835505 ] Henrik commented on FLINK-12379: Yes, it's the "standalone-job.sh" entrypoint. > Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint > > > Key: FLINK-12379 > URL: https://issues.apache.org/jira/browse/FLINK-12379 > Project: Flink > Issue Type: Bug > Components: FileSystems, Runtime / Coordination >Affects Versions: 1.8.0 > Environment: GCS + > > {code:java} > 1.8.0 > 1.8 > 2.11{code} > {code:java} > > > > > com.google.cloud.bigdataoss > gcs-connector > hadoop2-1.9.16 > > > org.apache.flink > flink-connector-filesystem_2.11 > ${flink.version} > > > org.apache.flink > flink-hadoop-fs > ${flink.version} > > > > org.apache.flink > flink-shaded-hadoop2 > ${hadoop.version}-${flink.version} > > {code} > > >Reporter: Henrik >Priority: Major > > When running one standalone-job w/ parallelism=1 + one taskmanager, you will > shortly get this crash > {code:java} > 2019-04-30 22:20:02,928 WARN org.apache.flink.runtime.jobmaster.JobMaster > - Error while processing checkpoint acknowledgement message > org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize > the pending checkpoint 5. > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > 'gs://example_bucket/flink/checkpoints//chk-5/_metadata' > already exists > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) > at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) > ... 8 more > Caused by: java.nio.file.FileAlreadyExistsException: Object > gs://example_bucket/flink/checkpoints//chk-5/_metadata > already exists. > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82) > ... 19 more > 2019-04-30 22:20:03,114 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 6 @ 1556662802928 for job .{code} > My guess at why; concurrent checkpoint writers are updating the _metadata > resource concurrently. They should be using optimistic concurrency
[jira] [Commented] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835361#comment-16835361 ] Stephan Ewen commented on FLINK-12379: -- This looks like the same issue as FLINK-12381 Jobs do not overwrite existing jobs' checkpoints and usually fence paths through jobID. Here, the jobID is all zero for some reason. Can you share the setup? > Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint > > > Key: FLINK-12379 > URL: https://issues.apache.org/jira/browse/FLINK-12379 > Project: Flink > Issue Type: Bug > Components: FileSystems, Runtime / Coordination >Affects Versions: 1.8.0 > Environment: GCS + > > {code:java} > 1.8.0 > 1.8 > 2.11{code} > {code:java} > > > > > com.google.cloud.bigdataoss > gcs-connector > hadoop2-1.9.16 > > > org.apache.flink > flink-connector-filesystem_2.11 > ${flink.version} > > > org.apache.flink > flink-hadoop-fs > ${flink.version} > > > > org.apache.flink > flink-shaded-hadoop2 > ${hadoop.version}-${flink.version} > > {code} > > >Reporter: Henrik >Priority: Major > > When running one standalone-job w/ parallelism=1 + one taskmanager, you will > shortly get this crash > {code:java} > 2019-04-30 22:20:02,928 WARN org.apache.flink.runtime.jobmaster.JobMaster > - Error while processing checkpoint acknowledgement message > org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize > the pending checkpoint 5. > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > 'gs://example_bucket/flink/checkpoints//chk-5/_metadata' > already exists > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) > at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) > ... 8 more > Caused by: java.nio.file.FileAlreadyExistsException: Object > gs://example_bucket/flink/checkpoints//chk-5/_metadata > already exists. > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82) > ... 19 more > 2019-04-30 22:20:03,114 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 6 @ 1556662802928 for job