[ https://issues.apache.org/jira/browse/STORM-1257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15117396#comment-15117396 ]
ASF GitHub Bot commented on STORM-1257: --------------------------------------- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1047#discussion_r50850244 --- Diff: storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.zookeeper; + +import clojure.lang.PersistentArrayMap; +import clojure.lang.RT; +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.framework.recipes.leader.Participant; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.storm.Config; +import org.apache.storm.callback.DefaultWatcherCallBack; +import org.apache.storm.callback.WatcherCallBack; +import org.apache.storm.nimbus.ILeaderElector; +import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.ZookeeperAuthInfo; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class Zookeeper { + + private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class); + + public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root) { + return mkClient(conf, servers, port, root, new DefaultWatcherCallBack()); + } + + public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, Map authConf) { + return mkClient(conf, servers, port, "", new DefaultWatcherCallBack(), authConf); + } + + public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, Map authConf) { + return mkClient(conf, servers, port, root, new DefaultWatcherCallBack(), authConf); + } + + public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) { + CuratorFramework fk; + if (authConf != null) { + fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf)); + } else { + fk = Utils.newCurator(conf, servers, port, root); + } + + fk.getCuratorListenable().addListener(new CuratorListener() { + @Override + public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { + if (e.getType().equals(CuratorEventType.WATCHED)) { + WatchedEvent event = e.getWatchedEvent(); + + watcher.execute(event.getState(), event.getType(), event.getPath()); + } + + } + }); + fk.start(); + return fk; + } + + /** + * connect ZK, register Watch/unhandle Watch + * + * @return + */ + public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { + + return mkClient(conf, servers, port, root, watcher, null); + } + + public static String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls) + throws RuntimeException { + + String ret = null; + try { + String npath = Utils.normalizePath(path); + ret = zk.create().withMode(mode).withACL(acls).forPath(npath, data); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + return ret; + } + + public static String createNode(CuratorFramework zk, String path, byte[] data, List<ACL> acls) throws RuntimeException { + return createNode(zk, path, data, org.apache.zookeeper.CreateMode.PERSISTENT, acls); + } + + public static boolean existsNode(CuratorFramework zk, String path, boolean watch) throws RuntimeException { + Stat stat = null; + try { + if (watch) { + stat = zk.checkExists().watched().forPath(Utils.normalizePath(path)); + } else { + stat = zk.checkExists().forPath(Utils.normalizePath(path)); + } + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + + return stat != null; + } + + public static void deleteNode(CuratorFramework zk, String path) throws RuntimeException { + try { + String npath = Utils.normalizePath(path); + if (existsNode(zk, npath, false)) { + zk.delete().deletingChildrenIfNeeded().forPath(Utils.normalizePath(path)); + } + + } catch (KeeperException.NoNodeException e) { + LOG.info("exception", e); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + + public static void mkdirs(CuratorFramework zk, String path, List<ACL> acls) throws RuntimeException { + + String npath = Utils.normalizePath(path); + if (npath.equals("/")) { + return; + } + if (existsNode(zk, npath, false)) { + return; + } + byte[] byteArray = new byte[1]; + byteArray[0] = (byte) 7; + createNode(zk, npath, byteArray, org.apache.zookeeper.CreateMode.PERSISTENT, acls); + + } + + public static void syncPath(CuratorFramework zk, String path) throws RuntimeException { + try { + zk.sync().forPath(Utils.normalizePath(path)); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + + public static void addListener(CuratorFramework zk, ConnectionStateListener listener) { + zk.getConnectionStateListenable().addListener(listener); + } + + public static byte[] getData(CuratorFramework zk, String path, boolean watch) throws RuntimeException { + + try { + String npath = Utils.normalizePath(path); + if (existsNode(zk, npath, watch)) { + if (watch) { + return zk.getData().watched().forPath(npath); + } else { + return zk.getData().forPath(npath); + } + } + } catch (KeeperException e) { + // this is fine b/c we still have a watch from the successful exists call + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + + return null; + } + + public static Integer getVersion(CuratorFramework zk, String path, boolean watch) throws Exception { + String npath = Utils.normalizePath(path); + Stat stat = null; + if (existsNode(zk, npath, watch)) { + if (watch) { + stat = zk.checkExists().watched().forPath(Utils.normalizePath(path)); + } else { + stat = zk.checkExists().forPath(Utils.normalizePath(path)); + } + return Integer.valueOf(stat.getVersion()); + } + + return null; + } + + public static List<String> getChildren(CuratorFramework zk, String path, boolean watch) throws RuntimeException { + + try { + String npath = Utils.normalizePath(path); + if (watch) { + return zk.getChildren().watched().forPath(npath); + } else { + return zk.getChildren().forPath(npath); + } + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + + // Deletes the state inside the zookeeper for a key, for which the + // contents of the key starts with nimbus host port information + public static void deleteDodeBlobstore(CuratorFramework zk, String parentPath, String hostPortInfo) throws RuntimeException { + String parentnPath = Utils.normalizePath(parentPath); + List<String> childPathList = null; + if (existsNode(zk, parentnPath, false)) { + childPathList = getChildren(zk, parentnPath, false); + } + for (String child : childPathList) { + if (child.startsWith(hostPortInfo)) { + LOG.debug("deleteNode child " + child); --- End diff -- `LOG.debug("deleteNode child {}", child);` instead > port backtype.storm.zookeeper to java > ------------------------------------- > > Key: STORM-1257 > URL: https://issues.apache.org/jira/browse/STORM-1257 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: John Fang > Labels: java-migration, jstorm-merger > > A wrapper around zookeeper/curator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)