Repository: giraph Updated Branches: refs/heads/trunk c9ab310db -> b5b76c284
GIRAPH-1054: Separate ThriftService from JobProgressTrackerService on the client Summary: * Moves the job tracker host/port conf options out of JobProgressTracker into the new ClientThriftServer (as giraph.client.thrift.server.host and giraph.client.thrift.server.port) * Factors the Thrift server start/stop logic out of JobProgressTrackerService#createJobProgressServer into the ClientThriftServer constructor and ClientThriftServer#stopThriftServer * Allows adding other Thrift services to the ThriftServer Test Plan: Tried on a cluster Reviewers: maja.kabiljo, sergey.edunov Reviewed By: sergey.edunov Subscribers: sergey.edunov Differential Revision: https://reviews.facebook.net/D57087 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b5b76c28 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b5b76c28 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b5b76c28 Branch: refs/heads/trunk Commit: b5b76c28408e7401dc61fc2047182d971bbc2537 Parents: c9ab310 Author: Avery Ching <[email protected]> Authored: Wed Apr 13 16:03:40 2016 -0700 Committer: Avery Ching <[email protected]> Committed: Fri Apr 22 10:57:20 2016 -0700 ---------------------------------------------------------------------- .../org/apache/giraph/conf/GiraphConstants.java | 6 +- .../RetryableJobProgressTrackerClient.java | 5 +- .../apache/giraph/job/ClientThriftServer.java | 92 ++++++++++++++++++++ .../java/org/apache/giraph/job/GiraphJob.java | 13 ++- .../apache/giraph/job/JobProgressTracker.java | 13 +-- .../giraph/job/JobProgressTrackerService.java | 38 +------- 6 files changed, 115 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index b7f0d5c..1e51101 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -17,9 +17,6 @@ */ package org.apache.giraph.conf; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.SECONDS; - import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.aggregators.TextAggregatorWriter; import org.apache.giraph.bsp.BspOutputFormat; @@ -83,6 +80,9 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.OutputFormat; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; + /** * Constants used all over Giraph for configuration. */ http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java index 9ce12ed..60cb586 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java @@ -19,6 +19,7 @@ package org.apache.giraph.graph; import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.job.ClientThriftServer; import org.apache.giraph.job.JobProgressTracker; import org.apache.giraph.worker.WorkerProgress; import org.apache.log4j.Logger; @@ -78,8 +79,8 @@ public class RetryableJobProgressTrackerClient ImmutableSet.<ThriftClientEventHandler>of()); FramedClientConnector connector = new FramedClientConnector(new InetSocketAddress( - JOB_PROGRESS_SERVICE_HOST.get(conf), - JOB_PROGRESS_SERVICE_PORT.get(conf))); + 
ClientThriftServer.CLIENT_THRIFT_SERVER_HOST.get(conf), + ClientThriftServer.CLIENT_THRIFT_SERVER_PORT.get(conf))); jobProgressTracker = clientManager.createClient(connector, JobProgressTracker.class).get(); http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java b/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java new file mode 100644 index 0000000..ce7b5d9 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.job; + +import com.facebook.swift.codec.ThriftCodecManager; +import com.facebook.swift.service.ThriftEventHandler; +import com.facebook.swift.service.ThriftServer; +import com.facebook.swift.service.ThriftServerConfig; +import com.facebook.swift.service.ThriftServiceProcessor; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.conf.StrConfOption; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Manages the life cycle of the Thrift server started on the client. + */ +public class ClientThriftServer { + /** + * The client can run a Thrift server (e.g. job progress service). + * This is the host of the Thrift server. + */ + public static final StrConfOption CLIENT_THRIFT_SERVER_HOST = + new StrConfOption("giraph.client.thrift.server.host", null, + "Host on which the client Thrift server runs (if enabled)"); + /** + * The client can run a Thrift server (e.g. job progress service). + * This is the port of the Thrift server. + */ + public static final IntConfOption CLIENT_THRIFT_SERVER_PORT = + new IntConfOption("giraph.client.thrift.server.port", -1, + "Port on which the client Thrift server runs (if enabled)"); + + /** Thrift server that is intended to run on the client */ + private final ThriftServer clientThriftServer; + + /** + * Create and start the Thrift server. + * + * @param conf Giraph conf to set the host and ports for. 
+ * @param services Services to start + */ + public ClientThriftServer(GiraphConfiguration conf, + List<?> services) { + checkNotNull(conf, "conf is null"); + checkNotNull(services, "services is null"); + + ThriftServiceProcessor processor = + new ThriftServiceProcessor(new ThriftCodecManager(), + new ArrayList<ThriftEventHandler>(), + services); + clientThriftServer = + new ThriftServer(processor, new ThriftServerConfig()); + clientThriftServer.start(); + try { + CLIENT_THRIFT_SERVER_HOST.set( + conf, + conf.getLocalHostname()); + } catch (UnknownHostException e) { + throw new IllegalStateException("Unable to get host information", e); + } + CLIENT_THRIFT_SERVER_PORT.set(conf, clientThriftServer.getPort()); + } + + /** + * Stop the Thrift server. + */ + public void stopThriftServer() { + this.clientThriftServer.close(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index 8792e59..90a73c6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -18,6 +18,7 @@ package org.apache.giraph.job; +import com.google.common.collect.ImmutableList; import org.apache.giraph.bsp.BspInputFormat; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; @@ -240,7 +241,13 @@ public class GiraphJob { GiraphJobObserver jobObserver = conf.getJobObserver(); JobProgressTrackerService jobProgressTrackerService = - JobProgressTrackerService.createJobProgressServer(conf, jobObserver); + JobProgressTrackerService.createJobProgressTrackerService( + conf, jobObserver); + ClientThriftServer clientThriftServer = null; + if (jobProgressTrackerService != null) { + 
clientThriftServer = new ClientThriftServer( + conf, ImmutableList.of(jobProgressTrackerService)); + } tryCount++; Job submittedJob = new Job(conf, jobName); @@ -271,6 +278,10 @@ public class GiraphJob { if (jobProgressTrackerService != null) { jobProgressTrackerService.stop(passed); } + if (clientThriftServer != null) { + clientThriftServer.stopThriftServer(); + } + jobObserver.jobFinished(submittedJob, passed); if (!passed) { http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java index 4da5450..3041d08 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java @@ -18,26 +18,15 @@ package org.apache.giraph.job; -import org.apache.giraph.conf.IntConfOption; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.worker.WorkerProgress; - import com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; +import org.apache.giraph.worker.WorkerProgress; /** * Interface for job progress tracker on job client */ @ThriftService public interface JobProgressTracker { - /** Host on which job progress service runs */ - StrConfOption JOB_PROGRESS_SERVICE_HOST = - new StrConfOption("giraph.jobProgressServiceHost", null, - "Host on which job progress service runs"); - /** Port which job progress service uses */ - IntConfOption JOB_PROGRESS_SERVICE_PORT = - new IntConfOption("giraph.jobProgressServicePort", -1, - "Port which job progress service uses"); /** Notify JobProgressTracker that mapper started */ @ThriftMethod 
http://git-wip-us.apache.org/repos/asf/giraph/blob/b5b76c28/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java index b08bf3e..c0189c0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java @@ -24,15 +24,7 @@ import org.apache.giraph.worker.WorkerProgress; import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.Logger; -import com.facebook.swift.codec.ThriftCodecManager; -import com.facebook.swift.service.ThriftEventHandler; -import com.facebook.swift.service.ThriftServer; -import com.facebook.swift.service.ThriftServerConfig; -import com.facebook.swift.service.ThriftServiceProcessor; - import java.io.IOException; -import java.net.InetAddress; -import java.util.ArrayList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -54,8 +46,6 @@ public class JobProgressTrackerService implements JobProgressTracker { private Thread writerThread; /** Whether application is finished */ private volatile boolean finished = false; - /** Server which uses this service */ - private ThriftServer server; /** Number of mappers which the job got */ private int mappersStarted; /** Last time number of mappers started was logged */ @@ -208,7 +198,6 @@ public class JobProgressTrackerService implements JobProgressTracker { public void stop(boolean succeeded) { finished = true; writerThread.interrupt(); - server.close(); if (LOG.isInfoEnabled()) { LOG.info("Job " + (succeeded ? 
"finished successfully" : "failed") + ", cleaning up..."); @@ -216,37 +205,18 @@ public class JobProgressTrackerService implements JobProgressTracker { } /** - * Create job progress server on job client, and update configuration with - * its hostname and port so mappers would know what to connect to. Returns - * null if progress shouldn't be tracked + * Create job progress server on job client if enabled in configuration. * * @param conf Configuration * @param jobObserver Giraph job callbacks * @return JobProgressTrackerService */ - public static JobProgressTrackerService createJobProgressServer( + public static JobProgressTrackerService createJobProgressTrackerService( GiraphConfiguration conf, GiraphJobObserver jobObserver) { if (!conf.trackJobProgressOnClient()) { return null; } - try { - JobProgressTrackerService service = - new JobProgressTrackerService(conf, jobObserver); - ThriftServiceProcessor processor = - new ThriftServiceProcessor(new ThriftCodecManager(), - new ArrayList<ThriftEventHandler>(), service); - service.server = new ThriftServer(processor, new ThriftServerConfig()); - service.server.start(); - JOB_PROGRESS_SERVICE_HOST.set(conf, - InetAddress.getLocalHost().getHostName()); - JOB_PROGRESS_SERVICE_PORT.set(conf, service.server.getPort()); - return service; - // CHECKSTYLE: stop IllegalCatch - } catch (Exception e) { - // CHECKSTYLE: resume IllegalCatch - LOG.warn("Exception occurred while trying to create " + - "JobProgressTrackerService - not using progress reporting", e); - return null; - } + + return new JobProgressTrackerService(conf, jobObserver); } }
