[jira] [Created] (FLINK-18263) Allow external checkpoints to be persisted even when the job is in "Finished" state.
Mark Cho created FLINK-18263:

Summary: Allow external checkpoints to be persisted even when the job is in "Finished" state.
Key: FLINK-18263
URL: https://issues.apache.org/jira/browse/FLINK-18263
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Mark Cho

Currently, the `execution.checkpointing.externalized-checkpoint-retention` configuration supports two options:
- `DELETE_ON_CANCELLATION`, which keeps the externalized checkpoints in the FAILED and SUSPENDED states.
- `RETAIN_ON_CANCELLATION`, which keeps the externalized checkpoints in the FAILED, SUSPENDED, and CANCELED states.

This gives us control over the retention of externalized checkpoints in every terminal state of a job except FINISHED. If the job ends up in the FINISHED state, its externalized checkpoints are automatically cleaned up, and there is currently no configuration that ensures these externalized checkpoints are persisted. I found an old Jira ticket, FLINK-4512, where this was discussed.

I think it would be helpful to have a config that can control the retention policy for the FINISHED state as well:
- This can be useful for cases where we want to rewind a job (that reached the FINISHED state) to a previous checkpoint.
- When we use externalized checkpoints, we want to fully delegate the checkpoint clean-up to an external process in all job states, without singling out the FINISHED state to be cleaned up by Flink.

We have a quick fix working in our fork where we've changed the `ExternalizedCheckpointCleanup` enum:

{code:java}
RETAIN_ON_FAILURE (renamed from DELETE_ON_CANCELLATION; retains on FAILED)
RETAIN_ON_CANCELLATION (kept the same; retains on FAILED, CANCELED)
RETAIN_ON_SUCCESS (added; retains on FAILED, CANCELED, FINISHED)
{code}

Since this change requires changes to multiple components (e.g. config values, REST API, Web UI, etc.), I wanted to get the community's thoughts before I invest more time in my quick-fix PR (which currently contains only the minimal change to get this working).

-- This message was sent by Atlassian Jira (v8.3.4#803005)
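The proposed retention semantics can be sketched as a small predicate over terminal job states. This is a hypothetical illustration mirroring the enum comments above, not the actual Flink implementation; the names `JobStatus` and `shouldRetain` are stand-ins.

```java
import java.util.EnumSet;
import java.util.Set;

public class RetentionSketch {
    // Stand-in for Flink's job status; only terminal states matter here.
    enum JobStatus { FAILED, SUSPENDED, CANCELED, FINISHED }

    // Hypothetical version of the proposed ExternalizedCheckpointCleanup enum,
    // encoding which terminal states retain externalized checkpoints.
    enum ExternalizedCheckpointRetention {
        RETAIN_ON_FAILURE(EnumSet.of(JobStatus.FAILED)),
        RETAIN_ON_CANCELLATION(EnumSet.of(JobStatus.FAILED, JobStatus.CANCELED)),
        RETAIN_ON_SUCCESS(EnumSet.of(JobStatus.FAILED, JobStatus.CANCELED, JobStatus.FINISHED));

        private final Set<JobStatus> retainedStates;

        ExternalizedCheckpointRetention(Set<JobStatus> retainedStates) {
            this.retainedStates = retainedStates;
        }

        boolean shouldRetain(JobStatus terminalStatus) {
            return retainedStates.contains(terminalStatus);
        }
    }

    public static void main(String[] args) {
        // Only the new RETAIN_ON_SUCCESS option keeps checkpoints of FINISHED jobs.
        System.out.println(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION.shouldRetain(JobStatus.FINISHED)); // false
        System.out.println(ExternalizedCheckpointRetention.RETAIN_ON_SUCCESS.shouldRetain(JobStatus.FINISHED));      // true
    }
}
```

The point of the sketch is that the change is purely additive: existing options keep their retention sets, and the new option only extends the set with FINISHED.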
[jira] [Created] (FLINK-11957) Expose failure cause in the API response when dispatcher fails to submit a job
Mark Cho created FLINK-11957:

Summary: Expose failure cause in the API response when dispatcher fails to submit a job
Key: FLINK-11957
URL: https://issues.apache.org/jira/browse/FLINK-11957
Project: Flink
Issue Type: Improvement
Components: Runtime / REST
Affects Versions: 1.7.2
Reporter: Mark Cho

We use the POST /jars/:jarid/run API endpoint to submit a Flink job:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jars-jarid-run

Currently, whenever there is an error, the API response only returns the following info:

{code:java}
{
  "errors": [
    "org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job."
  ]
}
{code}

Since job submission can fail for multiple reasons, it would be helpful to have some information that tells us why the job submission failed. Currently, we have to dig into the Flink logs to find the root cause. Some examples of job submission failures:

{code:java}
java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
    at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint file/directory 's3://us-east-1.spaas.test/checkpoints/metadata/spaas_app_mcho-flink_bp_test/cee4-155266396689/fa82a7d2c8dfb6f7fb14bf2e319d4367/chk-969/_metadata' on file system 's3'.
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:241)
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:109)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1100)
    at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1241)
    at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1165)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
    ... 10 more
{code}

{code:java}
java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
    at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Incompatible failover strategy - strategy 'Individual Task Restart' can only handle jobs with only disconnected tasks.
    at org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy.notifyNewVertices
{code}
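A lightweight way to surface the root cause would be to include every message in the exception's cause chain in the REST error payload. The sketch below is hypothetical (Flink's actual handlers and serializers differ); it only shows how cause chains like those above could be flattened into response strings, so the `FileNotFoundException` root cause becomes visible to the API client.

```java
import java.util.ArrayList;
import java.util.List;

public class ErrorChainSketch {
    // Collect the class name and message of each throwable in the cause chain,
    // so the REST response can show the root cause instead of only the
    // top-level JobSubmissionException message.
    public static List<String> describeCauseChain(Throwable t) {
        List<String> messages = new ArrayList<>();
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            messages.add(cur.getClass().getName() + ": " + cur.getMessage());
        }
        return messages;
    }

    public static void main(String[] args) {
        // Simulated chain resembling the traces above.
        Throwable root = new java.io.IOException("Cannot find checkpoint or savepoint file/directory");
        Throwable wrapped = new RuntimeException("Could not set up JobManager", root);
        for (String line : describeCauseChain(wrapped)) {
            System.out.println(line);
        }
    }
}
```

Returning this list under the existing "errors" array would keep the response shape backward compatible while still exposing the underlying failure.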
[jira] [Created] (FLINK-11196) Extend S3 EntropyInjector to use key replacement (instead of key removal) when creating checkpoint metadata files
Mark Cho created FLINK-11196:

Summary: Extend S3 EntropyInjector to use key replacement (instead of key removal) when creating checkpoint metadata files
Key: FLINK-11196
URL: https://issues.apache.org/jira/browse/FLINK-11196
Project: Flink
Issue Type: Improvement
Components: FileSystem
Affects Versions: 1.7.0
Reporter: Mark Cho

We currently use S3 entropy injection when writing out checkpoint data. We also use external checkpoints so that we can resume from a checkpoint metadata file later.

The current implementation of the S3 entropy injector makes it difficult to locate the checkpoint metadata files, since in newer versions of Flink the `state.checkpoints.dir` configuration controls where both the metadata and state files are written, instead of there being two separate paths (one for metadata, one for state files). With entropy injection, the entropy marker in the path specified by `state.checkpoints.dir` is replaced with entropy (for state files) or stripped out (for metadata files).

We need to extend the entropy injection so that we can replace the entropy marker with a predictable path (instead of removing it), so that we can do a prefix query for just the metadata files. By not using the entropy key replacement (which defaults to the empty string), you get the same behavior as today (entropy marker removed).

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
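The proposed behavior can be illustrated with a small path-resolution sketch. Everything here is hypothetical (the `_entropy_` marker, the `resolve` helper, and the example paths are stand-ins, not Flink's EntropyInjector API); it only demonstrates replace-with-random-entropy for state files versus replace-with-a-predictable-key for metadata files.

```java
public class EntropyPathSketch {
    // Resolve a checkpoint path template: state files get random entropy in
    // place of the marker, while metadata files get a fixed, predictable
    // replacement key so they can all be found with one S3 prefix query.
    public static String resolve(String pathTemplate, String marker,
                                 String entropy, String metadataReplacement,
                                 boolean isMetadataFile) {
        String replacement = isMetadataFile ? metadataReplacement : entropy;
        return pathTemplate.replace(marker, replacement);
    }

    public static void main(String[] args) {
        String template = "s3://bucket/_entropy_/checkpoints/chk-1/_metadata";
        // State file: marker replaced with random entropy to spread S3 load.
        System.out.println(resolve(template, "_entropy_", "a1b2c3", "metadata", false));
        // Metadata file: marker replaced with a predictable prefix.
        System.out.println(resolve(template, "_entropy_", "a1b2c3", "metadata", true));
    }
}
```

Passing an empty string as `metadataReplacement` reproduces today's behavior (the marker is simply removed), which is why the change is backward compatible.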
[jira] [Created] (FLINK-11195) Extend AbstractS3FileSystemFactory.createHadoopFileSystem to accept URI and Hadoop Configuration
Mark Cho created FLINK-11195:

Summary: Extend AbstractS3FileSystemFactory.createHadoopFileSystem to accept URI and Hadoop Configuration
Key: FLINK-11195
URL: https://issues.apache.org/jira/browse/FLINK-11195
Project: Flink
Issue Type: Improvement
Components: FileSystem
Affects Versions: 1.7.0
Reporter: Mark Cho

Currently, the `createHadoopFileSystem` method does not take any parameters. In order to delegate FileSystem creation to Hadoop's `FileSystem.get` method, we need to pass a URI and a Hadoop Configuration to this abstract method.

We use a custom version of PrestoS3FileSystem by plugging in our own FileSystemFactory, similar to the `flink-filesystems/flink-s3-fs-presto` project. However, we would like to delegate our FS creation to Hadoop.
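A rough sketch of the proposed signature change, with Hadoop's `org.apache.hadoop.conf.Configuration` modeled as a plain `Map` so the example stays self-contained. The class and method bodies below are hypothetical stand-ins, not Flink's actual factory; the point is only that the parameters allow a subclass to delegate straight to `FileSystem.get(uri, conf)`.

```java
import java.net.URI;
import java.util.Map;

public class FactorySketch {
    // Hypothetical shape of the abstract factory after the proposed change.
    abstract static class AbstractS3FileSystemFactorySketch {
        // Before: protected abstract FileSystem createHadoopFileSystem();
        // Proposed: the URI and config are passed through, so a subclass can
        // simply delegate creation to Hadoop.
        protected abstract String createHadoopFileSystem(URI fsUri, Map<String, String> hadoopConfig);
    }

    static class DelegatingFactory extends AbstractS3FileSystemFactorySketch {
        @Override
        protected String createHadoopFileSystem(URI fsUri, Map<String, String> hadoopConfig) {
            // Stand-in for FileSystem.get(fsUri, conf): report what would be delegated.
            return "hadoop-fs for scheme '" + fsUri.getScheme() + "' with "
                    + hadoopConfig.size() + " config entries";
        }
    }

    public static void main(String[] args) {
        AbstractS3FileSystemFactorySketch factory = new DelegatingFactory();
        System.out.println(factory.createHadoopFileSystem(
                URI.create("s3://bucket/"), Map.of("fs.s3.buffer.dir", "/tmp")));
    }
}
```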
[jira] [Created] (FLINK-11134) Invalid REST API request should not log the full exception in Flink logs
Mark Cho created FLINK-11134:

Summary: Invalid REST API request should not log the full exception in Flink logs
Key: FLINK-11134
URL: https://issues.apache.org/jira/browse/FLINK-11134
Project: Flink
Issue Type: Bug
Affects Versions: 1.7.0
Reporter: Mark Cho

{code:java}
2018-12-11 17:52:19,207 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Exception occurred in REST handler.
org.apache.flink.runtime.rest.NotFoundException: Job 15d06690e88d309aa1bdbb6ce7c6dcd1 not found
    at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.lambda$getExecutionGraph$0(ExecutionGraphCache.java:133)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
    at akka.dispatch.OnComplete.internal(Future.scala:258)
    at akka.dispatch.OnComplete.internal(Future.scala:256)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (15d06690e88d309aa1bdbb6ce7c6dcd1)
    at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:766)
    at org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:485)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.sca
{code}
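One hedged way to address this would be to classify REST handler failures by HTTP status and log client errors without the stack trace. The sketch below is a hypothetical status-based split (not Flink's actual handler code); `logLineFor` is a stand-in name.

```java
public class RestLoggingSketch {
    // Expected client errors (such as an unknown job id, HTTP 404) are logged
    // as a single concise line; unexpected server errors keep the full detail.
    public static String logLineFor(int httpStatus, Throwable error) {
        if (httpStatus >= 400 && httpStatus < 500) {
            // Invalid request: do not dump the whole exception into the log.
            return "WARN Rejected REST request: " + error.getMessage();
        }
        // Genuine server-side failure: full exception is still worth logging.
        return "ERROR Exception occurred in REST handler: " + error.getMessage();
    }

    public static void main(String[] args) {
        Throwable notFound = new IllegalArgumentException("Job 15d06690e88d309aa1bdbb6ce7c6dcd1 not found");
        System.out.println(logLineFor(404, notFound));
        System.out.println(logLineFor(500, new IllegalStateException("Could not set up JobManager")));
    }
}
```

With this split, a bad job id produces one WARN line instead of the multi-screen ERROR trace shown above, while real handler bugs keep their stack traces.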
[jira] [Created] (FLINK-11133) FsCheckpointStorage is unaware about S3 entropy when creating directories
Mark Cho created FLINK-11133:

Summary: FsCheckpointStorage is unaware about S3 entropy when creating directories
Key: FLINK-11133
URL: https://issues.apache.org/jira/browse/FLINK-11133
Project: Flink
Issue Type: Bug
Affects Versions: 1.7.0
Reporter: Mark Cho

We currently use S3 for our checkpoint storage, with S3 entropy enabled. Entropy seems to be working correctly when writing out the checkpoint metadata file (the entropy key is correctly stripped from `state.checkpoints.dir`) and when writing out checkpoint data files (the entropy key is correctly replaced with a random string).

However, from the logs, it seems like the entropy key is not stripped or replaced when `FsCheckpointStorage` creates directories in the following class:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java#L83-L85

Should `FsCheckpointStorage` skip the `mkdir` calls during initialization when an object store like S3 is used, since S3 doesn't have a directory concept? If we want to keep the `mkdir` calls in `FsCheckpointStorage`, we should handle the entropy key specified in `state.checkpoints.dir`. Currently, folder markers in S3 are being created by the Hadoop FileSystem with the entropy key in the path, as a result of the `mkdir` calls in `FsCheckpointStorage`.
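Both options raised above can be sketched with a tiny helper: skip directory creation for object stores entirely, or strip the entropy marker first so no folder markers containing the raw marker are left behind. The helper name, `_entropy_` marker, and paths are hypothetical stand-ins, not Flink's actual FsCheckpointStorage code.

```java
import java.util.Optional;

public class CheckpointDirSketch {
    // Decide what directory (if any) to create before checkpointing starts.
    public static Optional<String> directoryToCreate(
            String checkpointsDir, String entropyMarker, boolean isObjectStore) {
        if (isObjectStore) {
            // Option 1: no mkdir calls needed on S3-like stores, which have
            // no real directory concept.
            return Optional.empty();
        }
        // Option 2: strip the marker segment, matching what the entropy
        // injector already does for metadata files.
        return Optional.of(checkpointsDir.replace(entropyMarker + "/", ""));
    }

    public static void main(String[] args) {
        String dir = "s3://bucket/_entropy_/checkpoints/job-1";
        System.out.println(directoryToCreate(dir, "_entropy_", true));  // skipped for object store
        System.out.println(directoryToCreate(dir, "_entropy_", false)); // marker stripped
    }
}
```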
[jira] [Created] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values
Mark Cho created FLINK-10907:

Summary: Job recovery on the same JobManager causes JobManager metrics to report stale values
Key: FLINK-10907
URL: https://issues.apache.org/jira/browse/FLINK-10907
Project: Flink
Issue Type: Bug
Components: Core, Metrics
Affects Versions: 1.4.2
Environment: Verified the bug and the fix running on Flink 1.4. Based on the JobManagerMetricGroup.java code in master, this issue should still occur on Flink versions after 1.4.
Reporter: Mark Cho

* The JobManager loses and regains leadership if it loses its connection and then reconnects to ZooKeeper.
* When it regains leadership, it tries to recover the job graph.
* During recovery, it tries to reuse the existing {{JobManagerMetricGroup}} to register new counters and gauges under the same metric names, which causes the new counters and gauges to be registered incorrectly.
* The old counters and gauges continue to report stale values, and the new counters and gauges do not report the latest metrics.

Relevant lines from the logs:

{code:java}
com.---.JobManager - Submitting recovered job e9e49fd9b8c61cf54b435f39aa49923f.
com.---.JobManager - Submitting job e9e49fd9b8c61cf54b435f39aa49923f (flink-job) (Recovery).
com.---.JobManager - Running initialization on master for job flink-job (e9e49fd9b8c61cf54b435f39aa49923f).
com.---.JobManager - Successfully ran initialization on master in 0 ms.
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointDuration'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointExternalPath'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'restartingTime'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'downtime'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'uptime'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'fullRestarts'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'task_failures'. Metric will not be reported.[]
{code}
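The collision can be modeled with a minimal registry: re-registering under an existing name is dropped (as the "Name collision" log lines above show), so the stale metric keeps reporting; removing the job's old metrics before recovery lets the new gauges register. This is a simplified stand-in for Flink's `JobManagerMetricGroup`, not its actual code, and `removeJobMetrics` is a hypothetical name for the fix.

```java
import java.util.HashMap;
import java.util.Map;

public class MetricGroupSketch {
    static class MetricGroup {
        private final Map<String, Object> metrics = new HashMap<>();

        // Mirrors the observed behavior: a name collision means the new
        // metric is silently dropped and the old instance keeps reporting.
        boolean register(String name, Object metric) {
            return metrics.putIfAbsent(name, metric) == null;
        }

        // Hypothetical fix: drop the old job's metrics before re-registering.
        void removeJobMetrics() {
            metrics.clear();
        }

        Object get(String name) {
            return metrics.get(name);
        }
    }

    public static void main(String[] args) {
        MetricGroup group = new MetricGroup();
        group.register("uptime", "gauge-of-old-attempt");
        // Recovery without cleanup: the new gauge is silently dropped,
        // so the stale gauge keeps being reported.
        System.out.println(group.register("uptime", "gauge-of-new-attempt")); // false
        // With cleanup, the recovered job's gauge registers successfully.
        group.removeJobMetrics();
        System.out.println(group.register("uptime", "gauge-of-new-attempt")); // true
    }
}
```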