[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1153 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1153#issuecomment-146948889 I made some more fixes for the shading of the curator dependency. Once Travis gives the green light, I'll merge it.
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1153#issuecomment-146810519 Rebased on the current master and incorporated the job manager state modification fix. Thanks for that! Can we merge this after Travis gives the green light?
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1153#issuecomment-146639217 Till found another issue in one of his Travis runs, which has been addressed in e54a86c. This is now rebased on the current master.
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41536137
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * Each job graph creates ZNode:
+ *
+ * +O /flink/jobgraphs/1 [persistent]
+ * .
+ * .
+ * .
+ * +O /flink/jobgraphs/ N [persistent]
+ *
+ *
+ * The root path is watched to detect concurrent modifications in corner situations where
+ * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener}
+ * to react to such situations.
+ */
+public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class);
+
+    /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
+    private final Object cacheLock = new Object();
+
+    /** Client (not a namespace facade) */
+    private final CuratorFramework client;
+
+    /** The set of IDs of all added job graphs. */
+    private final Set<JobID> addedJobGraphs = new HashSet<>();
+
+    /** Completed checkpoints in ZooKeeper */
+    private final ZooKeeperStateHandleStore jobGraphsInZooKeeper;
+
+    /**
+     * Cache to monitor all children. This is used to detect races with other instances working
+     * on the same state.
+     */
+    private final PathChildrenCache pathCache;
+
+    /** The external listener to be notified on races. */
+    private SubmittedJobGraphListener jobGraphListener;
+
+    /** Flag indicating whether this instance is running. */
+    private boolean isRunning;
+
+    public ZooKeeperSubmittedJobGraphStore(
+            CuratorFramework client,
+            String currentJobsPath,
+            StateHandleProvider stateHandleProvider) throws Exception {
+
+        checkNotNull(currentJobsPath, "Current jobs path");
+        checkNotNull(stateHandleProvider, "State handle provider");
+
+        // Keep a reference to the original client and not the namespace facade. The namespace
+        // facade cannot be closed.
+        this.client = checkNotNull(client, "Curator client");
+
+        // Ensure that the job graphs path exists
+        client.newNamespaceAwareEnsurePath(currentJobsPath)
+                .ensure(client.getZookeeperClient());
+
+        // All operations will have the path as root
+        client = client.usingNamespace(client.getNamespace() + currentJobsPath);
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1153#issuecomment-146610846 I've addressed your points. Thanks for taking the time to review again. I had to revert the checkpoint ID counter removal. Sorry for the confusion. If we don't find anything major, I vote to merge this and file follow-up issues (for example for the terminal failure you mentioned). What do you think?
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41536249 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41536142 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41535719
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ */
+class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
+
+    /** The maximum number of checkpoints to retain (at least 1). */
+    private final int maxNumberOfCheckpointsToRetain;
+
+    /** User class loader for discarding {@link CompletedCheckpoint} instances. */
+    private final ClassLoader userClassLoader;
+
+    /** The completed checkpoints. */
+    private final ArrayDeque<CompletedCheckpoint> checkpoints;
+
+    /**
+     * Creates {@link StandaloneCompletedCheckpointStore}.
+     *
+     * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
+     *                                       least 1). Adding more checkpoints than this results
+     *                                       in older checkpoints being discarded.
+     * @param userClassLoader                The user class loader used to discard checkpoints
+     */
+    public StandaloneCompletedCheckpointStore(
+            int maxNumberOfCheckpointsToRetain,
+            ClassLoader userClassLoader) {
+
+        checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
+
+        this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
+        this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
+
+        this.checkpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+    }
+
+    @Override
+    public void recover() throws Exception {
+        // Nothing to do
+    }
+
+    @Override
+    public int getNextCheckpointID() {
--- End diff --
Removed.
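The retention rule in the Javadoc above ("adding more checkpoints than this results in older checkpoints being discarded") can be illustrated with a small standalone sketch. This is not the code from the pull request: the class name `BoundedCheckpointSketch`, the use of plain `long` IDs in place of `CompletedCheckpoint`, and the `discarded` list are all assumptions made for illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a bounded checkpoint store; not Flink's class. */
final class BoundedCheckpointSketch {

    /** Retained checkpoint IDs, oldest first. */
    private final ArrayDeque<Long> checkpoints;

    /** The maximum number of checkpoints to retain (at least 1). */
    private final int maxToRetain;

    /** IDs that were dropped; recorded only so the example can inspect them. */
    private final List<Long> discarded = new ArrayList<>();

    BoundedCheckpointSketch(int maxToRetain) {
        if (maxToRetain < 1) {
            throw new IllegalArgumentException("Must retain at least one checkpoint.");
        }
        this.maxToRetain = maxToRetain;
        // One extra slot: a new checkpoint is added before the oldest one is removed.
        this.checkpoints = new ArrayDeque<>(maxToRetain + 1);
    }

    /** Adds a checkpoint and discards the oldest one if the bound is exceeded. */
    void add(long checkpointId) {
        checkpoints.addLast(checkpointId);
        if (checkpoints.size() > maxToRetain) {
            discarded.add(checkpoints.removeFirst());
        }
    }

    /** Returns the most recently added checkpoint ID, or null if there is none. */
    Long latest() {
        return checkpoints.peekLast();
    }

    List<Long> retained() {
        return new ArrayList<>(checkpoints);
    }

    List<Long> discarded() {
        return new ArrayList<>(discarded);
    }

    public static void main(String[] args) {
        BoundedCheckpointSketch store = new BoundedCheckpointSketch(2);
        for (long id = 1; id <= 4; id++) {
            store.add(id);
        }
        System.out.println("retained=" + store.retained()
                + " latest=" + store.latest()
                + " discarded=" + store.discarded());
    }
}
```

With a bound of 2 and four additions, only the two newest IDs remain. Sizing the deque at `maxToRetain + 1` mirrors the constructor in the diff, where the new element is appended before the oldest is dropped.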
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41535724
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.util.List;
+
+/**
+ * A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
+ */
+public interface CompletedCheckpointStore {
+
+    /**
+     * Recover available {@link CompletedCheckpoint} instances.
+     *
+     * After a call to this method, {@link #getLatestCheckpoint()} returns the latest
+     * available checkpoint.
+     */
+    void recover() throws Exception;
+
+    /**
+     * Returns the next checkpoint ID to use.
+     *
+     * @return The next checkpoint ID.
+     */
+    int getNextCheckpointID() throws Exception;
--- End diff --
Removed.
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41535673
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * Checkpoints are added under a ZNode per job:
+ *
+ * +O /flink/checkpoints/[persistent]
+ * .|
+ * .+O /flink/checkpoints/ /1 [persistent_sequential]
+ * .. .
+ * .. .
+ * .. .
+ * .+O /flink/checkpoints/ /N [persistent_sequential]
+ *
+ *
+ * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one,
+ * only the latest one is used and older ones are discarded (even if the maximum number
+ * of retained checkpoints is greater than one). The sequential cversion determines which
+ * checkpoint is the latest one.
+ *
+ * If there is a network partition and multiple JobManagers run concurrent checkpoints for the
+ * same program, it is OK to take any valid successful checkpoint as long as the "history" of
+ * checkpoints is consistent. Currently, after recovery we start out with only a single
+ * checkpoint in order to guarantee this.
+ */
+public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
+
+    /** Curator ZooKeeper client */
+    private final CuratorFramework client;
+
+    /** Completed checkpoints in ZooKeeper */
+    private final ZooKeeperStateHandleStore checkpointsInZooKeeper;
+
+    /** The maximum number of checkpoints to retain (at least 1). */
+    private final int maxNumberOfCheckpointsToRetain;
+
+    /** User class loader for discarding {@link CompletedCheckpoint} instances. */
+    private final ClassLoader userClassLoader;
+
+    /** Local completed checkpoints. */
+    private final ArrayDeque<ZooKeeperStateHandle<CompletedCheckpoint>> checkpointStateHandles;
+
+    /**
+     * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
+     *
+     * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
+     *                                       least 1). Adding more checkpoints than this results
+     *                                       in older checkpoints being discarded. On recovery,
+     *                                       we will only start with a single checkpoint.
+     * @param userClassLoader                The user class loader used to discard checkpoints
+     * @param client                         The Curator ZooKeeper client
+     * @para
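The recovery rule described in the class Javadoc above (keep only the latest checkpoint, discard the older ones regardless of the retention bound) can be sketched without ZooKeeper. Everything here is hypothetical: `RecoverLatestSketch`, the `Recovered` holder, and the integer sequence numbers standing in for ZooKeeper's ordering are assumptions for illustration, not the implementation in the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of "recover only the latest checkpoint". */
final class RecoverLatestSketch {

    /** Outcome of a recovery pass: the checkpoint kept and the ones dropped. */
    static final class Recovered {
        final String latest;
        final List<String> discarded;

        Recovered(String latest, List<String> discarded) {
            this.latest = latest;
            this.discarded = discarded;
        }
    }

    /**
     * Picks the entry with the highest sequence number and marks all others
     * as discarded. The sequence number is an assumed stand-in for whatever
     * ordering the real store derives from ZooKeeper.
     */
    static Recovered recover(Map<Integer, String> checkpointsBySequence) {
        TreeMap<Integer, String> ordered = new TreeMap<>(checkpointsBySequence);
        // pollLastEntry removes and returns the highest key, or null if empty.
        Map.Entry<Integer, String> last = ordered.pollLastEntry();
        String latest = (last == null) ? null : last.getValue();
        return new Recovered(latest, new ArrayList<>(ordered.values()));
    }

    public static void main(String[] args) {
        Map<Integer, String> found = new TreeMap<>();
        found.put(5, "checkpoint-5");
        found.put(7, "checkpoint-7");
        found.put(6, "checkpoint-6");

        Recovered result = recover(found);
        System.out.println("kept=" + result.latest + " discarded=" + result.discarded);
    }
}
```

Starting from a single checkpoint after recovery is what the Javadoc relies on to keep the checkpoint "history" consistent when several JobManagers may have written concurrently.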
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41535485
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * Added state is persisted via {@link StateHandle}s, which in turn are written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs.
+ *
+ * State modifications require some care, because it is possible that certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * ZooKeeper holds the ground truth about state handles, i.e. the following holds:
+ *
+ *
+ * State handle in ZooKeeper => State handle exists
+ *
+ *
+ * But not:
+ *
+ *
+ * State handle exists => State handle in ZooKeeper
+ *
+ *
+ * There can be lingering state handles when failures happen during operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+    /** Curator ZooKeeper client */
+    private final CuratorFramework client;
+
+    /** State handle provider */
+    private final StateHandleProvider stateHandleProvider;
+
+    /**
+     * Creates a {@link ZooKeeperStateHandleStore}.
+     *
+     * @param client              The Curator ZooKeeper client. Important: It is
+     *                            expected that the client's namespace ensures that the root
+     *                            path is exclusive for all state handles managed by this
+     *                            instance, e.g. client.usingNamespace("/stateHandles")
+     * @param stateHandleProvider The state handle provider for the state
+     */
+    public ZooKeeperStateHandleStore(
+            CuratorFramework client,
+            StateHandleProvider stateHandleProvider) {
+
+        this.client = checkNotNull(client, "Curator client");
+        this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider");
+    }
+
+    /**
+     * Creates a state handle and stores it in ZooKeeper with create mode {@link
+     * CreateMode#PERSISTENT}.
+     *
+     * @see #add(String, Serializable, CreateMode)
+     */
+    public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception {
+        return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+    }
+
+    /**
+     * Creates a state handle and stores it in ZooKeeper.
+     *
+     * Important: This will not store the actual state in
+     * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection
+     * makes sure that data in ZooKeeper is small.
+     *
+     * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and
+     *                        start with a '/')
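The indirection and the one-way invariant from the class Javadoc above ("state handle in ZooKeeper => state handle exists", but not the converse) can be illustrated with two in-memory maps. This is a sketch under stated assumptions, not the method body from the pull request: `HandleIndirectionSketch`, the `backend` and `zooKeeper` maps, the handle naming, and the write order (backend first, then ZooKeeper) are all chosen here because that order is one way to obtain the stated invariant.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of storing small handles in ZooKeeper and large state elsewhere. */
final class HandleIndirectionSketch {

    /** Stand-in for the state handle backend: handle ID -> (possibly large) state. */
    private final Map<String, byte[]> backend = new HashMap<>();

    /** Stand-in for ZooKeeper: path -> small handle ID. */
    private final Map<String, String> zooKeeper = new HashMap<>();

    private int nextHandle = 0;

    /**
     * Writes the state to the backend first and the handle to "ZooKeeper" second.
     * The flag simulates a failure between the two writes.
     */
    String add(String path, byte[] state, boolean failBeforeZooKeeperWrite) {
        if (zooKeeper.containsKey(path)) {
            throw new IllegalStateException("Path already exists: " + path);
        }
        String handleId = "handle-" + (nextHandle++);
        backend.put(handleId, state);
        if (failBeforeZooKeeperWrite) {
            // The state now lingers in the backend without a ZooKeeper entry.
            throw new IllegalStateException("Simulated failure before ZooKeeper write");
        }
        zooKeeper.put(path, handleId);
        return handleId;
    }

    /** Resolves the handle stored under the path, or returns null if there is none. */
    byte[] get(String path) {
        String handleId = zooKeeper.get(path);
        return handleId == null ? null : backend.get(handleId);
    }

    /** Every handle referenced from "ZooKeeper" must exist in the backend. */
    boolean invariantHolds() {
        return backend.keySet().containsAll(zooKeeper.values());
    }

    /** Counts backend entries that no "ZooKeeper" path refers to. */
    int lingeringHandles() {
        int count = 0;
        for (String handleId : backend.keySet()) {
            if (!zooKeeper.containsValue(handleId)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        HandleIndirectionSketch store = new HandleIndirectionSketch();
        store.add("/job-1", new byte[1024 * 1024], false);
        try {
            store.add("/job-2", new byte[1024 * 1024], true);
        } catch (IllegalStateException e) {
            System.out.println("add failed: " + e.getMessage());
        }
        System.out.println("invariant holds: " + store.invariantHolds());
        System.out.println("lingering handles: " + store.lingeringHandles());
    }
}
```

A failure between the two writes leaves an orphaned entry in the backend but never a dangling path in the map that plays the role of ZooKeeper, which matches the "lingering state handles" caveat in the Javadoc.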
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41532972 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. 
ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see FLINK-2513, + * https://issues.apache.org/jira/browse/FLINK-2513, for a possible way to overcome this). + * + * @param <T> Type of state + */ +public class ZooKeeperStateHandleStore { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. +* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. 
+* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and +*start with a '/')
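The indirection described in the quoted Javadoc (large state goes to a state backend, only a small handle goes to ZooKeeper) can be sketched roughly as follows. This is a minimal in-memory illustration, not the code from the pull request: the class `HandleIndirectionSketch`, both maps, and the `failZooKeeperWrite` flag are hypothetical stand-ins and do not use the Flink or Curator APIs. It assumes that the state is written first and the handle second, so that a handle present in "ZooKeeper" always points to existing state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical illustration only; none of these names are Flink or Curator API. */
class HandleIndirectionSketch {

    /** Stand-in for the state backend that holds the (possibly large) state. */
    static final Map<String, byte[]> backend = new HashMap<>();

    /** Stand-in for ZooKeeper: path -> small handle (here just a backend key). */
    static final Map<String, String> zooKeeper = new HashMap<>();

    /**
     * Writes the state to the backend first and only then publishes the handle.
     * If publishing fails, the state is removed again on a best-effort basis.
     */
    static String add(String path, byte[] state, boolean failZooKeeperWrite) {
        if (zooKeeper.containsKey(path)) {
            throw new IllegalStateException("Path already exists: " + path);
        }

        // Step 1: persist the actual state outside of ZooKeeper.
        String handle = UUID.randomUUID().toString();
        backend.put(handle, state);

        boolean success = false;
        try {
            // Step 2: store only the small handle under the ZooKeeper path.
            if (failZooKeeperWrite) {
                throw new IllegalStateException("Simulated ZooKeeper failure");
            }
            zooKeeper.put(path, handle);
            success = true;
            return handle;
        } finally {
            if (!success) {
                // Cleanup; if this step were skipped or failed, the state would linger.
                backend.remove(handle);
            }
        }
    }

    /** Resolves the handle stored under the path and reads the state from the backend. */
    static byte[] get(String path) {
        String handle = zooKeeper.get(path);
        return handle == null ? null : backend.get(handle);
    }

    public static void main(String[] args) {
        add("/checkpoint-1", new byte[] {1, 2, 3}, false);
        System.out.println("Stored bytes: " + get("/checkpoint-1").length);
    }
}
```

With this ordering, a crash between step 1 and step 2 can leave state without a handle (the lingering state handles mentioned in the Javadoc), but never a handle without state.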
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41532867 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. 
ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see FLINK-2513, + * https://issues.apache.org/jira/browse/FLINK-2513, for a possible way to overcome this). + * + * @param <T> Type of state + */ +public class ZooKeeperStateHandleStore { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. +* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. 
+* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and +*start with a '/')
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41531514 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential cversion determines which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in order to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoaderThe user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +
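The recovery rule from the quoted Javadoc (use only the latest checkpoint and discard all older ones, regardless of the retention limit) might look roughly like the sketch below. It is an assumption-laden illustration rather than the pull request's implementation: `LatestCheckpointSketch`, `Entry`, and `recoverLatest` are hypothetical names, and a plain `long` sequence number stands in for the sequential version that ZooKeeper assigns.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration only; not the ZooKeeperCompletedCheckpointStore code. */
class LatestCheckpointSketch {

    /** A checkpoint node as found in ZooKeeper: its path and its sequence number. */
    static final class Entry {
        final String path;
        final long sequence;

        Entry(String path, long sequence) {
            this.path = path;
            this.sequence = sequence;
        }
    }

    /**
     * Returns the entry with the highest sequence number, or null if none was found.
     * All older entries are appended to {@code discarded} so the caller can remove them.
     */
    static Entry recoverLatest(List<Entry> found, List<Entry> discarded) {
        if (found.isEmpty()) {
            return null;
        }

        // Sort a copy in ascending order so that the last element is the latest checkpoint.
        List<Entry> sorted = new ArrayList<>(found);
        sorted.sort(Comparator.comparingLong(e -> e.sequence));

        Entry latest = sorted.get(sorted.size() - 1);
        discarded.addAll(sorted.subList(0, sorted.size() - 1));
        return latest;
    }

    public static void main(String[] args) {
        List<Entry> found = new ArrayList<>();
        found.add(new Entry("/2", 2));
        found.add(new Entry("/7", 7));
        found.add(new Entry("/5", 5));

        List<Entry> discarded = new ArrayList<>();
        Entry latest = recoverLatest(found, discarded);
        System.out.println("Latest: " + latest.path + ", discarded: " + discarded.size());
    }
}
```

Starting from a single checkpoint after recovery keeps the checkpoint history consistent even if several JobManagers wrote checkpoints concurrently during a network partition.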
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41531592 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential cversion determines which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in order to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoaderThe user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41531181 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates a ZNode: + * + * +O /flink/jobgraphs/1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/ N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** +* Cache to monitor all children. This is used to detect races with other instances working +* on the same state. +*/ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphStore( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(currentJobsPath, "Current jobs path"); + checkNotNull(stateHandleProvider, "State handle provider"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "Curator client"); + + // Ensure that the job graphs path exists + client.newNamespaceAwareEnsurePath(currentJobsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + client = client.usingNamespace(client.getNamespace() + currentJobsP
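One way to read the race detection described in the quoted Javadoc: the store remembers which job graphs it added itself, and a child event on the watched root path for any other job ID indicates that a different instance is modifying the same state. The sketch below illustrates that idea under stated assumptions. `JobGraphWatchSketch`, its `Listener` interface, and the `onChildAdded`/`onChildRemoved` callbacks are hypothetical and only mimic the role of the Curator path cache; the exact reaction to removals in the pull request is not visible in the truncated diff and is assumed here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration only; not the ZooKeeperSubmittedJobGraphStore code. */
class JobGraphWatchSketch {

    /** Stand-in for the listener role that the job manager plays. */
    interface Listener {
        void onAddedJobGraph(String jobId);

        void onRemovedJobGraph(String jobId);
    }

    /** Guards the local bookkeeping against concurrent watch callbacks. */
    private final Object lock = new Object();

    /** IDs of all job graphs added through this instance. */
    private final Set<String> addedJobGraphs = new HashSet<>();

    private final Listener listener;

    JobGraphWatchSketch(Listener listener) {
        this.listener = listener;
    }

    /** Called when this instance stores a job graph itself. */
    void putJobGraph(String jobId) {
        synchronized (lock) {
            addedJobGraphs.add(jobId);
        }
    }

    /** Watch callback: a child node appeared under the root path. */
    void onChildAdded(String jobId) {
        synchronized (lock) {
            // A node we did not create ourselves means another instance added it.
            if (!addedJobGraphs.contains(jobId)) {
                listener.onAddedJobGraph(jobId);
            }
        }
    }

    /** Watch callback: a child node disappeared from the root path. */
    void onChildRemoved(String jobId) {
        synchronized (lock) {
            // Assumption: a node we still consider ours was removed by someone else.
            if (addedJobGraphs.contains(jobId)) {
                listener.onRemovedJobGraph(jobId);
            }
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        JobGraphWatchSketch store = new JobGraphWatchSketch(new Listener() {
            @Override
            public void onAddedJobGraph(String jobId) {
                events.add("added:" + jobId);
            }

            @Override
            public void onRemovedJobGraph(String jobId) {
                events.add("removed:" + jobId);
            }
        });

        store.putJobGraph("job-1");
        store.onChildAdded("job-1"); // own addition, no notification
        store.onChildAdded("job-2"); // foreign addition, listener is notified
        System.out.println(events);
    }
}
```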
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41531112 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates a ZNode: + * + * +O /flink/jobgraphs/1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/ N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** +* Cache to monitor all children. This is used to detect races with other instances working +* on the same state. +*/ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphStore( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(currentJobsPath, "Current jobs path"); + checkNotNull(stateHandleProvider, "State handle provider"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "Curator client"); + + // Ensure that the job graphs path exists + client.newNamespaceAwareEnsurePath(currentJobsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + client = client.usingNamespace(client.getNamespace() + currentJobsP
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41528652 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential cversion determines which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in order to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoaderThe user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +* @para
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41528000 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential cversion determines which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in order to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoaderThe user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41527930 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential; cversion determines, which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in oder to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoaderThe user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +* @para
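The recovery rule described in the class Javadoc above (use only the latest checkpoint, discard the older ones) can be sketched roughly as follows. This is not the code from the pull request: `RecoverLatestSketch`, `sequenceOf`, and `recoverLatest` are hypothetical names, and the sketch assumes the ordering can be read from a numeric suffix on the child path, whereas the Javadoc only says that the sequential version determines which checkpoint is latest.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: keep the newest sequential child, discard the rest. */
class RecoverLatestSketch {

    /** Assumption: the last path segment is a number, e.g. "/job/0000000007". */
    static long sequenceOf(String path) {
        return Long.parseLong(path.substring(path.lastIndexOf('/') + 1));
    }

    /** Returns the latest path (or null if there is none) and collects older paths in toDiscard. */
    static String recoverLatest(List<String> childPaths, List<String> toDiscard) {
        if (childPaths.isEmpty()) {
            return null;
        }
        List<String> sorted = new ArrayList<>(childPaths);
        // Ascending by sequence number, so the newest entry ends up last.
        sorted.sort((a, b) -> Long.compare(sequenceOf(a), sequenceOf(b)));
        String latest = sorted.remove(sorted.size() - 1);
        // Everything older is discarded, even if more checkpoints could be retained.
        toDiscard.addAll(sorted);
        return latest;
    }
}
```

Starting from a single checkpoint keeps the recovered history trivially consistent, which matches the reasoning given in the Javadoc for the network-partition case.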
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41527783 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential; cversion determines, which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in oder to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoaderThe user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41527578 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential; cversion determines, which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in oder to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoaderThe user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +* @para
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41527148 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import java.util.List; + +/** + * A bounded LIFO-queue of {@link CompletedCheckpoint} instances. + */ +public interface CompletedCheckpointStore { + + /** +* Recover available {@link CompletedCheckpoint} instances. +* +* After a call to this method, {@link #getLatestCheckpoint()} returns the latest +* available checkpoint. +*/ + void recover() throws Exception; + + /** +* Returns the next checkpoint ID to use. +* +* @return The next checkpoint ID. +*/ + int getNextCheckpointID() throws Exception; --- End diff -- Maybe add that this is the starting checkpoint ID for the `CheckpointCoordinator`, which maintains and increments its own counter. Thus, this method is only called once, when the `CheckpointCoordinator` is initialized.
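To illustrate the review comment on `getNextCheckpointID()`, here is a minimal sketch of a coordinator that reads the starting ID once and then counts on its own. `CheckpointIdSource`, `SketchCoordinator`, and `triggerCheckpoint` are hypothetical names, not the types from the pull request, and the checked exception from the real interface is dropped to keep the example short.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified stand-in for the store interface in the diff. */
interface CheckpointIdSource {
    int getNextCheckpointID();
}

/** Hypothetical coordinator: seeds its counter from the store once, then increments locally. */
class SketchCoordinator {

    private final AtomicLong checkpointIdCounter;

    SketchCoordinator(CheckpointIdSource store) {
        // The only ID lookup against the store: it provides the starting value.
        this.checkpointIdCounter = new AtomicLong(store.getNextCheckpointID());
    }

    /** Returns the ID for the next checkpoint from the coordinator's own counter. */
    long triggerCheckpoint() {
        return checkpointIdCounter.getAndIncrement();
    }
}

class CounterSeedDemo {
    public static void main(String[] args) {
        int[] storeCalls = {0};
        SketchCoordinator coordinator = new SketchCoordinator(() -> {
            storeCalls[0]++;
            return 7;
        });
        System.out.println(coordinator.triggerCheckpoint());
        System.out.println(coordinator.triggerCheckpoint());
        System.out.println("store lookups: " + storeCalls[0]);
    }
}
```

Under this reading, the method name is slightly misleading, which is presumably why the reviewer asks for a clarifying Javadoc sentence.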
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41526930 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobmanager.RecoveryMode; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}. + */ +class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore { + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** The completed checkpoints. */ + private final ArrayDeque checkpoints; + + /** +* Creates {@link StandaloneCompletedCheckpointStore}. 
+* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. +* @param userClassLoader The user class loader used to discard checkpoints +*/ + public StandaloneCompletedCheckpointStore( + int maxNumberOfCheckpointsToRetain, + ClassLoader userClassLoader) { + + checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); + + this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; + this.userClassLoader = checkNotNull(userClassLoader, "User class loader"); + + this.checkpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); + } + + @Override + public void recover() throws Exception { + // Nothing to do + } + + @Override + public int getNextCheckpointID() { --- End diff -- Maybe add a comment that this is actually the starting checkpoint ID for the `CheckpointCoordinator`.
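The retention behaviour described in the constructor Javadoc (adding more checkpoints than the maximum discards older ones) might look like the following sketch. `BoundedCheckpointQueue` is a hypothetical class, a `String` stands in for `CompletedCheckpoint`, and the add method is an assumption, since the quoted diff stops before it.

```java
import java.util.ArrayDeque;

/** Hypothetical bounded store; a String stands in for a completed checkpoint. */
class BoundedCheckpointQueue {

    private final int maxToRetain;
    private final ArrayDeque<String> checkpoints;

    BoundedCheckpointQueue(int maxToRetain) {
        if (maxToRetain < 1) {
            throw new IllegalArgumentException("Must retain at least one checkpoint.");
        }
        this.maxToRetain = maxToRetain;
        // One extra slot, because a new entry is added before the oldest one is dropped.
        this.checkpoints = new ArrayDeque<>(maxToRetain + 1);
    }

    /** Adds a checkpoint and returns the discarded oldest one, or null if nothing was discarded. */
    String add(String checkpoint) {
        checkpoints.addLast(checkpoint);
        return checkpoints.size() > maxToRetain ? checkpoints.removeFirst() : null;
    }

    /** Returns the most recently added checkpoint, or null if the queue is empty. */
    String latest() {
        return checkpoints.peekLast();
    }

    int size() {
        return checkpoints.size();
    }
}
```

Sizing the deque at `maxToRetain + 1` mirrors the `maxNumberOfCheckpointsToRetain + 1` capacity in the diff, which suggests the same add-then-trim order.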
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41526810 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential; cversion determines, which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in oder to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoaderThe user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41526594 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. 
ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see FLINK-2513, + * https://issues.apache.org/jira/browse/FLINK-2513, about a possible way to overcome this). + * + * @param <T> Type of state + */ +public class ZooKeeperStateHandleStore<T extends Serializable> { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. +* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. 
+* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and +*start with a '/') +
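The indirection described in the Javadoc above (large state in a backend, only a small handle in ZooKeeper) can be sketched with two in-memory maps. Everything here is hypothetical: `HandleIndirectionSketch` uses no Curator or ZooKeeper API, and the operation order is an assumption chosen so that "handle in ZooKeeper => state exists" holds, not a confirmed detail of the pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the handle indirection; plain maps replace both backends. */
class HandleIndirectionSketch {

    /** Stand-in for the state backend that holds the large payloads. */
    final Map<String, byte[]> stateBackend = new HashMap<>();

    /** Stand-in for ZooKeeper: path -> small handle (here, just a backend key). */
    final Map<String, String> zooKeeper = new HashMap<>();

    private int nextKey = 0;

    void add(String path, byte[] state) {
        if (zooKeeper.containsKey(path)) {
            throw new IllegalStateException("Path already exists: " + path);
        }
        String handle = "state-" + (nextKey++);
        // 1. Write the state first. A failure after this step leaves a lingering handle.
        stateBackend.put(handle, state);
        // 2. Publish the small pointer only once the state exists.
        zooKeeper.put(path, handle);
    }

    byte[] get(String path) {
        String handle = zooKeeper.get(path);
        return handle == null ? null : stateBackend.get(handle);
    }

    void remove(String path) {
        // 1. Remove the pointer first, so a published pointer never dangles.
        String handle = zooKeeper.remove(path);
        // 2. Then discard the state. A failure before this step leaves a lingering handle.
        if (handle != null) {
            stateBackend.remove(handle);
        }
    }
}
```

Both failure windows end with state that exists but is no longer referenced, which is the lingering-handle case the Javadoc says must be cleaned up manually.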
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41526551 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential; cversion determines, which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in oder to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoaderThe user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41526411 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Checkpoints are added under a ZNode per job: + * + * +O /flink/checkpoints/[persistent] + * .| + * .+O /flink/checkpoints/ /1 [persistent_sequential] + * .. . + * .. . + * .. . + * .+O /flink/checkpoints/ /N [persistent_sequential] + * + * + * During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential cversion determines which + * checkpoint is the latest one. + * + * If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. Currently, after recovery we start out with only a single + * checkpoint in order to guarantee this. 
+ */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque<ZooKeeperStateHandle<CompletedCheckpoint>> checkpointStateHandles; + + /** +* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. +* +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at +* least 1). Adding more checkpoints than this results +* in older checkpoints being discarded. On recovery, +* we will only start with a single checkpoint. +* @param userClassLoader The user class loader used to discard checkpoints +* @param client The Curator ZooKeeper client +
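The retention and recovery behaviour described in the Javadoc above can be illustrated with a small in-memory sketch. This is not the code from the pull request: the class `RetainedCheckpointsSketch` and its methods are hypothetical, checkpoints are reduced to plain IDs, and no ZooKeeper access is involved. It only shows the two rules stated in the comment, namely that adding beyond the maximum discards the oldest entries and that recovery keeps only the latest one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in; checkpoints are represented by their IDs only. */
final class RetainedCheckpointsSketch {

    private final int maxNumberOfCheckpointsToRetain;

    /** Retained checkpoint IDs, oldest first. */
    private final ArrayDeque<Long> checkpointIds = new ArrayDeque<>();

    /** IDs that were discarded, in discard order (kept only for illustration). */
    private final List<Long> discarded = new ArrayList<>();

    RetainedCheckpointsSketch(int maxNumberOfCheckpointsToRetain) {
        if (maxNumberOfCheckpointsToRetain < 1) {
            throw new IllegalArgumentException("Must retain at least one checkpoint.");
        }
        this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
    }

    /** Adds a checkpoint and discards the oldest ones beyond the retention limit. */
    void add(long checkpointId) {
        checkpointIds.addLast(checkpointId);
        while (checkpointIds.size() > maxNumberOfCheckpointsToRetain) {
            discarded.add(checkpointIds.removeFirst());
        }
    }

    /** Recovery keeps only the latest checkpoint, regardless of the retention limit. */
    void recover() {
        while (checkpointIds.size() > 1) {
            discarded.add(checkpointIds.removeFirst());
        }
    }

    /** Returns the latest retained checkpoint ID, or null if there is none. */
    Long latest() {
        return checkpointIds.peekLast();
    }

    int size() {
        return checkpointIds.size();
    }

    List<Long> discarded() {
        return discarded;
    }

    public static void main(String[] args) {
        RetainedCheckpointsSketch store = new RetainedCheckpointsSketch(2);
        store.add(1L);
        store.add(2L);
        store.add(3L);
        store.recover();
        System.out.println("latest=" + store.latest() + ", discarded=" + store.discarded());
    }
}
```

Starting from a single checkpoint after recovery is what keeps the checkpoint "history" consistent in the sketch: there is never more than one candidate to resume from.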
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41526216 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41523825 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. 
ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513"> + * FLINK-2513</a> about a possible way to overcome this). + * + * @param <T> Type of state + */ +public class ZooKeeperStateHandleStore<T extends Serializable> { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider<T> stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +* expected that the client's namespace ensures that the root +* path is exclusive for all state handles managed by this +* instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider<T> stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. +* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. 
+* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and +*start with a '/')
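The level of indirection and the one-way invariant from the class Javadoc above ("State handle in ZooKeeper => State handle exists", but not the reverse) can be sketched with two in-memory maps. Everything here is hypothetical: `StateHandleIndirectionSketch`, the handle naming and the `failBeforeZooKeeperWrite` flag are made up for illustration, and the write order (state first, pointer second) is an assumption derived from the stated invariant, not taken from the pull request.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: two maps stand in for the state backend and for ZooKeeper. */
final class StateHandleIndirectionSketch {

    /** Stands in for the storage that holds the (possibly large) state. */
    final Map<String, byte[]> stateBackend = new HashMap<>();

    /** Stands in for ZooKeeper; holds only small pointers to the state. */
    final Map<String, byte[]> zooKeeper = new HashMap<>();

    private int nextHandleId = 0;

    /**
     * Writes the state to the backend first and only then the pointer to "ZooKeeper".
     * A failure between the two steps leaves a lingering state handle behind.
     */
    String add(String pathInZooKeeper, byte[] state, boolean failBeforeZooKeeperWrite) {
        if (zooKeeper.containsKey(pathInZooKeeper)) {
            throw new IllegalStateException("Path already exists: " + pathInZooKeeper);
        }
        String handle = "handle-" + nextHandleId++;
        stateBackend.put(handle, state);
        if (failBeforeZooKeeperWrite) {
            throw new IllegalStateException("Simulated failure before the ZooKeeper write");
        }
        zooKeeper.put(pathInZooKeeper, handle.getBytes(StandardCharsets.UTF_8));
        return handle;
    }

    /** Resolves the pointer stored under the path; returns null if the path is unknown. */
    byte[] get(String pathInZooKeeper) {
        byte[] pointer = zooKeeper.get(pathInZooKeeper);
        if (pointer == null) {
            return null;
        }
        return stateBackend.get(new String(pointer, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        StateHandleIndirectionSketch store = new StateHandleIndirectionSketch();
        store.add("/1", new byte[1 << 20], false);
        System.out.println("bytes in ZooKeeper: " + store.zooKeeper.get("/1").length);
        System.out.println("bytes of state: " + store.get("/1").length);
    }
}
```

With this ordering, every pointer in the ZooKeeper map refers to state that exists, while a failed `add` can leave state in the backend that nothing points to, which is the lingering-handle case the Javadoc mentions.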
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41523760 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,423 @@
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1153#issuecomment-146297139

I've addressed your inline comments (see commits). Not rebased yet. I think it's easier to review this way. The major points:

- Followed all renaming suggestions.
- Removed the checkpoint ID counter and replaced with .
- Left the submission to fail in case of concurrent modifications in order to have clear error messages. The behaviour in such cases is rather undefined at the moment.
- Your description of the terminally failing behaviour is correct. The behaviour was on purpose for the initial version. The root zNode essentially identifies the cluster at the moment. We can fix this by using a per-cluster random root zNode, e.g. `/flink/random/...` instead of `/flink/...`. What do you think? In any case, I would do this as a quick follow-up.
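As a rough illustration of the per-cluster random root zNode idea from the last point, the following sketch derives a root path of the form `/flink/<random>` and builds paths below it. The class `ClusterRootPathSketch`, both method names, the `checkpoints` sub-path and the use of a UUID as the random component are assumptions made for this example only; the comment above does not say how the random part would be generated.

```java
import java.util.UUID;

/** Hypothetical helper; not part of the pull request. */
final class ClusterRootPathSketch {

    private ClusterRootPathSketch() {}

    /** Creates a root zNode path that is unique per cluster, e.g. /flink/<uuid>. */
    static String newClusterRoot() {
        return "/flink/" + UUID.randomUUID();
    }

    /** Builds the checkpoints path of a job below the given cluster root. */
    static String checkpointsPath(String clusterRoot, String jobId) {
        return clusterRoot + "/checkpoints/" + jobId;
    }

    public static void main(String[] args) {
        String root = newClusterRoot();
        System.out.println(checkpointsPath(root, "job-1"));
    }
}
```

Two clusters that each generate their own root would no longer share state under `/flink/...`, so the root zNode would stop being the implicit cluster identity.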
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41428998 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. 
ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513"> + * FLINK-2513</a> about a possible way to overcome this). + * + * @param <T> Type of state + */ +public class ZooKeeperStateHandleStore<T extends Serializable> { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider<T> stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +* expected that the client's namespace ensures that the root +* path is exclusive for all state handles managed by this +* instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider<T> stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. +* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. 
+* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41428891 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41428924 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41428959

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
    @@ -0,0 +1,422 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.api.BackgroundCallback;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.state.StateHandleProvider;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.zookeeper.CreateMode;
    +import org.apache.zookeeper.KeeperException;
    +import org.apache.zookeeper.data.Stat;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.ConcurrentModificationException;
    +import java.util.List;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * State handles backed by ZooKeeper.
    + *
    + * Added state is persisted via {@link StateHandle}s, which in turn are written to
    + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
    + * small. ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs.
    + *
    + * State modifications require some care, because it is possible that certain failures bring
    + * the state handle backend and ZooKeeper out of sync.
    + *
    + * ZooKeeper holds the ground truth about state handles, i.e. the following holds:
    + *
    + *     State handle in ZooKeeper => State handle exists
    + *
    + * But not:
    + *
    + *     State handle exists => State handle in ZooKeeper
    + *
    + * There can be lingering state handles when failures happen during operation. They
    + * need to be cleaned up manually (see FLINK-2513,
    + * https://issues.apache.org/jira/browse/FLINK-2513, about a possible way to overcome this).
    + *
    + * @param <T> Type of state
    + */
    +public class ZooKeeperStateHandleStore<T extends Serializable> {
    +
    +    /** Curator ZooKeeper client */
    +    private final CuratorFramework client;
    +
    +    /** State handle provider */
    +    private final StateHandleProvider<T> stateHandleProvider;
    +
    +    /**
    +     * Creates a {@link ZooKeeperStateHandleStore}.
    +     *
    +     * @param client The Curator ZooKeeper client. Important: It is
    +     *               expected that the client's namespace ensures that the root
    +     *               path is exclusive for all state handles managed by this
    +     *               instance, e.g. client.usingNamespace("/stateHandles")
    +     * @param stateHandleProvider The state handle provider for the state
    +     */
    +    public ZooKeeperStateHandleStore(
    +            CuratorFramework client,
    +            StateHandleProvider<T> stateHandleProvider) {
    +
    +        this.client = checkNotNull(client, "Curator client");
    +        this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider");
    +    }
    +
    +    /**
    +     * Creates a state handle and stores it in ZooKeeper with create mode {@link
    +     * CreateMode#PERSISTENT}.
    +     *
    +     * @see #add(String, Serializable, CreateMode)
    +     */
    +    public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
    +        return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
    +    }
    +
    +    /**
    +     * Creates a state handle and stores it in ZooKeeper.
    +     *
    +     * Important: This will not store the actual state in
    +     * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection
    +     * makes sure that data in ZooKeeper is small.
    +     *
    +     * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and
    +
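The indirection described in the quoted Javadoc can be sketched in a few lines. This is a minimal in-memory model and not the PR's implementation: the two maps stand in for the state handle backend and for ZooKeeper, and the class name, the `add`/`remove`/`get` helpers and the UUID pointer are all hypothetical. The ordering (write the state first, publish the pointer second; unpublish the pointer first, discard the state second) is one way to keep "State handle in ZooKeeper => State handle exists" true at every step, at the price of possibly lingering state if a failure happens in between.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** In-memory stand-in for the two storage layers; not Flink's or Curator's API. */
final class StateHandleIndirectionSketch {

    /** Hypothetical state backend: holds the (potentially large) state. */
    static final Map<String, byte[]> stateBackend = new HashMap<>();

    /** Hypothetical ZooKeeper stand-in: path -> small pointer into the backend. */
    static final Map<String, String> zooKeeper = new HashMap<>();

    /** Writes the state to the backend first, then publishes only the pointer. */
    static String add(String pathInZooKeeper, byte[] state) {
        if (zooKeeper.containsKey(pathInZooKeeper)) {
            throw new IllegalStateException("Path already exists: " + pathInZooKeeper);
        }
        String handleId = UUID.randomUUID().toString();
        stateBackend.put(handleId, state);         // step 1: the state exists
        zooKeeper.put(pathInZooKeeper, handleId);  // step 2: the handle becomes visible
        return handleId;
    }

    /** Removes the pointer first, then discards the state it pointed to. */
    static void remove(String pathInZooKeeper) {
        String handleId = zooKeeper.remove(pathInZooKeeper);
        if (handleId != null) {
            stateBackend.remove(handleId);
        }
    }

    /** Resolves the pointer stored under the path; null if the path is unknown. */
    static byte[] get(String pathInZooKeeper) {
        String handleId = zooKeeper.get(pathInZooKeeper);
        return handleId == null ? null : stateBackend.get(handleId);
    }
}
```

With this layout the "ZooKeeper" map only ever holds a 36-character pointer per path, regardless of how large the state is.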
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41428739

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java ---
    @@ -0,0 +1,356 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.state.StateHandleProvider;
    +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle;
    +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.ConcurrentModificationException;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static com.google.common.base.Preconditions.checkState;
    +
    +/**
    + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
    + *
    + * Each job graph creates a ZNode:
    + *
    + * +O /flink/jobgraphs/1 [persistent]
    + * .
    + * .
    + * .
    + * +O /flink/jobgraphs/ N [persistent]
    + *
    + * The root path is watched to detect concurrent modifications in corner situations where
    + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener}
    + * to react to such situations.
    + */
    +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class);
    +
    +    /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
    +    private final Object cacheLock = new Object();
    +
    +    /** Client (not a namespace facade) */
    +    private final CuratorFramework client;
    +
    +    /** The set of IDs of all added job graphs. */
    +    private final Set<JobID> addedJobGraphs = new HashSet<>();
    +
    +    /** Completed checkpoints in ZooKeeper */
    +    private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
    +
    +    /**
    +     * Cache to monitor all children. This is used to detect races with other instances working
    +     * on the same state.
    +     */
    +    private final PathChildrenCache pathCache;
    +
    +    /** The external listener to be notified on races. */
    +    private SubmittedJobGraphListener jobGraphListener;
    +
    +    /** Flag indicating whether this instance is running. */
    +    private boolean isRunning;
    +
    +    public ZooKeeperSubmittedJobGraphs(
    +            CuratorFramework client,
    +            String currentJobsPath,
    +            StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
    +
    +        checkNotNull(client, "Curator client");
    +        checkNotNull(currentJobsPath, "Current jobs path");
    +        checkNotNull(stateHandleProvider, "State handle provider");
    +
    +        // Keep a reference to the original client and not the namespace facade. The namespace
    +        // facade cannot be closed.
    +        this.client = client;
    +
    +        // Ensure that the job graphs path exists
    +        client.newNamespaceAwareEnsurePath(currentJobsPath)
    +                .ensure(client.getZookeeperClient());
    +
    +        // All operations will have the path as root
    +        client = client.usingNamespace(client.getNamespace() + currentJobsPath);
    +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41422825

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.recipes.shared.SharedCount;
    +import org.apache.curator.framework.recipes.shared.VersionedValue;
    +import org.apache.curator.framework.state.ConnectionState;
    +import org.apache.curator.framework.state.ConnectionStateListener;
    +import org.apache.flink.runtime.jobmanager.RecoveryMode;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
    + *
    + * Each counter creates a ZNode:
    + *
    + * +O /flink/checkpoint-counter/1 [persistent]
    + * .
    + * .
    + * .
    + * +O /flink/checkpoint-counter/ N [persistent]
    + *
    + * The checkpoint IDs are required to be ascending (per job). In order to guarantee this in case
    + * of job manager failures we use ZooKeeper to have a shared counter across job manager instances.
    + */
    +public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
    --- End diff --

    Resolved

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
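The requirement quoted above (checkpoint IDs ascending per job, even when several job manager instances hand them out) boils down to an optimistic compare-and-set loop on a shared value. The sketch below is only an in-memory illustration under that assumption: an `AtomicLong` stands in for the shared ZNode, and the class and method names are hypothetical. The quoted file imports Curator's `SharedCount` and `VersionedValue`, but how the PR uses them is not visible in the excerpt, so nothing here should be read as that code.

```java
import java.util.concurrent.atomic.AtomicLong;

/** In-memory stand-in for a counter shared by several job manager instances. */
final class SharedCheckpointIdCounterSketch {

    /** Hypothetical shared cell; in the PR this role is played by a ZNode. */
    private final AtomicLong shared = new AtomicLong(1);

    /** Returns the current ID and advances the counter, retrying on lost races. */
    long getAndIncrement() {
        while (true) {
            long current = shared.get();
            // The update only succeeds if nobody else advanced the counter in between;
            // otherwise we re-read and try again, so no ID is ever handed out twice.
            if (shared.compareAndSet(current, current + 1)) {
                return current;
            }
        }
    }
}
```

Because every caller goes through the same cell, a job manager that takes over after a failure continues from the last published value instead of restarting at 1.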
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41414813

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandle.java ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.flink.runtime.state.StateHandle;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link StateHandle} with its path in ZooKeeper.
    + *
    + * @param <T> Type of state
    + */
    +public class ZooKeeperStateHandle<T> {
    --- End diff --

    Resolved
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41414820

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandle.java ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.flink.runtime.state.StateHandle;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link StateHandle} with its path in ZooKeeper.
    + *
    + * @param <T> Type of state
    + */
    +public class ZooKeeperStateHandle<T> {
    +
    +    private final StateHandle<T> stateHandle;
    +
    +    private final String pathInZooKeeper;
    +
    +    public ZooKeeperStateHandle(StateHandle<T> stateHandle, String pathInZooKeeper) {
    +        this.stateHandle = checkNotNull(stateHandle, "State handle");
    +        this.pathInZooKeeper = checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +    }
    +
    +    public StateHandle<T> getStateHandle() {
    --- End diff --

    Resolved
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41413616

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoints.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import java.util.List;
    +
    +/**
    + * A bounded LIFO-queue of {@link SuccessfulCheckpoint} instances.
    + */
    +public interface CompletedCheckpoints {
    --- End diff --

    Resolved
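For readers unfamiliar with the term, a "bounded LIFO-queue" of checkpoints can be pictured as follows. This is a minimal sketch and not the interface from the PR (whose methods are not visible in the quoted excerpt): the class name, the `maxRetained` parameter and the use of plain checkpoint IDs instead of `SuccessfulCheckpoint` instances are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Keeps only the most recent checkpoints, newest first (hypothetical sketch). */
final class BoundedCheckpointsSketch {

    private final int maxRetained;
    private final Deque<Long> checkpointIds = new ArrayDeque<>();

    BoundedCheckpointsSketch(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be at least 1");
        }
        this.maxRetained = maxRetained;
    }

    /** Adds the newest checkpoint and drops the oldest ones beyond the bound. */
    void add(long checkpointId) {
        checkpointIds.addFirst(checkpointId);
        while (checkpointIds.size() > maxRetained) {
            checkpointIds.removeLast();
        }
    }

    /** Returns the most recently added checkpoint, or null if there is none. */
    Long getLatest() {
        return checkpointIds.peekFirst();
    }

    /** Returns all retained checkpoints, newest first. */
    List<Long> getAll() {
        return new ArrayList<>(checkpointIds);
    }
}
```

Recovery would typically start from `getLatest()`; the older retained entries only serve as a fallback.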
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41413324 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphs.java --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.util.List; + +/** + * {@link SubmittedJobGraph} instances for recovery. + */ +public interface SubmittedJobGraphs { --- End diff -- Resolved
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41411532 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java --- @@ -19,29 +19,28 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.util.ArrayList; import java.util.List; /** * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state) * and that is considered completed. */ -public class SuccessfulCheckpoint { - - private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class); - +public class SuccessfulCheckpoint implements Serializable { --- End diff -- Resolved
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41411405 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. 
ZooKeeper is built for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513"> + * FLINK-2513</a> about a possible way to overcome this). + * + * @param <T> Type of state + */ +public class ZooKeeperStateHandleStore<T extends Serializable> { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider<T> stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +* expected that the client's namespace ensures that the root +* path is exclusive for all state handles managed by this +* instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider<T> stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. +* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. 
+* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and +
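The class comment in the diff above says that only small state handles go into ZooKeeper while the state itself lives in a separate backend, and that a handle in ZooKeeper implies that the state exists. The sketch below illustrates one write ordering that keeps this invariant. It is not the code from this pull request: the two maps are in-memory stand-ins for the state backend and for ZooKeeper, and the class name HandleIndirectionSketch and all its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical illustration of the handle indirection; not the Flink implementation. */
final class HandleIndirectionSketch {

    // Stand-in for the state handle backend (for example a file system); holds large payloads.
    final Map<String, byte[]> backend = new HashMap<>();

    // Stand-in for ZooKeeper; holds only small handle ids keyed by path.
    final Map<String, String> zk = new HashMap<>();

    /** Persists the state first and publishes the handle second. */
    String add(String path, byte[] state) {
        String handleId = UUID.randomUUID().toString();
        backend.put(handleId, state);
        if (zk.putIfAbsent(path, handleId) != null) {
            // Publishing failed: discard the state again so that nothing lingers.
            backend.remove(handleId);
            throw new IllegalStateException("Path already exists: " + path);
        }
        return handleId;
    }

    /** Resolves the handle stored under the path and loads the state it points to. */
    byte[] get(String path) {
        String handleId = zk.get(path);
        if (handleId == null) {
            throw new IllegalStateException("No handle at path: " + path);
        }
        return backend.get(handleId);
    }

    /** Removes the handle first and discards the state second. */
    void remove(String path) {
        String handleId = zk.remove(path);
        if (handleId != null) {
            backend.remove(handleId);
        }
    }

    public static void main(String[] args) {
        HandleIndirectionSketch store = new HandleIndirectionSketch();
        store.add("/job-1", new byte[4 * 1024 * 1024]);
        System.out.println("bytes behind handle: " + store.get("/job-1").length);
        store.remove("/job-1");
        System.out.println("handles left: " + store.zk.size());
    }
}
```

With this ordering, a crash between the two steps of add or remove can leave state in the backend without a handle, but never a handle without state. That matches the lingering state handles the comment warns about.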
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41410527 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41409570 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41409329 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41409302 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41409272 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41408822 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java --- @@ -153,6 +176,143 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService( } /** +* Creates a {@link ZooKeeperSubmittedJobGraphs} instance. +* +* @param client The {@link CuratorFramework} ZooKeeper client to use +* @param configuration {@link Configuration} object +* @return {@link ZooKeeperSubmittedJobGraphs} instance +*/ + public static ZooKeeperSubmittedJobGraphs createSubmittedJobGraphs( + CuratorFramework client, + Configuration configuration) throws Exception { + + checkNotNull(configuration, "Configuration"); + + // State backend + String stateBackend = configuration.getString( + ConfigConstants.STATE_BACKEND, + ConfigConstants.DEFAULT_STATE_BACKEND); + + if (!stateBackend.toLowerCase().equals("filesystem")) { + throw new IllegalConfigurationException(String.format( + "You currently have to configure the filesystem state backend in order for " + + "ZooKeeper recovery to work. Flink will use this backend to store " + + "meta data to recover jobs. Please set '%s' to 'FILESYSTEM' in the " + + "configuration and set the recovery path via key '%s'.", + ConfigConstants.STATE_BACKEND, + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)); + } + + // State backend root dir + String stateBackendPath = configuration.getString( + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, + ""); + + if (stateBackendPath.equals("")) { + throw new IllegalConfigurationException(String.format( + "You have to specify a path for the file system state backend for storing " + + "recovery information. 
Please set the configuration key '%s'.", + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)); + } + + // ZooKeeper submitted jobs root dir + String zooKeeperSubmittedJobsPath = configuration.getString( + ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH, + ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); + + StateHandleProvider stateHandleProvider = FileStateHandle + .createProvider(stateBackendPath); + + return new ZooKeeperSubmittedJobGraphs( + client, zooKeeperSubmittedJobsPath, stateHandleProvider); + } + + /** +* Creates a {@link ZooKeeperCompletedCheckpoints} instance. +* +* @param client The {@link CuratorFramework} ZooKeeper client to use +* @param configuration {@link Configuration} object +* @param jobId ID of job to create the instance for +* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain +* @param userClassLoader User code class loader +* @return {@link ZooKeeperCompletedCheckpoints} instance +*/ + public static CompletedCheckpoints createCompletedCheckpoints( + CuratorFramework client, + Configuration configuration, + JobID jobId, + int maxNumberOfCheckpointsToRetain, + ClassLoader userClassLoader) throws Exception { + + checkNotNull(configuration, "Configuration"); + + // State backend + String stateBackend = configuration.getString( + ConfigConstants.STATE_BACKEND, + ConfigConstants.DEFAULT_STATE_BACKEND); + + if (!stateBackend.toLowerCase().equals("filesystem")) { + throw new IllegalConfigurationException(String.format( + "You currently have to configure the filesystem state backend in order for " + + "ZooKeeper recovery to work. Flink will use this backend to store " + +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41408810

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---
    @@ -153,6 +176,143 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService(
         }

         /**
    +     * Creates a {@link ZooKeeperSubmittedJobGraphs} instance.
    +     *
    +     * @param client        The {@link CuratorFramework} ZooKeeper client to use
    +     * @param configuration {@link Configuration} object
    +     * @return {@link ZooKeeperSubmittedJobGraphs} instance
    +     */
    +    public static ZooKeeperSubmittedJobGraphs createSubmittedJobGraphs(
    +            CuratorFramework client,
    +            Configuration configuration) throws Exception {
    +
    +        checkNotNull(configuration, "Configuration");
    +
    +        // State backend
    +        String stateBackend = configuration.getString(
    +                ConfigConstants.STATE_BACKEND,
    +                ConfigConstants.DEFAULT_STATE_BACKEND);
    +
    +        if (!stateBackend.toLowerCase().equals("filesystem")) {
    +            throw new IllegalConfigurationException(String.format(
    +                    "You currently have to configure the filesystem state backend in order for " +
    +                            "ZooKeeper recovery to work. Flink will use this backend to store " +
    +                            "meta data to recover jobs. Please set '%s' to 'FILESYSTEM' in the " +
    +                            "configuration and set the recovery path via key '%s'.",
    +                    ConfigConstants.STATE_BACKEND,
    +                    ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
    +        }
    +
    +        // State backend root dir
    +        String stateBackendPath = configuration.getString(
    +                ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH,
    +                "");
    +
    +        if (stateBackendPath.equals("")) {
    +            throw new IllegalConfigurationException(String.format(
    +                    "You have to specify a path for the file system state backend for storing " +
    +                            "recovery information. Please set the configuration key '%s'.",
    +                    ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
    +        }
    --- End diff --

    Resolved
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41408785

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---
    @@ -153,6 +176,143 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService(
         }

         /**
    +     * Creates a {@link ZooKeeperSubmittedJobGraphs} instance.
    +     *
    +     * @param client        The {@link CuratorFramework} ZooKeeper client to use
    +     * @param configuration {@link Configuration} object
    +     * @return {@link ZooKeeperSubmittedJobGraphs} instance
    +     */
    +    public static ZooKeeperSubmittedJobGraphs createSubmittedJobGraphs(
    +            CuratorFramework client,
    +            Configuration configuration) throws Exception {
    +
    +        checkNotNull(configuration, "Configuration");
    +
    +        // State backend
    +        String stateBackend = configuration.getString(
    +                ConfigConstants.STATE_BACKEND,
    +                ConfigConstants.DEFAULT_STATE_BACKEND);
    +
    +        if (!stateBackend.toLowerCase().equals("filesystem")) {
    --- End diff --

    Resolved (with the least effort, as it is going to be superseded by some upcoming changes to the state backend)
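For readers following the thread without Flink on the classpath, the validation under review can be sketched roughly as below. This is only an illustration, not the code from the PR: the class name RecoveryConfigCheck, the plain Map used in place of Flink's Configuration, the two key strings, and the "jobmanager" default are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-in for the checks in createSubmittedJobGraphs. */
final class RecoveryConfigCheck {

    // Assumed key names and default, for illustration only.
    static final String STATE_BACKEND = "state.backend";
    static final String RECOVERY_PATH = "state.backend.fs.recoverydir";
    static final String DEFAULT_BACKEND = "jobmanager";

    /** Returns the recovery path, or throws if the configuration is unusable. */
    static String requireRecoveryPath(Map<String, String> config) {
        if (config == null) {
            throw new NullPointerException("Configuration");
        }

        String backend = config.getOrDefault(STATE_BACKEND, DEFAULT_BACKEND);

        // Locale.ROOT keeps the comparison independent of the default locale.
        if (!"filesystem".equals(backend.toLowerCase(Locale.ROOT))) {
            throw new IllegalArgumentException(String.format(
                    "Please set '%s' to 'FILESYSTEM' and set the recovery path via key '%s'.",
                    STATE_BACKEND, RECOVERY_PATH));
        }

        String path = config.getOrDefault(RECOVERY_PATH, "");
        if (path.isEmpty()) {
            throw new IllegalArgumentException(String.format(
                    "Please set the configuration key '%s'.", RECOVERY_PATH));
        }
        return path;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(STATE_BACKEND, "FILESYSTEM");
        config.put(RECOVERY_PATH, "hdfs:///flink/recovery");
        System.out.println(requireRecoveryPath(config));
    }
}
```

Both failure cases throw before any ZooKeeper resources are touched, which is the property the original method relies on.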
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41402553

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java ---
    @@ -67,4 +67,20 @@
          * @return true if the associated {@link LeaderContender} is the leader, otherwise false
          */
         boolean hasLeadership();
    +
    +    /**
    +     * [EXPERIMENTAL] Returns true if the {@link LeaderContender} is leader. A call
    +     * to this method might block.
    +     *
    +     * This forces a synchronous check at the respective state backend. It is possible
    +     * that it does not reflect the current state at the {@link LeaderContender}, which is notified
    +     * asynchronously. Therefore it is possible that {@link #hasLeadership()} and {@link
    +     * #syncHasLeadership()} have different return values.
    +     *
    +     * @TODO @tillrohrmann Is it OK to collapse this with {@link #hasLeadership()}?
    +     *
    +     * @return true if the associated {@link LeaderContender} is the leader, otherwise false
    +     */
    +    boolean syncHasLeadership();
    --- End diff --

    Resolved
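The difference between the two methods in this hunk can be illustrated with a small, self-contained sketch. Everything here is hypothetical: the LeadershipSketch class, the onLeadershipChanged callback, and the BooleanSupplier standing in for a blocking check against the backend are assumptions for the example and do not reflect how the PR implements either method.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical model of an asynchronously notified flag versus a synchronous check. */
final class LeadershipSketch {

    /** Last state delivered by an asynchronous notification. */
    private volatile boolean notifiedLeader;

    /** Assumed stand-in for a (possibly blocking) query against the backend. */
    private final BooleanSupplier backendCheck;

    LeadershipSketch(BooleanSupplier backendCheck) {
        this.backendCheck = backendCheck;
    }

    /** Called when a leadership notification eventually arrives. */
    void onLeadershipChanged(boolean isLeader) {
        notifiedLeader = isLeader;
    }

    /** Cheap, but may lag behind the backend. */
    boolean hasLeadership() {
        return notifiedLeader;
    }

    /** Asks the backend directly; may block in a real implementation. */
    boolean syncHasLeadership() {
        return backendCheck.getAsBoolean();
    }

    public static void main(String[] args) {
        boolean[] backendState = {false};
        LeadershipSketch service = new LeadershipSketch(() -> backendState[0]);

        // The backend grants leadership, but the notification has not arrived yet.
        backendState[0] = true;
        System.out.println(service.hasLeadership() + " vs " + service.syncHasLeadership());

        // Once the notification is delivered, both views agree again.
        service.onLeadershipChanged(true);
        System.out.println(service.hasLeadership() + " vs " + service.syncHasLeadership());
    }
}
```

The window between the backend change and the notification is where the two return values can differ, which is what the Javadoc warns about.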
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41402299 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java --- @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates ZNode: + * + * +O /flink/jobgraphs/1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/ N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** +* Cache to monitor all children. This is used to detect races with other instances working +* on the same state. +*/ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphs( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(client, "Curator client"); + checkNotNull(currentJobsPath, "Current jobs path"); + checkNotNull(stateHandleProvider, "State handle provider"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = client; + + // Ensure that the job graphs path exists + client.newNamespaceAwareEnsurePath(currentJobsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + client = client.usingNamespace(client.getNamespace() + currentJobsPath); +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41401940 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java --- @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates ZNode: + * + * +O /flink/jobgraphs/1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/ N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/
+    private final Set addedJobGraphs = new HashSet<>();
+
+    /** Completed checkpoints in ZooKeeper */
+    private final ZooKeeperStateHandleStore jobGraphsInZooKeeper;
+
+    /**
+     * Cache to monitor all children. This is used to detect races with other instances working
+     * on the same state.
+     */
+    private final PathChildrenCache pathCache;
+
+    /** The external listener to be notified on races. */
+    private SubmittedJobGraphListener jobGraphListener;
+
+    /** Flag indicating whether this instance is running. */
+    private boolean isRunning;
+
+    public ZooKeeperSubmittedJobGraphs(
+            CuratorFramework client,
+            String currentJobsPath,
+            StateHandleProvider stateHandleProvider) throws Exception {
+
+        checkNotNull(client, "Curator client");
--- End diff --

Resolved
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41399491 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphs.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.util.Collections; +import java.util.List; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}. + * + * All operations are NoOps, because {@link JobGraph} instances cannot be recovered in this + * recovery mode. 
+ */
+public class StandaloneSubmittedJobGraphs implements SubmittedJobGraphs {
+
+    @Override
+    public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+        // Nothing to do
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do
+    }
+
+    @Override
+    public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+        // Nothing to do
+    }
+
+    @Override
+    public void removeJobGraph(JobID jobId) throws Exception {
+        // Nothing to do
+    }
+
+    @Override
+    public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+        throw new IllegalStateException("StandaloneSubmittedJobGraphs cannot recover job graphs. " +
+                "How did you end up here?");
+    }
+
+    @Override
+    public List recoverJobGraphs() throws Exception {
+        return Collections.emptyList();
--- End diff --

Resolved (returning an Optional)
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41397436 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each counter creates a ZNode: + * + * +O /flink/checkpoint-counter/1 [persistent] + * . + * . + * . + * +O /flink/checkpoint-counter/ N [persistent] + * + * + * The checkpoints IDs are required to be ascending (per job). 
In order to guarantee this in case + * of job manager failures we use ZooKeeper to have a shared counter across job manager instances. + */ +public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Path of the shared count */ + private final String counterPath; + + /** Curator recipe for shared counts */ + private final SharedCount sharedCount; + + /** Connection state listener to monitor the client connection */ + private final SharedCountConnectionStateListener connStateListener = + new SharedCountConnectionStateListener(); + + /** +* Creates a {@link ZooKeeperCheckpointIDCounter} instance. +* +* @param client Curator ZooKeeper client +* @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job. +* @throws Exception +*/ + public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath) throws Exception { + this.client = checkNotNull(client, "Curator client"); + this.counterPath = checkNotNull(counterPath, "Counter path"); + this.sharedCount = new SharedCount(client, counterPath, 1); + } + + @Override + public void start() throws Exception { + sharedCount.start(); + client.getConnectionStateListenable().addListener(connStateListener); + } + + @Override + public void stop() throws Exception { + sharedCount.close(); + client.getConnectionStateListenable().removeListener(connStateListener); + + LOG.info("Removing {} from ZooKeeper", counterPath); + client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath); + } + + @Override + public long getAndIncrement() throws Exception { + while (true) { + ConnectionState connState = connStateListener.getLastState(); + + if (connState != null) { + throw new IllegalStateException("Connection state: " + connState); + } + + VersionedValue current = 
sharedCount.getVersionedValue(); + + Integer newCount = current.getValue() + 1; + + if (sharedCount.trySetCount(current, newCount)) { + return current.getValue(); + } + } + } + +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41397266

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java ---
    @@ -48,9 +55,9 @@
         private final int subtask;

         public StateForTask(SerializedValue<StateHandle<?>> state, JobVertexID operatorId, int subtask) {
    -        if (state == null || operatorId == null || subtask < 0) {
    -            throw new IllegalArgumentException();
    -        }
    +        checkNotNull(state, "State");
    +        checkNotNull(operatorId, "Operator ID");
    --- End diff --

    Resolved
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41397182

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -178,9 +195,9 @@ public void run() {
          * After this method has been called, the coordinator does not accept any further
          * messages and cannot trigger any further checkpoints.
          */
    -    public void shutdown() {
    +    public void shutdown() throws Exception {
             synchronized (lock) {
    -            try {
    +            try {
                     if (shutdown) {
                         return;
                     }
    --- End diff --

    Resolved
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41397076

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -110,61 +120,68 @@ public CheckpointCoordinator(
                 JobID job,
    -            int numSuccessfulCheckpointsToRetain,
                 long checkpointTimeout,
                 ExecutionVertex[] tasksToTrigger,
                 ExecutionVertex[] tasksToWaitFor,
                 ExecutionVertex[] tasksToCommitTo,
    -            ClassLoader userClassLoader) {
    +            ClassLoader userClassLoader,
    +            CheckpointIDCounter checkpointIDCounter,
    +            CompletedCheckpoints completedCheckpoints,
    +            RecoveryMode recoveryMode) throws Exception {

             // some sanity checks
             if (job == null || tasksToTrigger == null || tasksToWaitFor == null || tasksToCommitTo == null) {
                 throw new NullPointerException();
             }
    -        if (numSuccessfulCheckpointsToRetain < 1) {
    -            throw new IllegalArgumentException("Must retain at least one successful checkpoint");
    -        }
             if (checkpointTimeout < 1) {
                 throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
             }

             this.job = job;
    --- End diff --

    Resolved
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1153#issuecomment-145798045

    What happens if the last JM dies and, with it, the currently executing job fails permanently? The `JobGraph` stored in ZooKeeper will then be recovered when a new Flink cluster is started, right? Does this make sense? Is there a way to get rid of terminally failed jobs? The problem is that otherwise the recovered job won't find the submitting `JobClient` and will occupy cluster resources (slots). Thus, you start a new cluster, want to submit a job, and it fails because an old recovered job occupies the slots. But maybe I overlooked the mechanism that avoids this scenario.
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41158214 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each counter creates a ZNode: + * + * +O /flink/checkpoint-counter/1 [persistent] + * . + * . + * . + * +O /flink/checkpoint-counter/ N [persistent] + * + * + * The checkpoints IDs are required to be ascending (per job). 
In order to guarantee this in case + * of job manager failures we use ZooKeeper to have a shared counter across job manager instances. + */ +public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
--- End diff --

As discussed offline: it seems to be OK for concurrently running jobs to have overlapping checkpoint IDs. Thus, using ZooKeeper's sequential IDs to distinguish checkpoint nodes unambiguously, and keeping a checkpoint ID counter on a per-JobManager basis, should be OK.
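The getAndIncrement loop in the diff above follows an optimistic read, compare, and retry pattern. A minimal in-memory sketch of that pattern is shown below. It deliberately does not use Curator: the InMemorySharedCount and Versioned classes are hypothetical stand-ins whose method names merely mirror the ones visible in the diff, and the connection-state check is left out.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-memory model of the optimistic counter loop. */
final class OptimisticCounterSketch {

    /** Immutable (version, value) pair. */
    static final class Versioned {
        final int version;
        final int value;

        Versioned(int version, int value) {
            this.version = version;
            this.value = value;
        }
    }

    /** Stand-in for a shared count; this is not Curator's SharedCount. */
    static final class InMemorySharedCount {
        private final AtomicReference<Versioned> ref;

        InMemorySharedCount(int seedValue) {
            this.ref = new AtomicReference<>(new Versioned(0, seedValue));
        }

        Versioned getVersionedValue() {
            return ref.get();
        }

        /** Succeeds only if nobody replaced the value since 'previous' was read. */
        boolean trySetCount(Versioned previous, int newCount) {
            return ref.compareAndSet(previous, new Versioned(previous.version + 1, newCount));
        }
    }

    /** Returns the current value and advances the counter, retrying on contention. */
    static long getAndIncrement(InMemorySharedCount count) {
        while (true) {
            Versioned current = count.getVersionedValue();
            if (count.trySetCount(current, current.value + 1)) {
                return current.value;
            }
            // Another writer won the race; read the new value and try again.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InMemorySharedCount count = new InMemorySharedCount(1);

        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    getAndIncrement(count);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }

        // Seed of 1 plus 4 * 1000 successful increments.
        System.out.println(count.getVersionedValue().value);
    }
}
```

Because every successful update is conditional on the version that was read, two callers can never be handed the same ID, which is the ascending-ID guarantee the class comment asks for.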
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1153#issuecomment-145557954

    Thanks for taking the time to review, for the great renaming suggestions, and for all the other more subtle remarks. I agree with virtually all comments. The only thing that needs some clarification is the checkpoint ID counter. What's your take on my inline comment? To recap: I'm not sure whether the second point really makes it easier, because we would have to add the retry logic to all the operations manually. Currently, Curator takes care of this for us. Do you think it's a good idea to merge it into the CompletedCheckpointStore?
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152047

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
    @@ -0,0 +1,422 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.api.BackgroundCallback;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.state.StateHandleProvider;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.zookeeper.CreateMode;
    +import org.apache.zookeeper.KeeperException;
    +import org.apache.zookeeper.data.Stat;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.ConcurrentModificationException;
    +import java.util.List;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * State handles backed by ZooKeeper.
    + *
    + * Added state is persisted via {@link StateHandle}s, which in turn are written to
    + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper
    + * small. ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs.
    + *
    + * State modifications require some care, because it is possible that certain failures bring
    + * the state handle backend and ZooKeeper out of sync.
    + *
    + * ZooKeeper holds the ground truth about state handles, i.e. the following holds:
    + *
    + *
    + * State handle in ZooKeeper => State handle exists
    + *
    + *
    + * But not:
    + *
    + *
    + * State handle exists => State handle in ZooKeeper
    + *
    + *
    + * There can be lingering state handles when failures happen during operation. They
    + * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
    + * FLINK-2513</a> about a possible way to overcome this).
    + *
    + * @param <T> Type of state
    + */
    +public class ZooKeeperStateHandleStore<T extends Serializable> {
    +
    +    /** Curator ZooKeeper client */
    +    private final CuratorFramework client;
    +
    +    /** State handle provider */
    +    private final StateHandleProvider<T> stateHandleProvider;
    +
    +    /**
    +     * Creates a {@link ZooKeeperStateHandleStore}.
    +     *
    +     * @param client The Curator ZooKeeper client. Important: It is
    +     *               expected that the client's namespace ensures that the root
    +     *               path is exclusive for all state handles managed by this
    +     *               instance, e.g. client.usingNamespace("/stateHandles")
    +     * @param stateHandleProvider The state handle provider for the state
    +     */
    +    public ZooKeeperStateHandleStore(
    +            CuratorFramework client,
    +            StateHandleProvider<T> stateHandleProvider) {
    +
    +        this.client = checkNotNull(client, "Curator client");
    +        this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider");
    +    }
    +
    +    /**
    +     * Creates a state handle and stores it in ZooKeeper with create mode {@link
    +     * CreateMode#PERSISTENT}.
    +     *
    +     * @see #add(String, Serializable, CreateMode)
    +     */
    +    public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) throws Exception {
    +        return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
    +    }
    +
    +    /**
    +     * Creates a state handle and stores it in ZooKeeper.
    +     *
    +     * Important: This will not store the actual state in
    +     * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection
    +     * makes sure that data in ZooKeeper is small.
    +     *
    +     * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet
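The invariant in the class comment (a handle in ZooKeeper implies that the state exists, but not the reverse) follows from the order of the two writes: state first, handle second. A minimal sketch of that ordering, assuming plain in-memory maps as stand-ins for the state backend and for ZooKeeper. `HandleStoreSketch`, `backend`, `zk` and `failZooKeeperWrite` are hypothetical names for illustration and are not part of the pull request:

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory sketch: write the state first, then publish its handle. */
class HandleStoreSketch {
    /** Stand-in for the state backend: handle id -> state. */
    final Map<String, String> backend = new HashMap<>();
    /** Stand-in for ZooKeeper: path -> handle id. */
    final Map<String, String> zk = new HashMap<>();
    /** When true, the simulated ZooKeeper write fails. */
    boolean failZooKeeperWrite = false;
    private int nextHandleId = 0;

    /** Returns the handle id; throws if the path exists or the write fails. */
    String add(String path, String state) {
        // Step 1: persist the (possibly large) state in the backend.
        String handleId = "handle-" + nextHandleId++;
        backend.put(handleId, state);
        boolean success = false;
        try {
            // Step 2: publish only the small handle in ZooKeeper.
            if (failZooKeeperWrite) {
                throw new IllegalStateException("simulated ZooKeeper failure");
            }
            if (zk.containsKey(path)) {
                throw new IllegalStateException("node exists: " + path);
            }
            zk.put(path, handleId);
            success = true;
            return handleId;
        } finally {
            // Best-effort cleanup. A crash between step 1 and this point
            // would leave a lingering state handle in the backend.
            if (!success) {
                backend.remove(handleId);
            }
        }
    }

    /** Checks "handle in ZooKeeper => state exists". */
    boolean invariantHolds() {
        return backend.keySet().containsAll(zk.values());
    }

    public static void main(String[] args) {
        HandleStoreSketch store = new HandleStoreSketch();
        store.add("/job-1", "serialized state");
        System.out.println("invariant holds: " + store.invariantHolds());
    }
}
```

Reversing the two steps would allow a handle in ZooKeeper that points at state that was never written, which is the direction the class comment rules out.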
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152813

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
    @@ -0,0 +1,422 @@
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152515

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java ---
    @@ -19,29 +19,28 @@
     package org.apache.flink.runtime.checkpoint;
     import org.apache.flink.api.common.JobID;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    +import java.io.Serializable;
    +import java.util.ArrayList;
     import java.util.List;
     /**
      * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
      * and that is considered completed.
      */
    -public class SuccessfulCheckpoint {
    -
    -    private static final Logger LOG = LoggerFactory.getLogger(SuccessfulCheckpoint.class);
    -
    +public class SuccessfulCheckpoint implements Serializable {
    --- End diff --

    OK. I thought about this as well, but kept it to refrain from unnecessary changes. But if you feel the same way, there is probably a good reason to change it. ;)
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152586

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java ---
    @@ -0,0 +1,356 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.state.StateHandleProvider;
    +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle;
    +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.ConcurrentModificationException;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static com.google.common.base.Preconditions.checkState;
    +
    +/**
    + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
    + *
    + * Each job graph creates ZNode:
    + *
    + * +O /flink/jobgraphs/1 [persistent]
    + * .
    + * .
    + * .
    + * +O /flink/jobgraphs/ N [persistent]
    + *
    + *
    + * The root path is watched to detect concurrent modifications in corner situations where
    + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener}
    + * to react to such situations.
    + */
    +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class);
    +
    +    /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
    +    private final Object cacheLock = new Object();
    +
    +    /** Client (not a namespace facade) */
    +    private final CuratorFramework client;
    +
    +    /** The set of IDs of all added job graphs. */
    +    private final Set<JobID> addedJobGraphs = new HashSet<>();
    +
    +    /** Completed checkpoints in ZooKeeper */
    +    private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
    +
    +    /**
    +     * Cache to monitor all children. This is used to detect races with other instances working
    +     * on the same state.
    +     */
    +    private final PathChildrenCache pathCache;
    +
    +    /** The external listener to be notified on races. */
    +    private SubmittedJobGraphListener jobGraphListener;
    +
    +    /** Flag indicating whether this instance is running. */
    +    private boolean isRunning;
    +
    +    public ZooKeeperSubmittedJobGraphs(
    +            CuratorFramework client,
    +            String currentJobsPath,
    +            StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
    +
    +        checkNotNull(client, "Curator client");
    +        checkNotNull(currentJobsPath, "Current jobs path");
    +        checkNotNull(stateHandleProvider, "State handle provider");
    +
    +        // Keep a reference to the original client and not the namespace facade. The namespace
    +        // facade cannot be closed.
    +        this.client = client;
    +
    +        // Ensure that the job graphs path exists
    +        client.newNamespaceAwareEnsurePath(currentJobsPath)
    +                .ensure(client.getZookeeperClient());
    +
    +        // All operations will have the path as root
    +        client = client.usingNamespace(client.getNamespace() + currentJobsPath);
    +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152418

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
    @@ -0,0 +1,422 @@
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1153#issuecomment-145554976

    Great piece of work @uce :-) I've gone through the code and added some comments. The main points are

    * the exception handling in `ZooKeeperSubmittedJobGraphs`, which can fail the job submission if two JMs try to write `JobGraphs` with the same `JobID`
    * and the usage of a shared counter, which could maybe be replaced by creating the `SuccessfulCheckpoint` nodes in `PERSISTENT_SEQUENTIAL` mode
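For the second point: ZooKeeper's sequential create modes append a monotonically increasing, zero-padded 10-digit counter to the requested path, so the server assigns the ordering and a separate shared counter may not be needed. A minimal in-memory sketch of that naming scheme follows. It is not Curator code, and `SequentialNodesSketch` and the `/checkpoints/` prefix are hypothetical names chosen for illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

/** In-memory sketch of sequential node naming under a single parent. */
class SequentialNodesSketch {
    /** Children of the parent node, stored as full paths. */
    private final List<String> children = new ArrayList<>();
    /** Per-parent counter; in real ZooKeeper the server maintains it. */
    private int counter = 0;

    /** Creates a node and returns the actual path, including the suffix. */
    synchronized String createSequential(String pathPrefix) {
        String path = pathPrefix + String.format(Locale.ROOT, "%010d", counter++);
        children.add(path);
        return path;
    }

    /** Returns the most recently created node, or null if there is none. */
    synchronized String latest() {
        // With a common prefix, zero padding makes lexicographic order
        // equal to creation order.
        return children.isEmpty() ? null : Collections.max(children);
    }

    public static void main(String[] args) {
        SequentialNodesSketch nodes = new SequentialNodesSketch();
        System.out.println(nodes.createSequential("/checkpoints/"));
        System.out.println(nodes.createSequential("/checkpoints/"));
        System.out.println("latest: " + nodes.latest());
    }
}
```

With this scheme, recovering the latest checkpoint reduces to listing the children and taking the largest name, at the cost of the caller not choosing the node name itself.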
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152294

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
    @@ -0,0 +1,422 @@
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152368

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
    @@ -0,0 +1,422 @@
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152286

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java ---
    @@ -67,4 +67,20 @@
          * @return true if the associated {@link LeaderContender} is the leader, otherwise false
          */
         boolean hasLeadership();
    +
    +    /**
    +     * [EXPERIMENTAL] Returns true if the {@link LeaderContender} is leader. A call
    +     * to this method might block.
    +     *
    +     * This forces a synchronous check at the respective state backend. It is possible
    +     * that is does not reflect the current state at the {@link LeaderContender}, which is notified
    +     * asynchronously. Therefore it is possible that {@link #hasLeadership()} and {@link
    +     * #syncHasLeadership()} have different return values.
    +     *
    +     * @TODO @tillrohrmann Is it OK to collapse this with {@link #hasLeadership()}?
    +     *
    +     * @return true if the associated {@link LeaderContender} is the leader, otherwise false
    +     */
    +    boolean syncHasLeadership();
    --- End diff --

    Yes, let's stick to `hasLeadership`. I didn't remove it, because I was undecided about this point as well.
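The Javadoc in the diff notes that the two methods can disagree because the contender is notified asynchronously. A minimal sketch of that window, assuming a single flag as the authoritative backend state and an explicit call that stands in for the delayed notification. `LeadershipSketch` and its helper methods are hypothetical and do not mirror the actual `LeaderElectionService` implementation:

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch: local view vs. authoritative view of leadership. */
class LeadershipSketch {
    /** Authoritative state, e.g. held by the coordination service. */
    private final AtomicBoolean leaderInBackend = new AtomicBoolean(false);
    /** Local view, updated only when a notification is delivered. */
    private volatile boolean notifiedLeader = false;

    void backendGrantsLeadership() {
        leaderInBackend.set(true);
    }

    void backendRevokesLeadership() {
        leaderInBackend.set(false);
    }

    /** Simulates delivery of the asynchronous notification. */
    void deliverNotification() {
        notifiedLeader = leaderInBackend.get();
    }

    /** Cheap check against the local, possibly stale view. */
    boolean hasLeadership() {
        return notifiedLeader;
    }

    /** Check against the backend; in a real system this might block. */
    boolean syncHasLeadership() {
        return leaderInBackend.get();
    }

    public static void main(String[] args) {
        LeadershipSketch sketch = new LeadershipSketch();
        sketch.backendGrantsLeadership();
        System.out.println("before notification: " + sketch.hasLeadership()
                + " / " + sketch.syncHasLeadership());
        sketch.deliverNotification();
        System.out.println("after notification: " + sketch.hasLeadership()
                + " / " + sketch.syncHasLeadership());
    }
}
```

The two views converge once the notification arrives, which is one reading of why a single `hasLeadership()` was considered sufficient in the thread.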
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152317

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---
    @@ -0,0 +1,422 @@
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41152243 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. 
This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see https://issues.apache.org/jira/browse/FLINK-2513";> + * FLINK-2513 about a possible way to overcome this). + * + * @param Type of state + */ +public class ZooKeeperStateHandleStore { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. 
+* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. +* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet
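The indirection described in the Javadoc above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `HandleStoreSketch`, its two maps, and the `failBeforeZooKeeper` flag are hypothetical and are not part of the pull request, Curator, or ZooKeeper. It assumes the state is written to the backend first and the pointer second, which is one ordering that preserves "state handle in ZooKeeper => state handle exists" while still allowing lingering handles after a failure.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model; neither map is a real ZooKeeper or Flink API.
final class HandleStoreSketch {

    // Stand-in for the state backend that holds the (possibly large) state.
    private final Map<String, byte[]> stateBackend = new HashMap<>();
    // Stand-in for ZooKeeper: path -> small pointer into the backend.
    private final Map<String, String> zooKeeper = new HashMap<>();
    private int nextHandleId = 0;

    // Writes the state first and the pointer second. If failBeforeZooKeeper
    // is set, the second step is skipped to simulate a crash in between.
    String add(String path, byte[] state, boolean failBeforeZooKeeper) {
        if (zooKeeper.containsKey(path)) {
            throw new IllegalStateException("Path already exists: " + path);
        }
        String handle = "handle-" + nextHandleId++;
        stateBackend.put(handle, state);
        if (failBeforeZooKeeper) {
            return null; // lingering state: in the backend, not in "ZooKeeper"
        }
        zooKeeper.put(path, handle);
        return handle;
    }

    // Every pointer in "ZooKeeper" must resolve to existing state.
    boolean invariantHolds() {
        return stateBackend.keySet().containsAll(zooKeeper.values());
    }

    // Counts state entries that no pointer refers to.
    int lingeringHandles() {
        return stateBackend.size() - zooKeeper.size();
    }

    public static void main(String[] args) {
        HandleStoreSketch store = new HandleStoreSketch();
        store.add("/checkpoint-1", new byte[1024], false);
        store.add("/checkpoint-2", new byte[1024], true);
        System.out.println(store.invariantHolds() + ", lingering: " + store.lingeringHandles());
    }
}
```

In this model the simulated failure leaves one entry in the backend that nothing points to, which corresponds to the lingering state handles that the Javadoc says need manual cleanup.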
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41152128 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java --- @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates ZNode: + * + * +O /flink/jobgraphs/1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/ N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** +* Cache to monitor all children. This is used to detect races with other instances working +* on the same state. +*/ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphs( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(client, "Curator client"); --- End diff -- see other comment
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41152087 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. 
This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see https://issues.apache.org/jira/browse/FLINK-2513";> + * FLINK-2513 about a possible way to overcome this). + * + * @param Type of state + */ +public class ZooKeeperStateHandleStore { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. 
+* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. +* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152116

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java ---
    @@ -48,9 +55,9 @@ private final int subtask;
        public StateForTask(SerializedValue> state, JobVertexID operatorId, int subtask) {
    -       if (state == null || operatorId == null || subtask < 0) {
    -           throw new IllegalArgumentException();
    -       }
    +       checkNotNull(state, "State");
    +       checkNotNull(operatorId, "Operator ID");
    --- End diff --

    see other comment
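A note on the precondition change in the diff above: Guava's `checkNotNull` throws a `NullPointerException`, whereas the removed block threw an `IllegalArgumentException` and also rejected `subtask < 0`. The quoted hunk ends before showing whether the range check is kept. The following is a minimal sketch of how both kinds of checks could sit side by side, using only the JDK; `TaskStateSketch` and its simplified field types are hypothetical and not the classes from the pull request, and `Objects.requireNonNull` is used here as a stand-in for `checkNotNull`.

```java
import java.util.Objects;

// Hypothetical stand-in for StateForTask; the field types are simplified
// and are not the ones used in the pull request.
final class TaskStateSketch {

    private final Object state;
    private final String operatorId;
    private final int subtask;

    TaskStateSketch(Object state, String operatorId, int subtask) {
        // requireNonNull plays the role of Guava's checkNotNull: it throws a
        // NullPointerException carrying the given message.
        this.state = Objects.requireNonNull(state, "State");
        this.operatorId = Objects.requireNonNull(operatorId, "Operator ID");

        // The range check from the removed if-block needs its own statement,
        // because a null check cannot cover it.
        if (subtask < 0) {
            throw new IllegalArgumentException("Subtask index must not be negative: " + subtask);
        }
        this.subtask = subtask;
    }

    int getSubtask() {
        return subtask;
    }

    public static void main(String[] args) {
        System.out.println(new TaskStateSketch("state", "op-1", 0).getSubtask());
        try {
            new TaskStateSketch(null, "op-1", 0);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Callers that previously caught `IllegalArgumentException` for null arguments would see a different exception type after such a change, which may be worth checking during review.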
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41152148 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java --- @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates ZNode: + * + * +O /flink/jobgraphs/1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/ N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** +* Cache to monitor all children. This is used to detect races with other instances working +* on the same state. +*/ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphs( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(client, "Curator client"); + checkNotNull(currentJobsPath, "Current jobs path"); + checkNotNull(stateHandleProvider, "State handle provider"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = client; + + // Ensure that the job graphs path exists + client.newNamespaceAwareEnsurePath(currentJobsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + client = client.usingNamespace(client.getNamespace() + currentJobsPath); +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41152092 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphs.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.util.Collections; +import java.util.List; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}. + * + * All operations are NoOps, because {@link JobGraph} instances cannot be recovered in this + * recovery mode. 
+ */ +public class StandaloneSubmittedJobGraphs implements SubmittedJobGraphs { + + @Override + public void start(SubmittedJobGraphListener jobGraphListener) throws Exception { + // Nothing to do + } + + @Override + public void stop() { + // Nothing to do + } + + @Override + public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { + // Nothing to do + } + + @Override + public void removeJobGraph(JobID jobId) throws Exception { + // Nothing to do + } + + @Override + public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { + throw new IllegalStateException("StandaloneSubmittedJobGraphs cannot recover job graphs. " + + "How did you end up here?"); + } + + @Override + public List recoverJobGraphs() throws Exception { + return Collections.emptyList(); --- End diff -- For sure
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152041

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -110,61 +120,68 @@ public CheckpointCoordinator(
                JobID job,
    -           int numSuccessfulCheckpointsToRetain,
                long checkpointTimeout,
                ExecutionVertex[] tasksToTrigger,
                ExecutionVertex[] tasksToWaitFor,
                ExecutionVertex[] tasksToCommitTo,
    -           ClassLoader userClassLoader) {
    +           ClassLoader userClassLoader,
    +           CheckpointIDCounter checkpointIDCounter,
    +           CompletedCheckpoints completedCheckpoints,
    +           RecoveryMode recoveryMode) throws Exception {
            // some sanity checks
            if (job == null || tasksToTrigger == null || tasksToWaitFor == null || tasksToCommitTo == null) {
                throw new NullPointerException();
            }
    -       if (numSuccessfulCheckpointsToRetain < 1) {
    -           throw new IllegalArgumentException("Must retain at least one successful checkpoint");
    -       }
            if (checkpointTimeout < 1) {
                throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
            }
            this.job = job;
    --- End diff --

    Yes, I've changed it in some places and not in others
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41152054

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -178,9 +195,9 @@ public void run() {
         * After this method has been called, the coordinator does not accept and further
         * messages and cannot trigger any further checkpoints.
         */
    -   public void shutdown() {
    +   public void shutdown() throws Exception {
            synchronized (lock) {
    -           try {
    +           try {
                    if (shutdown) {
                        return;
                    }
    --- End diff --

    OK
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41151947 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. 
ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see https://issues.apache.org/jira/browse/FLINK-2513";> + * FLINK-2513 about a possible way to overcome this). + * + * @param Type of state + */ +public class ZooKeeperStateHandleStore { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. +* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. 
+* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and +
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41151770

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphs.java ---
    @@ -0,0 +1,92 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +
    +import java.util.List;
    +
    +/**
    + * {@link SubmittedJobGraph} instances for recovery.
    + */
    +public interface SubmittedJobGraphs {
    --- End diff --

    +1 (also to the other proposed renamings)
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41151872

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandle.java ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.flink.runtime.state.StateHandle;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link StateHandle} with its path in ZooKeeper.
    + *
    + * @param Type of state
    + */
    +public class ZooKeeperStateHandle {
    +
    +   private final StateHandle stateHandle;
    +
    +   private final String pathInZooKeeper;
    +
    +   public ZooKeeperStateHandle(StateHandle stateHandle, String pathInZooKeeper) {
    +       this.stateHandle = checkNotNull(stateHandle, "State handle");
    +       this.pathInZooKeeper = checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
    +   }
    +
    +   public StateHandle getStateHandle() {
    --- End diff --

    +1 to all the ZooKeeperStateHandle suggestions
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41151337 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. 
This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see https://issues.apache.org/jira/browse/FLINK-2513";> + * FLINK-2513 about a possible way to overcome this). + * + * @param Type of state + */ +public class ZooKeeperStateHandleStore { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. 
+* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. +* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41150514 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java --- @@ -67,4 +67,20 @@ * @return true if the associated {@link LeaderContender} is the leader, otherwise false */ boolean hasLeadership(); + + /** +* [EXPERIMENTAL] Returns true if the {@link LeaderContender} is leader. A call +* to this method might block. +* +* This forces a synchronous check at the respective state backend. It is possible +* that is does not reflect the current state at the {@link LeaderContender}, which is notified +* asynchronously. Therefore it is possible that {@link #hasLeadership()} and {@link +* #syncHasLeadership()} have different return values. +* +* @TODO @tillrohrmann Is it OK to collapse this with {@link #hasLeadership()}? +* +* @return true if the associated {@link LeaderContender} is the leader, otherwise false +*/ + boolean syncHasLeadership(); --- End diff -- Hmm, but the Javadoc suggests a guarantee which does not always hold true. The thing is that we don't really know what the chances are of observing wrong leadership, or to what extent this method decreases them.
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41149990 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each counter creates a ZNode: + * + * +O /flink/checkpoint-counter/1 [persistent] + * . + * . + * . + * +O /flink/checkpoint-counter/ N [persistent] + * + * + * The checkpoints IDs are required to be ascending (per job). 
In order to guarantee this in case + * of job manager failures we use ZooKeeper to have a shared counter across job manager instances. + */ +public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { --- End diff -- I think that won't work. It can happen that two concurrent checkpoints return the same checkpoint ID, because creating a new node and getting the cversion after that operation is not atomic. We can do it with `persistent_sequential` nodes, but this will imo add complexity rather than reduce it. These are corner cases of course, but the counter makes the semantics clearer imo. We could merge the counter into the `CompletedCheckpointsStore`?
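To make the atomicity point concrete, here is a minimal in-memory sketch of a shared counter that hands out each ID exactly once. The diff imports Curator's `SharedCount` and `VersionedValue`, which suggests a versioned read followed by a conditional write; the sketch only models that idea with `AtomicLong` and does not use the Curator or Flink API. The class and method names (`CheckpointIdCounterSketch`, `getAndIncrement`, `countUniqueIds`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * In-memory stand-in for a shared checkpoint ID counter.
 * Hypothetical names; this is not the Curator or Flink API.
 */
public class CheckpointIdCounterSketch {

    /** Stand-in for the value stored in the counter's ZNode. */
    private final AtomicLong sharedValue = new AtomicLong(0);

    /**
     * Reads the current value and publishes value + 1 with a compare-and-set.
     * If another caller changed the value in between, the write is rejected
     * and the loop retries, so no ID is returned twice.
     */
    public long getAndIncrement() {
        while (true) {
            long current = sharedValue.get();
            if (sharedValue.compareAndSet(current, current + 1)) {
                return current;
            }
        }
    }

    /** Draws IDs from several threads and returns how many distinct IDs were seen. */
    public static int countUniqueIds(int threads, int idsPerThread) {
        CheckpointIdCounterSketch counter = new CheckpointIdCounterSketch();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < idsPerThread; i++) {
                    seen.add(counter.getAndIncrement());
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct IDs: " + countUniqueIds(4, 500));
    }
}
```

The two-step alternative discussed above (create a node, then read the parent's cversion) has no such conditional write, which is why two callers could observe the same value.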
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41149926 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.util.InstantiationUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * State handles backed by ZooKeeper. + * + * Added state is persisted via {@link StateHandle}s, which in turn are written to + * ZooKeeper. 
This level of indirection is necessary to keep the amount of data in ZooKeeper + * small. ZooKeeper is build for data in the KB range whereas state can grow to multiple MBs. + * + * State modifications require some care, because it is possible that certain failures bring + * the state handle backend and ZooKeeper out of sync. + * + * ZooKeeper holds the ground truth about state handles, i.e. the following holds: + * + * + * State handle in ZooKeeper => State handle exists + * + * + * But not: + * + * + * State handle exists => State handle in ZooKeeper + * + * + * There can be lingering state handles when failures happen during operation. They + * need to be cleaned up manually (see https://issues.apache.org/jira/browse/FLINK-2513 + * FLINK-2513 about a possible way to overcome this). + * + * @param Type of state + */ +public class ZooKeeperStateHandleStore { + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** State handle provider */ + private final StateHandleProvider stateHandleProvider; + + /** +* Creates a {@link ZooKeeperStateHandleStore}. +* +* @param client The Curator ZooKeeper client. Important: It is +*expected that the client's namespace ensures that the root +*path is exclusive for all state handles managed by this +*instance, e.g. client.usingNamespace("/stateHandles") +* @param stateHandleProvider The state handle provider for the state +*/ + public ZooKeeperStateHandleStore( + CuratorFramework client, + StateHandleProvider stateHandleProvider) { + + this.client = checkNotNull(client, "Curator client"); + this.stateHandleProvider = checkNotNull(stateHandleProvider, "State handle provider"); + } + + /** +* Creates a state handle and stores it in ZooKeeper with create mode {@link +* CreateMode#PERSISTENT}. 
+* +* @see #add(String, Serializable, CreateMode) +*/ + public ZooKeeperStateHandle add(String pathInZooKeeper, T state) throws Exception { + return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + } + + /** +* Creates a state handle and stores it in ZooKeeper. +* +* Important: This will not store the actual state in +* ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection +* makes sure that data in ZooKeeper is small. +* +* @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41149494 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java --- @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates ZNode: + * + * +O /flink/jobgraphs/1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/ N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** +* Cache to monitor all children. This is used to detect races with other instances working +* on the same state. +*/ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphs( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(client, "Curator client"); + checkNotNull(currentJobsPath, "Current jobs path"); + checkNotNull(stateHandleProvider, "State handle provider"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = client; + + // Ensure that the job graphs path exists + client.newNamespaceAwareEnsurePath(currentJobsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + client = client.usingNamespace(client.getNamespace() + currentJobsPath)
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41149560 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java --- @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each job graph creates ZNode: + * + * +O /flink/jobgraphs/1 [persistent] + * . + * . + * . + * +O /flink/jobgraphs/ N [persistent] + * + * + * The root path is watched to detect concurrent modifications in corner situations where + * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener} + * to react to such situations. + */ +public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class); + + /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */ + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** The set of IDs of all added job graphs. 
*/ + private final Set addedJobGraphs = new HashSet<>(); + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore jobGraphsInZooKeeper; + + /** +* Cache to monitor all children. This is used to detect races with other instances working +* on the same state. +*/ + private final PathChildrenCache pathCache; + + /** The external listener to be notified on races. */ + private SubmittedJobGraphListener jobGraphListener; + + /** Flag indicating whether this instance is running. */ + private boolean isRunning; + + public ZooKeeperSubmittedJobGraphs( + CuratorFramework client, + String currentJobsPath, + StateHandleProvider stateHandleProvider) throws Exception { + + checkNotNull(client, "Curator client"); + checkNotNull(currentJobsPath, "Current jobs path"); + checkNotNull(stateHandleProvider, "State handle provider"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = client; + + // Ensure that the job graphs path exists + client.newNamespaceAwareEnsurePath(currentJobsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + client = client.usingNamespace(client.getNamespace() + currentJobsPath)
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41147873 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * Each counter creates a ZNode: + * + * +O /flink/checkpoint-counter/1 [persistent] + * . + * . + * . + * +O /flink/checkpoint-counter/ N [persistent] + * + * + * The checkpoints IDs are required to be ascending (per job). 
In order to guarantee this in case + * of job manager failures we use ZooKeeper to have a shared counter across job manager instances. + */ +public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { --- End diff -- I'm not so sure whether we really need such a shared counter. If I've understood it correctly, then the completed checkpoints use the counter value as their node names, right? We have this counter because we need an ascending ID to distinguish old from new checkpoints. But isn't this exactly what ZooKeeper's `PERSISTENT_SEQUENTIAL` mode would offer us for free? It will simply extract the current `cversion` of the parent, increment it and append it to the given node name. I think that we could save some complexity here.
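As an illustration of the naming scheme this suggestion relies on, the following in-memory sketch mimics how sequential nodes get a zero-padded, ascending suffix appended to the requested name (ZooKeeper uses a 10-digit suffix). It does not talk to ZooKeeper; `SequentialNodeSketch` and its methods are hypothetical names, and the single per-parent counter is a simplification of the parent's `cversion`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

/**
 * In-memory model of sequential child naming under one parent node.
 * Hypothetical names; this is not the ZooKeeper or Curator API.
 */
public class SequentialNodeSketch {

    /** Simplified stand-in for the counter kept by the parent node. */
    private int nextSequence = 0;

    private final List<String> children = new ArrayList<>();

    /** Appends a 10-digit, zero-padded sequence number to the given prefix. */
    public synchronized String createSequential(String prefix) {
        String name = prefix + String.format(Locale.ROOT, "%010d", nextSequence++);
        children.add(name);
        return name;
    }

    /** With a common prefix, lexicographic order equals creation order. */
    public synchronized List<String> sortedChildren() {
        List<String> copy = new ArrayList<>(children);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        SequentialNodeSketch parent = new SequentialNodeSketch();
        parent.createSequential("checkpoint-");
        parent.createSequential("checkpoint-");
        System.out.println(parent.sortedChildren());
    }
}
```

In this model the name is assigned inside the create call itself, so the caller never has to read a version in a separate step.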
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41146895 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandle.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.flink.runtime.state.StateHandle; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A {@link StateHandle} with its path in ZooKeeper. + * + * @param Type of state + */ +public class ZooKeeperStateHandle { --- End diff -- I think that this class should not be a public top-level class. This class only makes sense in conjunction with the `ZooKeeperStateHandleStore`. Therefore, I would make this class an inner class of the `ZooKeeperStateHandleStore` and make its constructor `private` so that only `ZooKeeperStateHandleStore` can create instances of it.
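A minimal sketch of the structure proposed here, assuming a static nested class rather than a true inner class: Java lets the enclosing class call the nested class's `private` constructor, so only the store can create handles. The names `StateHandleStoreSketch` and `StoredHandle` are hypothetical and the store keeps nothing in ZooKeeper.

```java
/**
 * Sketch of a store whose handle type can only be instantiated by the store.
 * Hypothetical names; not the Flink classes under review.
 */
public class StateHandleStoreSketch {

    /** Handle type nested in the store; its constructor is private. */
    public static final class StoredHandle {

        private final String pathInZooKeeper;

        // Private: code outside StateHandleStoreSketch cannot call this.
        private StoredHandle(String pathInZooKeeper) {
            if (pathInZooKeeper == null) {
                throw new NullPointerException("Path in ZooKeeper");
            }
            this.pathInZooKeeper = pathInZooKeeper;
        }

        public String getPathInZooKeeper() {
            return pathInZooKeeper;
        }
    }

    /** The enclosing class may use the private constructor. */
    public StoredHandle add(String pathInZooKeeper) {
        return new StoredHandle(pathInZooKeeper);
    }

    public static void main(String[] args) {
        StoredHandle handle = new StateHandleStoreSketch().add("/stateHandles/1");
        System.out.println(handle.getPathInZooKeeper());
    }
}
```

Callers can still hold and read a `StoredHandle`, but every instance is guaranteed to have come from `add`.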
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41145818 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoints.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import java.util.List; + +/** + * A bounded LIFO-queue of {@link SuccessfulCheckpoint} instances. + */ +public interface CompletedCheckpoints { --- End diff -- I think that `CompletedCheckpoints` is not a good name, because it does not tell you what this class really does. Something like `CompletedCheckpointStore` would be better imho.
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41146553 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandle.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.flink.runtime.state.StateHandle; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A {@link StateHandle} with its path in ZooKeeper. + * + * @param Type of state + */ +public class ZooKeeperStateHandle { + + private final StateHandle stateHandle; + + private final String pathInZooKeeper; + + public ZooKeeperStateHandle(StateHandle stateHandle, String pathInZooKeeper) { + this.stateHandle = checkNotNull(stateHandle, "State handle"); + this.pathInZooKeeper = checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); + } + + public StateHandle getStateHandle() { --- End diff -- This is confusing because it leads to calls like `zookeeperStateHandle.getStateHandle().getState(...)`. Why call `getStateHandle` on a `StateHandle`? The name indicates that `ZooKeeperStateHandle` is already a `StateHandle`. Why not simply offer a `getState(ClassLoader loader)` which forwards the call to the wrapped `StateHandle`?
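The forwarding idea can be sketched as follows. The nested `StateHandle` interface is a simplified, hypothetical stand-in for Flink's type (its exact signature is an assumption here), and `PathAwareHandle` is an invented name for the wrapper; the point is only that callers ask the wrapper for the state directly instead of unwrapping it first.

```java
/**
 * Sketch of a wrapper that forwards getState to the wrapped handle.
 * Hypothetical names; the StateHandle signature is an assumption.
 */
public class DelegatingHandleSketch {

    /** Simplified stand-in for the wrapped state handle type. */
    public interface StateHandle<T> {
        T getState(ClassLoader userCodeClassLoader);
    }

    /** Wrapper that pairs a state handle with its path in ZooKeeper. */
    public static final class PathAwareHandle<T> {

        private final StateHandle<T> delegate;
        private final String pathInZooKeeper;

        public PathAwareHandle(StateHandle<T> delegate, String pathInZooKeeper) {
            this.delegate = delegate;
            this.pathInZooKeeper = pathInZooKeeper;
        }

        /** Forwards to the wrapped handle so callers need no getStateHandle(). */
        public T getState(ClassLoader loader) {
            return delegate.getState(loader);
        }

        public String getPathInZooKeeper() {
            return pathInZooKeeper;
        }
    }

    public static void main(String[] args) {
        PathAwareHandle<String> handle =
                new PathAwareHandle<>(loader -> "recovered state", "/stateHandles/1");
        System.out.println(handle.getState(DelegatingHandleSketch.class.getClassLoader()));
    }
}
```

With this shape the call site shrinks from `handle.getStateHandle().getState(loader)` to `handle.getState(loader)`.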
[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41146216 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandle.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.flink.runtime.state.StateHandle; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A {@link StateHandle} with its path in ZooKeeper. + * + * @param Type of state + */ +public class ZooKeeperStateHandle { --- End diff -- I think this class should be renamed, because we already have a `StateHandle` class. The name suggests to me that `ZooKeeperStateHandle` is a subclass of `StateHandle`, which it is not.