[ 
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015019#comment-15015019
 ] 

ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45427932
  
    --- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
    @@ -0,0 +1,125 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.blobstore;
    +
    +import backtype.storm.nimbus.NimbusInfo;
    +import backtype.storm.utils.Utils;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.zookeeper.CreateMode;
    +import org.apache.zookeeper.ZooDefs;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.TreeSet;
    +import java.util.Map;
    +import java.util.List;
    +
    +/**
    + * Class hands over the version of the key to be stored within the 
zookeeper
    + */
    +public class KeyVersion {
    +  private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
    +  private final String BLOBSTORE_SUBTREE="/blobstore";
    +  private final String 
BLOBSTORE_KEY_COUNTER_SUBTREE="/blobstorekeycounter";
    +  private String key;
    +  private NimbusInfo nimbusInfo;
    +
    +  public KeyVersion(String key, NimbusInfo nimbusInfo) {
    +    this.key = key;
    +    this.nimbusInfo = nimbusInfo;
    +  }
    +
    +  public int getKeyVersion(Map conf) {
    +    TreeSet<Integer> versions = new TreeSet<Integer>();
    +    CuratorFramework zkClient = Utils.createZKClient(conf);
    +    try {
    +      // Key has not been created yet and it is the first time it is being 
created
    +      if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == 
null) {
    +        
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
    +                
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE + 
"/" + key + "/" + 1);
    +        return 1;
    +      }
    +
    +      // When all nimbodes go down and one or few of them come up
    +      // Unfortunately there might not be an exact way to know which one 
contains the most updated blob
    +      // if all go down which is unlikely. Hence there might be a need to 
update the blob if all go down
    +      List<String> stateInfoList = 
zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
    +      LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(), 
stateInfoList);
    +      if(stateInfoList.isEmpty()) {
    +        return getKeyVersionCounterValue(zkClient, key);
    +      }
    +
    +      LOG.debug("stateInfoSize {}", stateInfoList.size());
    +      // In all other cases check for the latest version on the nimbus and 
assign the version
    +      // check if all have the same version; if not, assign the highest 
version
    +      for (String stateInfo:stateInfoList) {
    +        
versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
    +      }
    +
    +      int currentCounter = getKeyVersionCounterValue(zkClient, key);
    +      // This condition returns version when a nimbus crashes and comes up
    +      if (!checkIfStateContainsCurrentNimbusHost(stateInfoList, 
nimbusInfo) && !nimbusInfo.isLeader()) {
    +        if (versions.last() < currentCounter) {
    +          return currentCounter;
    +        } else {
    +          return currentCounter - 1;
    +        }
    +      }
    +      // Condition checks for an update scenario
    +      if (stateInfoList.size() >= 1 && versions.size() == 1) {
    +        if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
    +          incrementCounter(zkClient, key, currentCounter);
    +          return currentCounter + 1;
    +        } else {
    +          incrementCounter(zkClient, key, currentCounter);
    +          return versions.first() + 1;
    +        }
    +      }
    +    } catch(Exception e) {
    +      LOG.error("Exception {}", e);
    +    } finally {
    +      if (zkClient != null) {
    +        zkClient.close();
    +      }
    +    }
    +    return versions.last();
    +  }
    +
    +  public boolean checkIfStateContainsCurrentNimbusHost(List<String> 
stateInfoList, NimbusInfo nimbusInfo) {
    +    boolean containsNimbusHost = false;
    +    for(String stateInfo:stateInfoList) {
    +      if(stateInfo.contains(nimbusInfo.getHost())) {
    +        containsNimbusHost = true;
    +        break;
    +      }
    +    }
    +    return containsNimbusHost;
    +  }
    +
    +  public void incrementCounter(CuratorFramework zkClient, String key, int 
count) throws Exception {
    --- End diff --
    
    Should `incrementCounter` be `private`?


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and 
> downloading dist cache items.  storm-core.ser, storm-conf.ser and storm.jar 
> should be written into the blob store instead of residing locally. We need a 
> default implementation of the blob store that does essentially what nimbus 
> currently does and does not need anything extra.  But having an HDFS backend 
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and 
> provide a working directory for the worker process with symlinks to the 
> blobs.  It should also allow the blobs to be updated and switch the symlink 
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!, and we are in the 
> process of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to