[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15025372#comment-15025372 ]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r45793192
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java ---
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the key sequence number which implies the number of updates made to a blob.
+ * The information regarding the keys and the sequence number which represents the number of updates are
+ * stored within the zookeeper in the following format.
+ * /storm/blobstore/key_name/nimbushostport-sequencenumber
+ * Example:
+ * If there are two nimbodes with nimbus.seeds:leader,non-leader are set,
+ * then the state inside the zookeeper is eventually stored as:
+ * /storm/blobstore/key1/leader:8080-1
+ * /storm/blobstore/key1/non-leader:8080-1
+ * indicates that a new blob with the name key1 has been created on the leader
+ * nimbus and the non-leader nimbus syncs after a call back is triggered by attempting
+ * to download the blob and finally updates its state inside the zookeeper.
+ *
+ * A watch is placed on the /storm/blobstore/key1 and the znodes leader:8080-1 and
+ * non-leader:8080-1 are ephemeral which implies that these nodes exist only until the
+ * connection between the corresponding nimbus and the zookeeper persist. If in case the
+ * nimbus crashes the node disappears under /storm/blobstore/key1.
+ *
+ * The sequence number for the keys are handed over based on the following scenario:
+ * Lets assume there are three nimbodes up and running, one being the leader and the other
+ * being the non-leader.
+ *
+ * 1. Create is straight forward.
+ * Check whether the znode -> /storm/blobstore/key1 has been created or not. It implies
+ * the blob has not been created yet. If not created, it creates it and updates the zookeeper
+ * states under /storm/blobstore/key1 and /storm/blobstoremaxkeysequencenumber/key1.
+ * The znodes it creates on these nodes are /storm/blobstore/key1/leader:8080-1,
+ * /storm/blobstore/key1/non-leader:8080-1 and /storm/blobstoremaxkeysequencenumber/key1/1.
+ * The later holds the global sequence number across all nimbodes more like a static variable
+ * indicating the true value of number of updates for a blob. This node helps to maintain sanity in case
+ * leadership changes due to crashing.
+ *
+ * 2. Delete does not require to hand over the sequence number.
+ *
+ * 3. Finally, the update has few scenarios.
+ *
+ * The class implements a TreeSet. The basic idea is if all the nimbodes have the same
+ * sequence number for the blob, then the number of elements in the set is 1 which holds
+ * the latest value of sequence number. If the number of elements are greater than 1 then it
+ * implies that there is sequence mismatch and there is need for syncing the blobs across
+ * nimbodes.
+ *
+ * The logic for handing over sequence numbers based on the state are described as follows
+ * Here consider Nimbus-1 alias as N1 and Nimbus-2 alias as N2.
+ * Scenario 1:
+ * Example: Normal create/update scenario
+ * Operation      Nimbus-1:state     Nimbus-2:state     Seq-Num-Nimbus-1     Seq-Num-Nimbus-2             Max-Seq-Num
+ * Create-Key1    alive - Leader     alive              1                                                  1
+ * Sync           alive - Leader     alive              1                    1 (callback -> download)     1
+ * Update-Key1    alive - Leader     alive              2                    1                            2
+ * Sync           alive - Leader     alive              2                    2 (callback -> download)     2
+ *
+ * Scenario 2:
+ * Example: Leader nimbus crash followed by leader election, update and ex-leader restored again
+ * Operation      Nimbus-1:state     Nimbus-2:state     Seq-Num-Nimbus-1     Seq-Num-Nimbus-2             Max-Seq-Num
+ * Create         alive - Leader     alive              1                                                  1
+ * Sync           alive - Leader     alive              1                    1 (callback -> download)     1
+ * Update         alive - Leader     alive              2                    1                            2
+ * Sync           alive - Leader     alive              2                    2 (callback -> download)     2
+ * Update         alive - Leader     alive              3                    2                            3
+ * Crash          crash - Leader     alive              3                    2                            3
+ * New - Leader   crash              alive - Leader     3 (Invalid)          2                            3
+ * Update         crash              alive - Leader     3 (Invalid)          4 (max-seq-num + 1)          4
+ * N1-Restored    alive              alive - Leader     0                    4                            4
+ * Sync           alive              alive - Leader     4                    4                            4
+ *
+ * Scenario 3:
+ * Example: Leader nimbus crash followed by leader election, update and ex-leader restored again
+ * Operation      Nimbus-1:state     Nimbus-2:state     Seq-Num-Nimbus-1     Seq-Num-Nimbus-2             Max-Seq-Num
+ * Create         alive - Leader     alive              1                                                  1
+ * Sync           alive - Leader     alive              1                    1 (callback -> download)     1
+ * Update         alive - Leader     alive              2                    1                            2
+ * Sync           alive - Leader     alive              2                    2 (callback -> download)     2
+ * Update         alive - Leader     alive              3                    2                            3
+ * Crash          crash - Leader     alive              3                    2                            3
+ * Elect Leader   crash              alive - Leader     3 (Invalid)          2                            3
+ * N1-Restored    alive              alive - Leader     3                    2                            3
+ * Read/Update    alive              alive - Leader     3                    4 (Downloads from N1)        4
+ * Sync           alive              alive - Leader     4 (callback)         4                            4
+ * Here the download is triggered whenever an operation corresponding to the blob is triggered on the
+ * nimbus like a read or update operation. Here, in the read/update call it is hard to know which call
+ * is read or update. Hence, by incrementing the sequence number to max-seq-num + 1 we ensure that the
+ * synchronization happens appropriately and all nimbodes have the same blob.
+ */
+public class KeySequenceNumber {
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+    private final String BLOBSTORE_SUBTREE="/blobstore";
+    private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber";
+    private final String key;
+    private final NimbusInfo nimbusInfo;
+    private final int INT_CAPACITY = 4;
+
+    public KeySequenceNumber(String key, NimbusInfo nimbusInfo) {
+        this.key = key;
+        this.nimbusInfo = nimbusInfo;
+    }
+
+    public int getKeySequenceNumber(Map conf) {
+        TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
+        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
+        try {
+            // Key has not been created yet and it is the first time it is being created
+            if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+                zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key);
+                zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key,
+                        ByteBuffer.allocate(INT_CAPACITY).putInt(1).array());
+                return 1;
+            }
+
+            // When all nimbodes go down and one or few of them come up
+            // Unfortunately there might not be an exact way to know which one contains the most updated blob,
+            // if all go down which is unlikely. Hence there might be a need to update the blob if all go down.
+            List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+            LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList);
+            if(stateInfoList.isEmpty()) {
+                return getMaxSequenceNumber(zkClient);
+            }
+
+            LOG.debug("stateInfoSize {}", stateInfoList.size());
+            // In all other cases check for the latest update sequence of the blob on the nimbus
+            // and assign the appropriate number. Check if all are have same sequence number,
+            // if not assign the highest sequence number.
+            for (String stateInfo:stateInfoList) {
+                sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeVersionInfo(stateInfo)[1]));
+            }
+
+            // Update scenario 2 and 3 explain the code logic written here
+            // especially when nimbus crashes and comes up after and before update
+            // respectively.
+            int currentSeqNumber = getMaxSequenceNumber(zkClient);
+            if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, nimbusInfo) && !nimbusInfo.isLeader()) {
+                if (sequenceNumbers.last() < currentSeqNumber) {
+                    return currentSeqNumber;
+                } else {
+                    return 0;
+                }
+            }
+
+            // It covers scenario's expalined in scenario 3 when nimbus-1 holding the latest
--- End diff ---
`scenarios`
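
For readers following the Javadoc above, the znode layout it describes (one ephemeral child /storm/blobstore/<key>/<nimbushostport>-<sequencenumber> per nimbus) can be sketched with Curator roughly as below. This is an illustrative sketch, not code from the patch; the connection string, key name and host:port value are assumptions, and the real client comes from BlobStoreUtils.createZKClient(conf), presumably chrooted at the configured storm.zookeeper.root.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class BlobStateZnodeSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical ZK ensemble; Storm normally derives this from the cluster conf.
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();
        try {
            String key = "key1";
            String nimbusHostPort = "leader:8080"; // stands in for this nimbus' host:port
            int sequenceNumber = 1;                // number of updates this nimbus has for the blob
            // EPHEMERAL: the child disappears when this nimbus' ZK session ends,
            // which is how a crashed nimbus drops out from under /storm/blobstore/<key>.
            zk.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
              .forPath("/storm/blobstore/" + key + "/" + nimbusHostPort + "-" + sequenceNumber);
        } finally {
            zk.close();
        }
    }
}
```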
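The TreeSet idea in the Javadoc (one element means every nimbus reports the same sequence number; more than one means the replicas have diverged and a sync is needed) can be illustrated standalone. The parsing below is a simplification; the patch itself delegates to BlobStoreUtils.normalizeVersionInfo to split the child name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class SequenceMismatchSketch {
    // Children of /storm/blobstore/<key> look like "<host:port>-<seq>", e.g. "leader:8080-2".
    static int latestSequence(List<String> children) {
        TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
        for (String child : children) {
            // keep only the trailing sequence number
            sequenceNumbers.add(Integer.parseInt(child.substring(child.lastIndexOf('-') + 1)));
        }
        // size() == 1 -> all nimbodes agree; size() > 1 -> mismatch, the highest value is the latest
        return sequenceNumbers.last();
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("leader:8080-2", "non-leader:8080-1");
        System.out.println(latestSequence(children)); // prints 2; non-leader still has to sync
    }
}
```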
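Similarly, the global counter under /blobstoremaxkeysequencenumber/<key> is written in the diff as a 4-byte payload via ByteBuffer.allocate(INT_CAPACITY).putInt(...).array(). A hypothetical read-side counterpart, assuming an already started CuratorFramework client, would decode it the same way:

```java
import java.nio.ByteBuffer;

import org.apache.curator.framework.CuratorFramework;

public class MaxSequenceNumberSketch {
    // Illustrative helper, not part of the patch: decode the 4-byte, big-endian payload back into an int.
    static int readMaxSequenceNumber(CuratorFramework zkClient, String key) throws Exception {
        byte[] data = zkClient.getData().forPath("/blobstoremaxkeysequencenumber/" + key);
        return ByteBuffer.wrap(data).getInt();
    }
}
```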
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process
> of getting it ready to push back to open source shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)