Updated Branches: refs/heads/trunk d97b39665 -> 6c0917ab1
GIRAPH-716: Stop modifying Configuration since it's not thread-safe (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6c0917ab Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6c0917ab Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6c0917ab Branch: refs/heads/trunk Commit: 6c0917ab17cd65d51e0d5315489d5d43bb9a8133 Parents: d97b396 Author: Maja Kabiljo <[email protected]> Authored: Mon Jul 15 14:20:26 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon Jul 15 14:22:53 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/conf/GiraphConfiguration.java | 68 -------------------- .../io/internal/WrappedEdgeInputFormat.java | 13 ++-- .../giraph/io/internal/WrappedEdgeReader.java | 5 +- .../io/internal/WrappedVertexInputFormat.java | 13 ++-- .../io/internal/WrappedVertexOutputFormat.java | 53 +++++++-------- .../giraph/io/internal/WrappedVertexReader.java | 5 +- .../java/org/apache/giraph/job/HadoopUtils.java | 61 ++++++++++++++++-- pom.xml | 2 +- 9 files changed, 104 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/6c0917ab/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 8cc0945..7e2cab1 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-716: Stop modifying Configuration since it's not thread-safe (majakabiljo) + GIRAPH-715: Fix MessageValueFactory performance regression (nitay) GIRAPH-709: More flexible Jython script loading (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/6c0917ab/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 74f1ba5..21cf245 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -45,7 +45,6 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.net.DNS; import java.net.UnknownHostException; -import java.util.Map; /** * Adds user methods specific to Giraph. This will be put into an @@ -57,14 +56,10 @@ import java.util.Map; */ public class GiraphConfiguration extends Configuration implements GiraphConstants { - /** Configuration with parameters which were set in Giraph */ - private final Configuration giraphSetParameters; - /** * Constructor that creates the configuration */ public GiraphConfiguration() { - giraphSetParameters = new Configuration(false); configureHadoopSecurity(); } @@ -75,7 +70,6 @@ public class GiraphConfiguration extends Configuration */ public GiraphConfiguration(Configuration conf) { super(conf); - giraphSetParameters = new Configuration(false); configureHadoopSecurity(); } @@ -984,68 +978,6 @@ public class GiraphConfiguration extends Configuration } /** - * Put all parameters set in Giraph to another configuration - * - * @param conf Configuration - */ - public synchronized void updateConfiguration(Configuration conf) { - if (this != conf) { - for (Map.Entry<String, String> parameter : giraphSetParameters) { - conf.set(parameter.getKey(), parameter.getValue()); - } - } - } - - @Override - public synchronized void set(String name, String value) { - super.set(name, value); - giraphSetParameters.set(name, value); - } - - @Override - public synchronized void setIfUnset(String name, String value) { - super.setIfUnset(name, value); - giraphSetParameters.set(name, get(name, value)); - } - - @Override - public synchronized void setInt(String name, int value) { - super.setInt(name, value); - giraphSetParameters.setInt(name, value); - } - - @Override - public synchronized void setLong(String name, long value) { - super.setLong(name, value); - giraphSetParameters.setLong(name, value); - } - - @Override - public synchronized void setFloat(String name, float value) { - super.setFloat(name, value); - giraphSetParameters.setFloat(name, value); - } - - @Override - public synchronized void setBoolean(String name, boolean value) { - super.setBoolean(name, value); - giraphSetParameters.setBoolean(name, value); - } - - @Override - public synchronized void setBooleanIfUnset(String name, boolean value) { - super.setBooleanIfUnset(name, value); - giraphSetParameters.setBoolean(name, getBoolean(name, value)); - } - - @Override - public synchronized void setClass(String name, Class<?> theClass, - Class<?> xface) { - super.setClass(name, theClass, xface); - giraphSetParameters.setClass(name, theClass, xface); - } - - /** * Get the output directory to write YourKit snapshots to * * @param context Map context http://git-wip-us.apache.org/repos/asf/giraph/blob/6c0917ab/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java index c3adf4c..ffe765d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java @@ -20,6 +20,7 @@ package org.apache.giraph.io.internal; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.job.HadoopUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -60,23 +61,23 @@ public class WrappedEdgeInputFormat<I extends WritableComparable, @Override public void checkInputSpecs(Configuration conf) { - getConf().updateConfiguration(conf); - originalInputFormat.checkInputSpecs(conf); + originalInputFormat.checkInputSpecs(getConf()); } @Override public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { - getConf().updateConfiguration(context.getConfiguration()); - return originalInputFormat.getSplits(context, minSplitCountHint); + return originalInputFormat.getSplits( + HadoopUtils.makeJobContext(getConf(), context), + minSplitCountHint); } @Override public EdgeReader<I, E> createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); EdgeReader<I, E> edgeReader = - originalInputFormat.createEdgeReader(split, context); + originalInputFormat.createEdgeReader(split, + HadoopUtils.makeTaskAttemptContext(getConf(), context)); return new WrappedEdgeReader<I, E>(edgeReader, getConf()); } http://git-wip-us.apache.org/repos/asf/giraph/blob/6c0917ab/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java index e3b3689..9b5e8c6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java @@ -21,6 +21,7 @@ package org.apache.giraph.io.internal; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.job.HadoopUtils; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -66,8 +67,8 @@ public class WrappedEdgeReader<I extends WritableComparable, @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { - getConf().updateConfiguration(context.getConfiguration()); - baseEdgeReader.initialize(inputSplit, context); + baseEdgeReader.initialize(inputSplit, + HadoopUtils.makeTaskAttemptContext(getConf(), context)); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6c0917ab/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java index a58a32d..fddf75b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java @@ -20,6 +20,7 @@ package org.apache.giraph.io.internal; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; +import org.apache.giraph.job.HadoopUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -62,23 +63,23 @@ public class WrappedVertexInputFormat<I extends WritableComparable, @Override public void checkInputSpecs(Configuration conf) { - getConf().updateConfiguration(conf); - originalInputFormat.checkInputSpecs(conf); + originalInputFormat.checkInputSpecs(getConf()); } @Override public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { - getConf().updateConfiguration(context.getConfiguration()); - return originalInputFormat.getSplits(context, minSplitCountHint); + return originalInputFormat.getSplits( + HadoopUtils.makeJobContext(getConf(), context), + minSplitCountHint); } @Override public VertexReader<I, V, E> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); final VertexReader<I, V, E> vertexReader = - originalInputFormat.createVertexReader(split, context); + originalInputFormat.createVertexReader(split, + HadoopUtils.makeTaskAttemptContext(getConf(), context)); return new WrappedVertexReader<I, V, E>(vertexReader, getConf()); } http://git-wip-us.apache.org/repos/asf/giraph/blob/6c0917ab/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java index bffa330..669bbe1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java @@ -22,6 +22,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.VertexWriter; +import org.apache.giraph.job.HadoopUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; @@ -64,9 +65,9 @@ public class WrappedVertexOutputFormat<I extends WritableComparable, @Override public VertexWriter<I, V, E> createVertexWriter( TaskAttemptContext context) throws IOException, InterruptedException { - getConf().updateConfiguration(context.getConfiguration()); final VertexWriter<I, V, E> vertexWriter = - originalOutputFormat.createVertexWriter(context); + originalOutputFormat.createVertexWriter( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); return new VertexWriter<I, V, E>() { @Override public void setConf( @@ -78,15 +79,15 @@ public class WrappedVertexOutputFormat<I extends WritableComparable, @Override public void initialize( TaskAttemptContext context) throws IOException, InterruptedException { - getConf().updateConfiguration(context.getConfiguration()); - vertexWriter.initialize(context); + vertexWriter.initialize( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); } @Override public void close( TaskAttemptContext context) throws IOException, InterruptedException { - getConf().updateConfiguration(context.getConfiguration()); - vertexWriter.close(context); + vertexWriter.close( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); } @Override @@ -100,66 +101,66 @@ public class WrappedVertexOutputFormat<I extends WritableComparable, @Override public void checkOutputSpecs( JobContext context) throws IOException, InterruptedException { - getConf().updateConfiguration(context.getConfiguration()); - originalOutputFormat.checkOutputSpecs(context); + originalOutputFormat.checkOutputSpecs( + HadoopUtils.makeJobContext(getConf(), context)); } @Override public OutputCommitter getOutputCommitter( TaskAttemptContext context) throws IOException, InterruptedException { - getConf().updateConfiguration(context.getConfiguration()); final OutputCommitter outputCommitter = - originalOutputFormat.getOutputCommitter(context); + originalOutputFormat.getOutputCommitter( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); return new OutputCommitter() { @Override public void setupJob(JobContext context) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); - outputCommitter.setupJob(context); + outputCommitter.setupJob( + HadoopUtils.makeJobContext(getConf(), context)); } @Override public void setupTask(TaskAttemptContext context) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); - outputCommitter.setupTask(context); + outputCommitter.setupTask( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); } @Override public boolean needsTaskCommit( TaskAttemptContext context) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); - return outputCommitter.needsTaskCommit(context); + return outputCommitter.needsTaskCommit( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); } @Override public void commitTask(TaskAttemptContext context) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); - outputCommitter.commitTask(context); + outputCommitter.commitTask( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); } @Override public void abortTask(TaskAttemptContext context) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); - outputCommitter.abortTask(context); + outputCommitter.abortTask( + HadoopUtils.makeTaskAttemptContext(getConf(), context)); } @Override public void cleanupJob(JobContext context) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); - outputCommitter.cleanupJob(context); + outputCommitter.cleanupJob( + HadoopUtils.makeJobContext(getConf(), context)); } /*if_not[HADOOP_NON_COMMIT_JOB]*/ @Override public void commitJob(JobContext context) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); - outputCommitter.commitJob(context); + outputCommitter.commitJob( + HadoopUtils.makeJobContext(getConf(), context)); } @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException { - getConf().updateConfiguration(context.getConfiguration()); - outputCommitter.abortJob(context, state); + outputCommitter.abortJob( + HadoopUtils.makeJobContext(getConf(), context), state); } /*end[HADOOP_NON_COMMIT_JOB]*/ }; http://git-wip-us.apache.org/repos/asf/giraph/blob/6c0917ab/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java index bf0a212..8e25602 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java @@ -21,6 +21,7 @@ package org.apache.giraph.io.internal; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexReader; +import org.apache.giraph.job.HadoopUtils; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -67,8 +68,8 @@ public class WrappedVertexReader<I extends WritableComparable, @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { - getConf().updateConfiguration(context.getConfiguration()); - baseVertexReader.initialize(inputSplit, context); + baseVertexReader.initialize(inputSplit, + HadoopUtils.makeTaskAttemptContext(getConf(), context)); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/6c0917ab/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java index d5095bc..9530fd6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java @@ -39,14 +39,16 @@ public class HadoopUtils { * Create a TaskAttemptContext, supporting many Hadoops. * * @param conf Configuration + * @param taskAttemptID TaskAttemptID to use * @return TaskAttemptContext */ - public static TaskAttemptContext makeTaskAttemptContext(Configuration conf) { + public static TaskAttemptContext makeTaskAttemptContext(Configuration conf, + TaskAttemptID taskAttemptID) { TaskAttemptContext context; /*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE] - context = new TaskAttemptContext(conf, new TaskAttemptID()); + context = new TaskAttemptContext(conf, taskAttemptID); else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/ - context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + context = new TaskAttemptContextImpl(conf, taskAttemptID); /*end[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/ return context; } @@ -54,9 +56,31 @@ public class HadoopUtils { /** * Create a TaskAttemptContext, supporting many Hadoops. * + * @param conf Configuration + * @param taskAttemptContext Use TaskAttemptID from this object + * @return TaskAttemptContext + */ + public static TaskAttemptContext makeTaskAttemptContext(Configuration conf, + TaskAttemptContext taskAttemptContext) { + return makeTaskAttemptContext(conf, taskAttemptContext.getTaskAttemptID()); + } + + /** + * Create a TaskAttemptContext, supporting many Hadoops. + * + * @param conf Configuration + * @return TaskAttemptContext + */ + public static TaskAttemptContext makeTaskAttemptContext(Configuration conf) { + return makeTaskAttemptContext(conf, new TaskAttemptID()); + } + + /** + * Create a TaskAttemptContext, supporting many Hadoops. + * * @return TaskAttemptContext */ - public static TaskAttemptContext makeTaskContext() { + public static TaskAttemptContext makeTaskAttemptContext() { return makeTaskAttemptContext(new Configuration()); } @@ -64,14 +88,15 @@ public class HadoopUtils { * Create a JobContext, supporting many Hadoops. * * @param conf Configuration + * @param jobID JobID to use * @return JobContext */ - public static JobContext makeJobContext(Configuration conf) { + public static JobContext makeJobContext(Configuration conf, JobID jobID) { JobContext context; /*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE] - context = new JobContext(conf, new JobID()); + context = new JobContext(conf, jobID); else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/ - context = new JobContextImpl(conf, new JobID()); + context = new JobContextImpl(conf, jobID); /*end[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/ return context; } @@ -79,6 +104,28 @@ public class HadoopUtils { /** * Create a JobContext, supporting many Hadoops. * + * @param conf Configuration + * @param jobContext Use JobID from this object + * @return JobContext + */ + public static JobContext makeJobContext(Configuration conf, + JobContext jobContext) { + return makeJobContext(conf, jobContext.getJobID()); + } + + /** + * Create a JobContext, supporting many Hadoops. + * + * @param conf Configuration + * @return JobContext + */ + public static JobContext makeJobContext(Configuration conf) { + return makeJobContext(conf, new JobID()); + } + + /** + * Create a JobContext, supporting many Hadoops. + * * @return JobContext */ public static JobContext makeJobContext() { http://git-wip-us.apache.org/repos/asf/giraph/blob/6c0917ab/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7a79aed..1131a56 100644 --- a/pom.xml +++ b/pom.xml @@ -271,7 +271,7 @@ under the License. <dep.guava.version>12.0</dep.guava.version> <dep.hcatalog.version>0.5.0-incubating</dep.hcatalog.version> <dep.hive.version>0.11.0</dep.hive.version> - <dep.hiveio.version>0.14</dep.hiveio.version> + <dep.hiveio.version>0.15</dep.hiveio.version> <dep.json.version>20090211</dep.json.version> <dep.junit.version>4.8</dep.junit.version> <dep.jython.version>2.5.3</dep.jython.version>
