[ 
https://issues.apache.org/jira/browse/STORM-1257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15117393#comment-15117393
 ] 

ASF GitHub Bot commented on STORM-1257:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1047#discussion_r50850102
  
    --- Diff: storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.zookeeper;
    +
    +import clojure.lang.PersistentArrayMap;
    +import clojure.lang.RT;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.api.CuratorEvent;
    +import org.apache.curator.framework.api.CuratorEventType;
    +import org.apache.curator.framework.api.CuratorListener;
    +import org.apache.curator.framework.api.UnhandledErrorListener;
    +import org.apache.curator.framework.recipes.leader.LeaderLatch;
    +import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
    +import org.apache.curator.framework.recipes.leader.Participant;
    +import org.apache.curator.framework.state.ConnectionStateListener;
    +import org.apache.storm.Config;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.callback.WatcherCallBack;
    +import org.apache.storm.nimbus.ILeaderElector;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.utils.ZookeeperAuthInfo;
    +import org.apache.zookeeper.KeeperException;
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.data.ACL;
    +import org.apache.zookeeper.data.Stat;
    +import org.apache.zookeeper.server.NIOServerCnxnFactory;
    +import org.apache.zookeeper.server.ZooKeeperServer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.BindException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +public class Zookeeper {
    +
    +    private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class);
    +
    +    public static CuratorFramework mkClient(Map conf, List<String> 
servers, Object port, String root) {
    +        return mkClient(conf, servers, port, root, new 
DefaultWatcherCallBack());
    +    }
    +
    +    public static CuratorFramework mkClient(Map conf, List<String> 
servers, Object port, Map authConf) {
    +        return mkClient(conf, servers, port, "", new 
DefaultWatcherCallBack(), authConf);
    +    }
    +
    +    public static CuratorFramework mkClient(Map conf, List<String> 
servers, Object port, String root, Map authConf) {
    +        return mkClient(conf, servers, port, root, new 
DefaultWatcherCallBack(), authConf);
    +    }
    +
    +    public static CuratorFramework mkClient(Map conf, List<String> 
servers, Object port, String root, final WatcherCallBack watcher, Map authConf) 
{
    +        CuratorFramework fk;
    +        if (authConf != null) {
    +            fk = Utils.newCurator(conf, servers, port, root, new 
ZookeeperAuthInfo(authConf));
    +        } else {
    +            fk = Utils.newCurator(conf, servers, port, root);
    +        }
    +
    +        fk.getCuratorListenable().addListener(new CuratorListener() {
    +            @Override
    +            public void eventReceived(CuratorFramework _fk, CuratorEvent 
e) throws Exception {
    +                if (e.getType().equals(CuratorEventType.WATCHED)) {
    +                    WatchedEvent event = e.getWatchedEvent();
    +
    +                    watcher.execute(event.getState(), event.getType(), 
event.getPath());
    +                }
    +
    +            }
    +        });
    +        fk.start();
    +        return fk;
    +    }
    +
    +    /**
    +     * connect ZK, register Watch/unhandle Watch
    +     *
    +     * @return
    +     */
    +    public static CuratorFramework mkClient(Map conf, List<String> 
servers, Object port, String root, final WatcherCallBack watcher) {
    +
    +        return mkClient(conf, servers, port, root, watcher, null);
    +    }
    +
    +    public static String createNode(CuratorFramework zk, String path, 
byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls)
    +            throws RuntimeException {
    +
    +        String ret = null;
    +        try {
    +            String npath = Utils.normalizePath(path);
    +            ret = zk.create().withMode(mode).withACL(acls).forPath(npath, 
data);
    +        } catch (Exception e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +        return ret;
    +    }
    +
    +    public static String createNode(CuratorFramework zk, String path, 
byte[] data, List<ACL> acls) throws RuntimeException {
    +        return createNode(zk, path, data, 
org.apache.zookeeper.CreateMode.PERSISTENT, acls);
    +    }
    +
    +    public static boolean existsNode(CuratorFramework zk, String path, 
boolean watch) throws RuntimeException {
    +        Stat stat = null;
    +        try {
    +            if (watch) {
    +                stat = 
zk.checkExists().watched().forPath(Utils.normalizePath(path));
    +            } else {
    +                stat = zk.checkExists().forPath(Utils.normalizePath(path));
    +            }
    +        } catch (Exception e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +
    +        return stat != null;
    +    }
    +
    +    public static void deleteNode(CuratorFramework zk, String path) throws 
RuntimeException {
    +        try {
    +            String npath = Utils.normalizePath(path);
    +            if (existsNode(zk, npath, false)) {
    +                
zk.delete().deletingChildrenIfNeeded().forPath(Utils.normalizePath(path));
    +            }
    +
    +        } catch (KeeperException.NoNodeException e) {
    +            LOG.info("exception", e);
    +        } catch (Exception e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +    }
    +
    +    public static void mkdirs(CuratorFramework zk, String path, List<ACL> 
acls) throws RuntimeException {
    +
    +        String npath = Utils.normalizePath(path);
    +        if (npath.equals("/")) {
    +            return;
    +        }
    +        if (existsNode(zk, npath, false)) {
    +            return;
    +        }
    +        byte[] byteArray = new byte[1];
    +        byteArray[0] = (byte) 7;
    +        createNode(zk, npath, byteArray, 
org.apache.zookeeper.CreateMode.PERSISTENT, acls);
    +
    +    }
    +
    +    public static void syncPath(CuratorFramework zk, String path) throws 
RuntimeException {
    +        try {
    +            zk.sync().forPath(Utils.normalizePath(path));
    +        } catch (Exception e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +    }
    +
    +    public static void addListener(CuratorFramework zk, 
ConnectionStateListener listener) {
    +        zk.getConnectionStateListenable().addListener(listener);
    +    }
    +
    +    public static byte[] getData(CuratorFramework zk, String path, boolean 
watch) throws RuntimeException {
    +
    +        try {
    +            String npath = Utils.normalizePath(path);
    +            if (existsNode(zk, npath, watch)) {
    +                if (watch) {
    +                    return zk.getData().watched().forPath(npath);
    +                } else {
    +                    return zk.getData().forPath(npath);
    +                }
    +            }
    +        } catch (KeeperException e) {
    +            // this is fine b/c we still have a watch from the successful 
exists call
    +        } catch (Exception e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +
    +        return null;
    +    }
    +
    +    public static Integer getVersion(CuratorFramework zk, String path, 
boolean watch) throws Exception {
    +        String npath = Utils.normalizePath(path);
    +        Stat stat = null;
    +        if (existsNode(zk, npath, watch)) {
    +            if (watch) {
    +                stat = 
zk.checkExists().watched().forPath(Utils.normalizePath(path));
    +            } else {
    +                stat = zk.checkExists().forPath(Utils.normalizePath(path));
    +            }
    +            return Integer.valueOf(stat.getVersion());
    +        }
    +
    +        return null;
    +    }
    +
    +    public static List<String> getChildren(CuratorFramework zk, String 
path, boolean watch) throws RuntimeException {
    +
    +        try {
    +            String npath = Utils.normalizePath(path);
    +            if (watch) {
    +                return zk.getChildren().watched().forPath(npath);
    +            } else {
    +                return zk.getChildren().forPath(npath);
    +            }
    +        } catch (Exception e) {
    +            throw Utils.wrapInRuntime(e);
    +        }
    +    }
    +
    +    // Deletes the state inside the zookeeper for a key, for which the
    +    // contents of the key starts with nimbus host port information
    +    public static void deleteDodeBlobstore(CuratorFramework zk, String 
parentPath, String hostPortInfo) throws RuntimeException {
    +        String parentnPath = Utils.normalizePath(parentPath);
    +        List<String> childPathList = null;
    +        if (existsNode(zk, parentnPath, false)) {
    +            childPathList = getChildren(zk, parentnPath, false);
    +        }
    +        for (String child : childPathList) {
    --- End diff --
    
    If the parent path does not exist we will get an NPE here that we would not 
get before.  Please move this under the existsNode check.


> port backtype.storm.zookeeper to java
> -------------------------------------
>
>                 Key: STORM-1257
>                 URL: https://issues.apache.org/jira/browse/STORM-1257
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: John Fang
>              Labels: java-migration, jstorm-merger
>
> A wrapper around zookeeper/curator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to