[jira] [Created] (FLINK-18263) Allow external checkpoints to be persisted even when the job is in "Finished" state.

2020-06-11 Thread Mark Cho (Jira)
Mark Cho created FLINK-18263:


 Summary: Allow external checkpoints to be persisted even when the 
job is in "Finished" state.
 Key: FLINK-18263
 URL: https://issues.apache.org/jira/browse/FLINK-18263
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Mark Cho


Currently, the `execution.checkpointing.externalized-checkpoint-retention` 
configuration supports two options:
- `DELETE_ON_CANCELLATION`, which retains externalized checkpoints only when the 
job reaches the FAILED or SUSPENDED state.
- `RETAIN_ON_CANCELLATION`, which retains externalized checkpoints when the job 
reaches the FAILED, SUSPENDED, or CANCELED state.

This gives us control over the retention of externalized checkpoints in all 
terminal states of a job except FINISHED.
If the job ends up in the FINISHED state, externalized checkpoints are 
automatically cleaned up, and there is currently no configuration that ensures 
these externalized checkpoints are persisted.

I found an old Jira ticket, FLINK-4512, where this was discussed. I think it 
would be helpful to have a config that controls the retention policy for the 
FINISHED state as well.
- This can be useful when we want to rewind a job (one that reached the 
FINISHED state) to a previous checkpoint.
- When we use externalized checkpoints, we want to fully delegate checkpoint 
clean-up to an external process in all job states (without cherry-picking the 
FINISHED state to be cleaned up by Flink).

We have a quick fix working in our fork, where we've changed the 
`ExternalizedCheckpointCleanup` enum:
{code:java}
public enum ExternalizedCheckpointCleanup {
    RETAIN_ON_FAILURE,       // renamed from DELETE_ON_CANCELLATION; retains on FAILED
    RETAIN_ON_CANCELLATION,  // kept the same; retains on FAILED, CANCELED
    RETAIN_ON_SUCCESS        // added; retains on FAILED, CANCELED, FINISHED
}
{code}
Since this change touches multiple components (config values, REST API, Web UI, 
etc.), I wanted to get the community's thoughts before investing more time in 
my quick-fix PR (which currently contains only the minimal changes to get this 
working).
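
For illustration, a job could then opt into the new mode through the existing 
`CheckpointConfig` API (a minimal sketch; reusing `enableExternalizedCheckpoints` 
for the new value is my assumption, not part of the proposal):
{code:java}
// Sketch only: RETAIN_ON_SUCCESS is the proposed new enum value.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_SUCCESS);
{code}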





[jira] [Created] (FLINK-11957) Expose failure cause in the API response when dispatcher fails to submit a job

2019-03-18 Thread Mark Cho (JIRA)
Mark Cho created FLINK-11957:


 Summary: Expose failure cause in the API response when dispatcher 
fails to submit a job
 Key: FLINK-11957
 URL: https://issues.apache.org/jira/browse/FLINK-11957
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Affects Versions: 1.7.2
Reporter: Mark Cho


We use the POST /jars/:jarid/run API endpoint to submit a Flink job:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jars-jarid-run

Currently, whenever there is an error, the API response only returns the 
following information:
{code:java}
{
  "errors": [
    "org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job."
  ]
}
{code}
Since job submission can fail for many reasons, it would be helpful if the 
response included information about why the submission failed. Currently, we 
have to dig into the Flink logs to find the root cause.
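
For example, the handler that completes the jar-run request could unwrap and 
attach the root cause. A minimal sketch, assuming the surrounding handler 
wiring (this is not the actual JarRunHandler code):
{code:java}
// Sketch only: unwrap the async wrapper and surface the root cause in the
// REST error instead of the generic JobSubmissionException message.
Throwable root = ExceptionUtils.stripCompletionException(throwable);
throw new CompletionException(new RestHandlerException(
        "Failed to submit job: " + root.getMessage(),
        HttpResponseStatus.INTERNAL_SERVER_ERROR));
{code}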


Some examples of job submission failures:
{code:java}
java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
	at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
	... 7 more
Caused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint file/directory 's3://us-east-1.spaas.test/checkpoints/metadata/spaas_app_mcho-flink_bp_test/cee4-155266396689/fa82a7d2c8dfb6f7fb14bf2e319d4367/chk-969/_metadata' on file system 's3'.
	at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:241)
	at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:109)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1100)
	at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1241)
	at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1165)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
	... 10 more
{code}
{code:java}
java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
	at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Incompatible failover strategy - strategy 'Individual Task Restart' can only handle jobs with only disconnected tasks.
	at org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy.notifyNewVertices
{code}

[jira] [Created] (FLINK-11196) Extend S3 EntropyInjector to use key replacement (instead of key removal) when creating checkpoint metadata files

2018-12-18 Thread Mark Cho (JIRA)
Mark Cho created FLINK-11196:


 Summary: Extend S3 EntropyInjector to use key replacement (instead 
of key removal) when creating checkpoint metadata files
 Key: FLINK-11196
 URL: https://issues.apache.org/jira/browse/FLINK-11196
 Project: Flink
  Issue Type: Improvement
  Components: FileSystem
Affects Versions: 1.7.0
Reporter: Mark Cho


We currently use S3 entropy injection when writing out checkpoint data.

We also use external checkpoints so that we can resume from a checkpoint 
metadata file later.

The current implementation of the S3 entropy injector makes it difficult to 
locate the checkpoint metadata files, since in newer versions of Flink the 
`state.checkpoints.dir` configuration controls where both the metadata and 
state files are written, instead of there being two separate paths (one for 
metadata, one for state files).

With entropy injection, the entropy marker in the path specified by 
`state.checkpoints.dir` is either replaced with random entropy (for state 
files) or stripped out entirely (for metadata files).

We need to extend entropy injection so that the entropy marker can be replaced 
with a predictable path segment (instead of being removed), allowing a prefix 
query for just the metadata files.

If the entropy key replacement is not set (it defaults to an empty string), the 
behavior stays the same as today (the entropy marker is removed).
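
A minimal sketch of the proposed resolution logic (the method shape and the 
`metadataReplacement` parameter are assumptions for illustration, not the 
actual `EntropyInjector` API):
{code:java}
// Sketch only: resolve the entropy marker in a checkpoint path.
// Today, metadataReplacement is effectively "" (marker removed); the
// proposal makes it configurable so metadata files share a stable prefix.
static String resolveEntropy(String path, String marker,
                             boolean isStateFile, String entropy,
                             String metadataReplacement) {
    if (isStateFile) {
        return path.replace(marker, entropy);            // e.g. random chars
    }
    return path.replace(marker, metadataReplacement);    // stable, queryable prefix
}
{code}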





[jira] [Created] (FLINK-11195) Extend AbstractS3FileSystemFactory.createHadoopFileSystem to accept URI and Hadoop Configuration

2018-12-18 Thread Mark Cho (JIRA)
Mark Cho created FLINK-11195:


 Summary: Extend AbstractS3FileSystemFactory.createHadoopFileSystem 
to accept URI and Hadoop Configuration
 Key: FLINK-11195
 URL: https://issues.apache.org/jira/browse/FLINK-11195
 Project: Flink
  Issue Type: Improvement
  Components: FileSystem
Affects Versions: 1.7.0
Reporter: Mark Cho


Currently, the `createHadoopFileSystem` method does not take any parameters.

In order to delegate FileSystem creation to Hadoop's `FileSystem.get` method, 
we need to pass the URI and the Hadoop Configuration to this abstract method.

We use a custom version of PrestoS3FileSystem by plugging in our own 
FileSystemFactory, similar to the `flink-filesystems/flink-s3-fs-presto` 
project. However, we would like to delegate our FileSystem creation to Hadoop.
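
A minimal sketch of what a factory could look like once the abstract method 
accepts these parameters (the class name and exact signature are assumptions, 
not the actual change):
{code:java}
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

class DelegatingS3FileSystemFactory {
    // Sketch only: pass the file system URI and Hadoop Configuration through
    // so creation can be delegated entirely to Hadoop's FileSystem resolution.
    FileSystem createHadoopFileSystem(URI fsUri, Configuration conf) throws IOException {
        return FileSystem.get(fsUri, conf);
    }
}
{code}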





[jira] [Created] (FLINK-11134) Invalid REST API request should not log the full exception in Flink logs

2018-12-11 Thread Mark Cho (JIRA)
Mark Cho created FLINK-11134:


 Summary: Invalid REST API request should not log the full 
exception in Flink logs
 Key: FLINK-11134
 URL: https://issues.apache.org/jira/browse/FLINK-11134
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.0
Reporter: Mark Cho


{code:java}
2018-12-11 17:52:19,207 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Exception occurred in REST handler.
org.apache.flink.runtime.rest.NotFoundException: Job 15d06690e88d309aa1bdbb6ce7c6dcd1 not found
	at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90)
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.lambda$getExecutionGraph$0(ExecutionGraphCache.java:133)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
	at akka.dispatch.OnComplete.internal(Future.scala:258)
	at akka.dispatch.OnComplete.internal(Future.scala:256)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (15d06690e88d309aa1bdbb6ce7c6dcd1)
	at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:766)
	at org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:485)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.sca
{code}
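
As the summary suggests, expected client errors like an unknown job ID could be 
logged without the full stack trace. A minimal sketch of the idea (illustrative; 
not the actual handler code):
{code:java}
// Sketch only: log expected client errors briefly at DEBUG, and reserve the
// full ERROR-with-stack-trace logging for genuinely unexpected failures.
if (throwable instanceof NotFoundException) {
    log.debug("REST request failed: {}", throwable.getMessage());
} else {
    log.error("Exception occurred in REST handler.", throwable);
}
{code}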

[jira] [Created] (FLINK-11133) FsCheckpointStorage is unaware about S3 entropy when creating directories

2018-12-11 Thread Mark Cho (JIRA)
Mark Cho created FLINK-11133:


 Summary: FsCheckpointStorage is unaware about S3 entropy when 
creating directories
 Key: FLINK-11133
 URL: https://issues.apache.org/jira/browse/FLINK-11133
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.0
Reporter: Mark Cho


We currently use S3 for our checkpoint storage with S3 entropy enabled.

Entropy seems to be working correctly when writing out the checkpoint metadata 
file (the entropy key is correctly stripped from `state.checkpoints.dir`) and 
when writing out checkpoint data files (the entropy key is correctly replaced 
with a random string).

However, from the logs, it seems that the entropy key is neither stripped nor 
replaced when `FsCheckpointStorage` creates directories here:

[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java#L83-L85]

Should `FsCheckpointStorage` skip the initialization `mkdir` calls when an 
object store like S3 is used, since S3 has no real directory concept?

If we want to keep the `mkdir` calls in `FsCheckpointStorage`, we should handle 
the entropy key specified in `state.checkpoints.dir`. Currently, folder markers 
in S3 are created by the Hadoop FileSystem with the entropy key still in the 
path, as a result of the `mkdir` calls in `FsCheckpointStorage`.
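
A minimal sketch of the second option, assuming 
`EntropyInjector.removeEntropyMarkerIfPresent` can be used at this call site 
(the exact wiring is an assumption):
{code:java}
// Sketch only: strip the entropy marker before the initialization mkdirs,
// mirroring what the metadata write path already does.
Path resolvedCheckpointDir =
        EntropyInjector.removeEntropyMarkerIfPresent(fileSystem, checkpointsDirectory);
fileSystem.mkdirs(resolvedCheckpointDir);
{code}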





[jira] [Created] (FLINK-10907) Job recovery on the same JobManager causes JobManager metrics to report stale values

2018-11-15 Thread Mark Cho (JIRA)
Mark Cho created FLINK-10907:


 Summary: Job recovery on the same JobManager causes JobManager 
metrics to report stale values
 Key: FLINK-10907
 URL: https://issues.apache.org/jira/browse/FLINK-10907
 Project: Flink
  Issue Type: Bug
  Components: Core, Metrics
Affects Versions: 1.4.2
 Environment: Verified the bug and the fix running on Flink 1.4

Based on the JobManagerMetricGroup.java code in master, this issue should still 
occur on Flink versions after 1.4.
Reporter: Mark Cho


* The JobManager loses and regains leadership if it loses its connection to 
ZooKeeper and reconnects.
 * When it regains leadership, it tries to recover the job graph.
 * During the recovery, it reuses the existing {{JobManagerMetricGroup}} to 
register new counters and gauges under the same metric names, which causes the 
new counters and gauges to be registered incorrectly (see the sketch below).
 * The old counters and gauges continue to report stale values, and the new 
counters and gauges never report the latest metrics.
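
A minimal sketch of the collision (metric and variable names here are 
illustrative, not the actual JobManager code):
{code:java}
// Sketch only: a second gauge registered under an existing name on the same
// MetricGroup is rejected, so the stale first gauge keeps reporting.
MetricGroup jobGroup = jobManagerMetricGroup.addGroup("flink-job");
jobGroup.gauge("uptime",
        (Gauge<Long>) () -> firstGraph.getStatusTimestamp(JobStatus.RUNNING));     // registered
// ... JobManager regains leadership and recovers the same job ...
jobGroup.gauge("uptime",
        (Gauge<Long>) () -> recoveredGraph.getStatusTimestamp(JobStatus.RUNNING)); // dropped: name collision
{code}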

Relevant lines from the logs:
{code:java}
com.---.JobManager - Submitting recovered job e9e49fd9b8c61cf54b435f39aa49923f.
com.---.JobManager - Submitting job e9e49fd9b8c61cf54b435f39aa49923f (flink-job) (Recovery).
com.---.JobManager - Running initialization on master for job flink-job (e9e49fd9b8c61cf54b435f39aa49923f).
com.---.JobManager - Successfully ran initialization on master in 0 ms.
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointDuration'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointExternalPath'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'restartingTime'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'downtime'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'uptime'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'fullRestarts'. Metric will not be reported.[]
org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'task_failures'. Metric will not be reported.[]
{code}


