Repository: giraph Updated Branches: refs/heads/trunk 78931c03f -> dc4d9a2a7
GIRAPH-975 In-proc ZooKeeper server with Master process https://reviews.facebook.net/D30693 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/dc4d9a2a Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/dc4d9a2a Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/dc4d9a2a Branch: refs/heads/trunk Commit: dc4d9a2a7f5d2e40fc1e28c3b8da011306d7dccc Parents: 78931c0 Author: Sergey Edunov <[email protected]> Authored: Tue Dec 23 16:04:11 2014 -0800 Committer: Sergey Edunov <[email protected]> Committed: Mon Jan 5 18:37:45 2015 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/conf/GiraphConstants.java | 9 + .../apache/giraph/graph/GraphTaskManager.java | 2 +- .../giraph/zk/InProcessZooKeeperRunner.java | 168 ++++++++++++++ .../giraph/zk/OutOfProcessZooKeeperRunner.java | 227 +++++++++++++++++++ .../org/apache/giraph/zk/ZooKeeperManager.java | 196 +++------------- .../org/apache/giraph/zk/ZooKeeperRunner.java | 46 ++++ 7 files changed, 486 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index fbefa2f..08fe806 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-975: In-proc ZooKeeper server with Master process (edunov) + GIRAPH-977: useMessageSizeEncoding is broken (majakabiljo) GIRAPH-976: More command line logging (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index e78eb42..bbf3bd2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -554,6 +554,15 @@ public interface GiraphConstants { "Msecs to wait before retrying a failed ZooKeeper op due to " + "connection loss."); + /** + * Should start zookeeper inside master java process or separately? + * In process by default. + */ + BooleanConfOption ZOOKEEEPER_RUNS_IN_PROCESS = new BooleanConfOption( + "giraph.zkRunsInProcess", + true, "If true run zookeeper in master process, if false starts " + + "separate process for zookeeper"); + /** TCP backlog (defaults to number of workers) */ IntConfOption TCP_BACKLOG = new IntConfOption("giraph.tcpBacklog", 1, "TCP backlog (defaults to number of workers)"); http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 7d19014..d479d74 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -956,7 +956,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, if (graphFunctions.isZooKeeper()) { // ZooKeeper may have had an issue if (zkManager != null) { - zkManager.logZooKeeperOutput(Level.WARN); + zkManager.cleanup(); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java 
b/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java new file mode 100644 index 0000000..5556216 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.zk; + +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.log4j.Logger; +import org.apache.zookeeper.jmx.ManagedUtil; +import org.apache.zookeeper.server.DatadirCleanupManager; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; + +import javax.management.JMException; +import java.io.IOException; + +/** + * Zookeeper wrapper that starts zookeeper within the master process. + */ +public class InProcessZooKeeperRunner + extends DefaultImmutableClassesGiraphConfigurable + implements ZooKeeperRunner { + + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(InProcessZooKeeperRunner.class); + /** + * Wrapper for zookeeper quorum. 
+ */ + private QuorumRunner quorumRunner = new QuorumRunner(); + + @Override + public void start(String zkDir, final String configFilePath) { + Thread zkThread = new Thread(new Runnable() { + @Override + public void run() { + try { + quorumRunner.start(configFilePath); + } catch (IOException e) { + LOG.error("Unable to start zookeeper", e); + } catch (QuorumPeerConfig.ConfigException e) { + LOG.error("Invalid config, zookeeper failed", e); + } + } + }); + zkThread.setDaemon(true); + zkThread.start(); + } + + @Override + public void stop() { + try { + quorumRunner.stop(); + } catch (InterruptedException e) { + LOG.error("Unable to cleanly shutdown zookeeper", e); + } + } + + @Override + public void cleanup() { + } + + /** + * Wrapper around zookeeper quorum. Does not necessarily + * start a quorum: if there is only one server in the config file, + * it will only start zookeeper. + */ + private static class QuorumRunner extends QuorumPeerMain { + + /** + * ZooKeeper server wrapper. + */ + private ZooKeeperServerRunner serverRunner; + + /** + * Starts quorum and/or zookeeper service. 
+ * @param configFilePath quorum and zookeeper configuration + * @throws IOException + * @throws QuorumPeerConfig.ConfigException if config + * is not formatted properly + */ + public void start(String configFilePath) throws IOException, + QuorumPeerConfig.ConfigException { + QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig(); + quorumPeerConfig.parse(configFilePath); + // Start and schedule the purge task + DatadirCleanupManager purgeMgr = new DatadirCleanupManager( + quorumPeerConfig + .getDataDir(), quorumPeerConfig.getDataLogDir(), quorumPeerConfig + .getSnapRetainCount(), quorumPeerConfig.getPurgeInterval()); + purgeMgr.start(); + + if (quorumPeerConfig.getServers().size() > 0) { + runFromConfig(quorumPeerConfig); + } else { + serverRunner = new ZooKeeperServerRunner(); + serverRunner.start(configFilePath); + } + + LOG.info("Initialization ended"); + } + + /** + * Stop quorum and/or zookeeper. + * @throws InterruptedException + */ + public void stop() throws InterruptedException { + if (quorumPeer != null) { + quorumPeer.shutdown(); + quorumPeer.join(); + } else if (serverRunner != null) { + serverRunner.stop(); + } else { + LOG.warn("Neither quorum nor server is set"); + } + } + } + + /** + * Wrapper around zookeeper service. + */ + private static class ZooKeeperServerRunner extends ZooKeeperServerMain { + + /** + * Start zookeeper service. 
+ * @param configFilePath zookeeper configuration file + * @throws QuorumPeerConfig.ConfigException if config file is not + * formatted properly + * @throws IOException + */ + public void start(String configFilePath) throws + QuorumPeerConfig.ConfigException, IOException { + LOG.warn("Either no config or no quorum defined in config, running " + + " in standalone mode"); + try { + ManagedUtil.registerLog4jMBeans(); + } catch (JMException e) { + LOG.warn("Unable to register log4j JMX control", e); + } + + ServerConfig serverConfig = new ServerConfig(); + serverConfig.parse(configFilePath); + runFromConfig(serverConfig); + } + + /** + * Stop zookeeper service. + */ + public void stop() { + shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java b/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java new file mode 100644 index 0000000..c86a199 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.zk; + +import com.google.common.collect.Lists; +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +/** + * Zookeeper wrapper that starts zookeeper in the separate process (old way). + */ +public class OutOfProcessZooKeeperRunner + extends DefaultImmutableClassesGiraphConfigurable + implements ZooKeeperRunner { + + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(OutOfProcessZooKeeperRunner.class); + + /** ZooKeeper process */ + private Process zkProcess; + /** Thread that gets the zkProcess output */ + private StreamCollector zkProcessCollector = null; + /** Synchronization lock for zkProcess */ + private final Object processLock = new Object(); + + @Override + public void start(String zkDir, String configFilePath) { + try { + ProcessBuilder processBuilder = new ProcessBuilder(); + List<String> commandList = Lists.newArrayList(); + String javaHome = System.getProperty("java.home"); + if (javaHome == null) { + throw new IllegalArgumentException( + "onlineZooKeeperServers: java.home is not set!"); + } + commandList.add(javaHome + "/bin/java"); + commandList.add("-cp"); + commandList.add(System.getProperty("java.class.path")); + String zkJavaOptsString = + GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(getConf()); + String[] zkJavaOptsArray = zkJavaOptsString.split(" "); + if (zkJavaOptsArray != null) { + commandList.addAll(Arrays.asList(zkJavaOptsArray)); + } + 
commandList.add(QuorumPeerMain.class.getName()); + commandList.add(configFilePath); + processBuilder.command(commandList); + File execDirectory = new File(zkDir); + processBuilder.directory(execDirectory); + processBuilder.redirectErrorStream(true); + if (LOG.isInfoEnabled()) { + LOG.info("onlineZooKeeperServers: Attempting to " + + "start ZooKeeper server with command " + commandList + + " in directory " + execDirectory.toString()); + } + synchronized (processLock) { + zkProcess = processBuilder.start(); + zkProcessCollector = + new StreamCollector(zkProcess.getInputStream()); + zkProcessCollector.start(); + } + Runnable runnable = new Runnable() { + public void run() { + LOG.info("run: Shutdown hook started."); + synchronized (processLock) { + if (zkProcess != null) { + LOG.warn("onlineZooKeeperServers: " + + "Forced a shutdown hook kill of the " + + "ZooKeeper process."); + zkProcess.destroy(); + int exitCode = -1; + try { + exitCode = zkProcess.waitFor(); + } catch (InterruptedException e) { + LOG.warn("run: Couldn't get exit code."); + } + LOG.info("onlineZooKeeperServers: ZooKeeper process exited " + + "with " + exitCode + " (note that 143 " + + "typically means killed)."); + } + } + } + }; + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + LOG.info("onlineZooKeeperServers: Shutdown hook added."); + } catch (IOException e) { + LOG.error("onlineZooKeeperServers: Failed to start " + + "ZooKeeper process", e); + throw new RuntimeException(e); + } + } + + @Override + public void stop() { + zkProcess.destroy(); + int exitValue = -1; + try { + zkProcessCollector.join(); + exitValue = zkProcess.waitFor(); + } catch (InterruptedException e) { + LOG.warn("offlineZooKeeperServers: " + + "InterruptedException, but continuing ", + e); + } + if (LOG.isInfoEnabled()) { + LOG.info("offlineZooKeeperServers: waitFor returned " + + exitValue); + } + } + + @Override + public void cleanup() { + logZooKeeperOutput(Level.WARN); + } + + + /** + * Collects the output of 
a stream and dumps it to the log. + */ + private static class StreamCollector extends Thread { + /** Number of last lines to keep */ + private static final int LAST_LINES_COUNT = 100; + /** Class logger */ + private static final Logger LOG = Logger.getLogger(StreamCollector.class); + /** Buffered reader of input stream */ + private final BufferedReader bufferedReader; + /** Last lines (help to debug failures) */ + private final LinkedList<String> lastLines = Lists.newLinkedList(); + /** + * Constructor. + * + * @param is InputStream to dump to LOG.info + */ + public StreamCollector(final InputStream is) { + super(StreamCollector.class.getName()); + setDaemon(true); + InputStreamReader streamReader = new InputStreamReader(is, + Charset.defaultCharset()); + bufferedReader = new BufferedReader(streamReader); + } + + @Override + public void run() { + readLines(); + } + + /** + * Read all the lines from the bufferedReader. + */ + private synchronized void readLines() { + String line; + try { + while ((line = bufferedReader.readLine()) != null) { + if (lastLines.size() > LAST_LINES_COUNT) { + lastLines.removeFirst(); + } + lastLines.add(line); + + if (LOG.isDebugEnabled()) { + LOG.debug("readLines: " + line); + } + } + } catch (IOException e) { + LOG.error("readLines: Ignoring IOException", e); + } + } + + /** + * Dump the last n lines of the collector. Likely used in + * the case of failure. 
+ * + * @param level Log level to dump with + */ + public synchronized void dumpLastLines(Level level) { + // Get any remaining lines + readLines(); + // Dump the lines to the screen + for (String line : lastLines) { + LOG.log(level, line); + } + } + } + + + /** + * Log the zookeeper output from the process (if it was started) + * + * @param level Log level to print at + */ + public void logZooKeeperOutput(Level level) { + if (zkProcessCollector != null) { + LOG.log(level, "logZooKeeperOutput: Dumping up to last " + + StreamCollector.LAST_LINES_COUNT + + " lines of the ZooKeeper process STDOUT and STDERR."); + zkProcessCollector.dumpLastLines(level); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java index b5816d7..82a408b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java @@ -18,7 +18,6 @@ package org.apache.giraph.zk; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; import com.google.common.util.concurrent.Uninterruptibles; @@ -32,25 +31,18 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.apache.zookeeper.server.quorum.QuorumPeerMain; -import java.io.BufferedReader; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.Writer; import java.net.ConnectException; import java.net.InetSocketAddress; 
import java.net.Socket; import java.net.SocketTimeoutException; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -97,10 +89,8 @@ public class ZooKeeperManager { private final int serverCount; /** File system */ private final FileSystem fs; - /** ZooKeeper process */ - private Process zkProcess = null; - /** Thread that gets the zkProcess output */ - private StreamCollector zkProcessCollector = null; + /** Zookeeper wrapper */ + private ZooKeeperRunner zkRunner; /** ZooKeeper local file system directory */ private final String zkDir; /** ZooKeeper config file path */ @@ -200,73 +190,6 @@ public class ZooKeeperManager { } /** - * Collects the output of a stream and dumps it to the log. - */ - private static class StreamCollector extends Thread { - /** Number of last lines to keep */ - private static final int LAST_LINES_COUNT = 100; - /** Class logger */ - private static final Logger LOG = Logger.getLogger(StreamCollector.class); - /** Buffered reader of input stream */ - private final BufferedReader bufferedReader; - /** Last lines (help to debug failures) */ - private final LinkedList<String> lastLines = Lists.newLinkedList(); - /** - * Constructor. - * - * @param is InputStream to dump to LOG.info - */ - public StreamCollector(final InputStream is) { - super(StreamCollector.class.getName()); - setDaemon(true); - InputStreamReader streamReader = new InputStreamReader(is, - Charset.defaultCharset()); - bufferedReader = new BufferedReader(streamReader); - } - - @Override - public void run() { - readLines(); - } - - /** - * Read all the lines from the bufferedReader. 
- */ - private synchronized void readLines() { - String line; - try { - while ((line = bufferedReader.readLine()) != null) { - if (lastLines.size() > LAST_LINES_COUNT) { - lastLines.removeFirst(); - } - lastLines.add(line); - - if (LOG.isDebugEnabled()) { - LOG.debug("readLines: " + line); - } - } - } catch (IOException e) { - LOG.error("readLines: Ignoring IOException", e); - } - } - - /** - * Dump the last n lines of the collector. Likely used in - * the case of failure. - * - * @param level Log level to dump with - */ - public synchronized void dumpLastLines(Level level) { - // Get any remaining lines - readLines(); - // Dump the lines to the screen - for (String line : lastLines) { - LOG.log(level, line); - } - } - } - - /** * Create the candidate stamps and decide on the servers to start if * you are partition 0. * @@ -652,67 +575,9 @@ public class ZooKeeperManager { "directory " + this.zkDir, e); } generateZooKeeperConfigFile(new ArrayList<>(zkServerPortMap.keySet())); - ProcessBuilder processBuilder = new ProcessBuilder(); - List<String> commandList = Lists.newArrayList(); - String javaHome = System.getProperty("java.home"); - if (javaHome == null) { - throw new IllegalArgumentException( - "onlineZooKeeperServers: java.home is not set!"); - } - commandList.add(javaHome + "/bin/java"); - commandList.add("-cp"); - commandList.add(System.getProperty("java.class.path")); - String zkJavaOptsString = GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(conf); - String[] zkJavaOptsArray = zkJavaOptsString.split(" "); - if (zkJavaOptsArray != null) { - commandList.addAll(Arrays.asList(zkJavaOptsArray)); - } - commandList.add(QuorumPeerMain.class.getName()); - commandList.add(configFilePath); - processBuilder.command(commandList); - File execDirectory = new File(zkDir); - processBuilder.directory(execDirectory); - processBuilder.redirectErrorStream(true); - if (LOG.isInfoEnabled()) { - LOG.info("onlineZooKeeperServers: Attempting to " + - "start ZooKeeper server with command " + 
commandList + - " in directory " + execDirectory.toString()); - } - try { - synchronized (this) { - zkProcess = processBuilder.start(); - zkProcessCollector = - new StreamCollector(zkProcess.getInputStream()); - zkProcessCollector.start(); - } - Runnable runnable = new Runnable() { - public void run() { - LOG.info("run: Shutdown hook started."); - synchronized (this) { - if (zkProcess != null) { - LOG.warn("onlineZooKeeperServers: " + - "Forced a shutdown hook kill of the " + - "ZooKeeper process."); - zkProcess.destroy(); - int exitCode = -1; - try { - exitCode = zkProcess.waitFor(); - } catch (InterruptedException e) { - LOG.warn("run: Couldn't get exit code."); - } - LOG.info("onlineZooKeeperServers: ZooKeeper process exited " + - "with " + exitCode + " (note that 143 " + - "typically means killed)."); - } - } - } - }; - Runtime.getRuntime().addShutdownHook(new Thread(runnable)); - LOG.info("onlineZooKeeperServers: Shutdown hook added."); - } catch (IOException e) { - LOG.error("onlineZooKeeperServers: Failed to start " + - "ZooKeeper process", e); - throw new RuntimeException(e); + synchronized (this) { + zkRunner = createRunner(); + zkRunner.start(zkDir, configFilePath); } // Once the server is up and running, notify that this server is up @@ -907,7 +772,7 @@ public class ZooKeeperManager { createZooKeeperClosedStamp(); } synchronized (this) { - if (zkProcess != null) { + if (zkRunner != null) { boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf); int totalWorkers = conf.getMapTasks(); // A Yarn job always spawns MAX_WORKERS + 1 containers @@ -917,33 +782,43 @@ public class ZooKeeperManager { LOG.info("offlineZooKeeperServers: Will wait for " + totalWorkers + " tasks"); waitUntilAllTasksDone(totalWorkers); - zkProcess.destroy(); - int exitValue = -1; + zkRunner.stop(); File zkDirFile; try { - zkProcessCollector.join(); - exitValue = zkProcess.waitFor(); zkDirFile = new File(zkDir); FileUtils.deleteDirectory(zkDirFile); - } catch 
(InterruptedException e) { - LOG.warn("offlineZooKeeperServers: " + - "InterruptedException, but continuing ", - e); } catch (IOException e) { LOG.warn("offlineZooKeeperSevers: " + - "IOException, but continuing", + "IOException, but continuing", e); } if (LOG.isInfoEnabled()) { - LOG.info("offlineZooKeeperServers: waitFor returned " + - exitValue + " and deleted directory " + zkDir); + LOG.info("offlineZooKeeperServers: deleted directory " + zkDir); } - zkProcess = null; + zkRunner = null; } } } /** + * Create appropriate zookeeper wrapper depending on configuration. + * Zookeeper can run in master process or outside as a separate + * java process. + * + * @return either in process or out of process wrapper. + */ + private ZooKeeperRunner createRunner() { + ZooKeeperRunner runner; + if (GiraphConstants.ZOOKEEEPER_RUNS_IN_PROCESS.get(conf)) { + runner = new InProcessZooKeeperRunner(); + } else { + runner = new OutOfProcessZooKeeperRunner(); + } + runner.setConf(conf); + return runner; + } + + /** * Is this task running a ZooKeeper server? Only could be true if called * after onlineZooKeeperServers(). * @@ -951,21 +826,16 @@ public class ZooKeeperManager { */ public boolean runsZooKeeper() { synchronized (this) { - return zkProcess != null; + return zkRunner != null; } } /** - * Log the zookeeper output from the process (if it was started) - * - * @param level Log level to print at + * Do necessary cleanup in zookeeper wrapper. 
*/ - public void logZooKeeperOutput(Level level) { - if (zkProcessCollector != null) { - LOG.log(level, "logZooKeeperOutput: Dumping up to last " + - StreamCollector.LAST_LINES_COUNT + - " lines of the ZooKeeper process STDOUT and STDERR."); - zkProcessCollector.dumpLastLines(level); + public void cleanup() { + if (zkRunner != null) { + zkRunner.cleanup(); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java new file mode 100644 index 0000000..4c13a25 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.zk; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; + +/** + * ZooKeeper wrapper interface. + * Implementation should provide a way to start, stop and cleanup + * zookeeper. 
+ */ +public interface ZooKeeperRunner extends ImmutableClassesGiraphConfigurable { + + /** + * Starts zookeeper service in specified working directory with + * specified config file. + * @param zkDir working directory + * @param configFilePath path to the config file + */ + void start(String zkDir, String configFilePath); + + /** + * Stops zookeeper. + */ + void stop(); + + /** + * Does necessary cleanup after zookeeper job is complete. + */ + void cleanup(); +}
