Updated Branches: refs/heads/trunk e28bfd323 -> 909d4c3cc
GIRAPH-789: Upgrade hive-io to 0.20 - less metastore accesses (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/909d4c3c Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/909d4c3c Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/909d4c3c Branch: refs/heads/trunk Commit: 909d4c3cc09bcc83c199000f632c798b39b24bf7 Parents: e28bfd3 Author: Maja Kabiljo <[email protected]> Authored: Thu Oct 31 13:55:00 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Thu Oct 31 13:55:32 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/hive/HiveGiraphRunner.java | 113 ++++++++++++++++++- .../apache/giraph/hive/common/HiveUtils.java | 49 -------- .../hive/input/edge/HiveEdgeInputFormat.java | 4 +- .../input/vertex/HiveVertexInputFormat.java | 7 +- .../hive/output/HiveVertexOutputFormat.java | 8 +- pom.xml | 2 +- 7 files changed, 118 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 707d971..4465ac1 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-789: Upgrade hive-io to 0.20 - less metastore accesses (majakabiljo) + GIRAPH-787: Use HiveIO 1.9 (gmalewicz via aching) GIRAPH-786: XSparseVector create a lot of objects in add/write http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java index 6b8a8e9..cbcd788 100644 --- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java @@ -26,6 +26,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Computation; import org.apache.giraph.hive.common.HiveUtils; import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat; @@ -276,7 +277,58 @@ public class HiveGiraphRunner implements Tool { } /** - * Prepare vertex input settings in Configuration + * Create ImmutableClassesGiraphConfiguration from provided Configuration + * which is going to copy all the values set to it to this original + * Configuration + * + * @param conf Configuration to create ImmutableClassesGiraphConfiguration + * from and update with any changes to the returned configuration + * @return ImmutableClassesGiraphConfiguration + */ + private ImmutableClassesGiraphConfiguration createGiraphConf( + final Configuration conf) { + return new ImmutableClassesGiraphConfiguration(conf) { + @Override + public void set(String name, String value) { + super.set(name, value); + conf.set(name, value); + } + }; + } + + /** + * Create ImmutableClassesGiraphConfiguration from provided Configuration + * which is going to copy all the values set to it to provided + * InputFormatDescription + * + * @param conf Configuration to create ImmutableClassesGiraphConfiguration + * from + * @param inputFormatDescription InputFormatDescription to update with any + * changes to the returned configuration + * @return ImmutableClassesGiraphConfiguration + */ + private ImmutableClassesGiraphConfiguration createGiraphConf( + Configuration conf, + final InputFormatDescription inputFormatDescription) { + return new ImmutableClassesGiraphConfiguration(conf) { + @Override + public void 
set(String name, String value) { + super.set(name, value); + inputFormatDescription.addParameter(name, value); + } + }; + } + + /** + * Prepare vertex input settings in Configuration. + * + * For all Hive vertex inputs, add the user settings to the configuration. + * Additionally, this checks the input specs for every input and caches + * metadata information into the configuration to eliminate worker access to + * the metastore and fail earlier in the case that metadata doesn't exist. + * In the case of multiple vertex input descriptions, metadata is cached in + * each vertex input format description and then saved into a single + * Configuration via JSON. */ @SuppressWarnings("unchecked") public void prepareHiveVertexInputs() { @@ -284,7 +336,27 @@ public class HiveGiraphRunner implements Tool { GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf, vertexInputDescriptions.get(0).getInputFormatClass()); vertexInputDescriptions.get(0).putParametersToConfiguration(conf); + // Create VertexInputFormat in order to initialize the Configuration with + // data from metastore, and check it + createGiraphConf(conf).createWrappedVertexInputFormat() + .checkInputSpecs(conf); } else { + // For each of the VertexInputFormats we'll prepare Configuration + // parameters + for (int i = 0; i < vertexInputDescriptions.size(); i++) { + // Create a copy of the Configuration in order not to mess up the + // original one + Configuration confCopy = new Configuration(conf); + final VertexInputFormatDescription vertexInputDescription = + vertexInputDescriptions.get(i); + GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(confCopy, + vertexInputDescription.getInputFormatClass()); + vertexInputDescription.putParametersToConfiguration(confCopy); + // Create VertexInputFormat in order to initialize its description with + // data from metastore, and check it + createGiraphConf(confCopy, vertexInputDescription) + .createWrappedVertexInputFormat().checkInputSpecs(confCopy); + } 
GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf, MultiVertexInputFormat.class); VertexInputFormatDescription.VERTEX_INPUT_FORMAT_DESCRIPTIONS.set(conf, @@ -293,7 +365,15 @@ public class HiveGiraphRunner implements Tool { } /** - * Prepare edge input settings in Configuration + * Prepare edge input settings in Configuration. + * + * For all Hive edge inputs, add the user settings to the configuration. + * Additionally, this checks the input specs for every input and caches + * metadata information into the configuration to eliminate worker access to + * the metastore and fail earlier in the case that metadata doesn't exist. + * In the case of multiple edge input descriptions, metadata is cached in each + * edge input format description and then saved into a single + * Configuration via JSON. */ @SuppressWarnings("unchecked") public void prepareHiveEdgeInputs() { @@ -301,7 +381,27 @@ public class HiveGiraphRunner implements Tool { GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf, edgeInputDescriptions.get(0).getInputFormatClass()); edgeInputDescriptions.get(0).putParametersToConfiguration(conf); + // Create EdgeInputFormat in order to initialize the Configuration with + // data from metastore, and check it + createGiraphConf(conf).createWrappedEdgeInputFormat() + .checkInputSpecs(conf); } else { + // For each of the EdgeInputFormats we'll prepare Configuration + // parameters + for (int i = 0; i < edgeInputDescriptions.size(); i++) { + // Create a copy of the Configuration in order not to mess up the + // original one + Configuration confCopy = new Configuration(conf); + final EdgeInputFormatDescription edgeInputDescription = + edgeInputDescriptions.get(i); + GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(confCopy, + edgeInputDescription.getInputFormatClass()); + edgeInputDescription.putParametersToConfiguration(confCopy); + // Create EdgeInputFormat in order to initialize its description with + // data from metastore, and check it + createGiraphConf(confCopy, 
edgeInputDescription) + .createWrappedEdgeInputFormat().checkInputSpecs(confCopy); + } GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf, MultiEdgeInputFormat.class); EdgeInputFormatDescription.EDGE_INPUT_FORMAT_DESCRIPTIONS.set(conf, @@ -310,11 +410,18 @@ public class HiveGiraphRunner implements Tool { } /** - * Prepare output settings in Configuration + * Prepare output settings in Configuration. + * + * This caches metadata information into the configuration to eliminate worker + * access to the metastore. */ public void prepareHiveOutput() { GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.set(conf, HiveVertexOutputFormat.class); + // Output format will be checked by Hadoop, here we only create it in + // order to initialize the Configuration with data from metastore. + // Can't check it here since we don't have JobContext yet + createGiraphConf(conf).createWrappedVertexOutputFormat(); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java index b809413..2388673 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java @@ -29,10 +29,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; -import com.facebook.hiveio.input.HiveApiInputFormat; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.output.HiveApiOutputFormat; -import com.facebook.hiveio.output.HiveOutputDescription; import com.facebook.hiveio.schema.HiveTableSchema; import com.facebook.hiveio.schema.HiveTableSchemas; import com.google.common.base.Splitter; @@ -65,51 +61,6 @@ public class HiveUtils { } /** 
- * Initialize hive input, prepare Configuration parameters - * - * @param hiveInputFormat HiveApiInputFormat - * @param inputDescription HiveInputDescription - * @param profileId profile ID - * @param conf Configuration - */ - public static void initializeHiveInput(HiveApiInputFormat hiveInputFormat, - HiveInputDescription inputDescription, String profileId, - Configuration conf) { - try { - hiveInputFormat.setMyProfileId(profileId); - HiveApiInputFormat.setProfileInputDesc(conf, inputDescription, profileId); - HiveTableSchema schema = HiveTableSchemas.lookup( - conf, inputDescription.getTableDesc()); - HiveTableSchemas.put(conf, profileId, schema); - } catch (IOException e) { - throw new IllegalStateException( - "initializeHiveInput: IOException occurred", e); - } - } - - /** - * Initialize hive output, prepare Configuration parameters - * - * @param hiveOutputFormat HiveApiOutputFormat - * @param outputDesc HiveOutputDescription - * @param profileId Profile id - * @param conf Configuration - */ - public static void initializeHiveOutput(HiveApiOutputFormat hiveOutputFormat, - HiveOutputDescription outputDesc, String profileId, Configuration conf) { - try { - hiveOutputFormat.setMyProfileId(profileId); - HiveApiOutputFormat.initProfile(conf, outputDesc, profileId); - HiveTableSchema schema = HiveTableSchemas.lookup( - conf, outputDesc.getTableDesc()); - HiveTableSchemas.put(conf, profileId, schema); - } catch (IOException e) { - throw new IllegalStateException( - "initializeHiveOutput: IOException occurred", e); - } - } - - /** * @param outputTablePartitionString table partition string * @return Map */ http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java index 534a773..6ba2aec 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java @@ -40,7 +40,6 @@ import com.facebook.hiveio.schema.HiveTableSchema; import java.io.IOException; import java.util.List; - /** * {@link EdgeInputFormat} for reading edges from Hive. * @@ -71,8 +70,7 @@ public class HiveEdgeInputFormat<I extends WritableComparable, public void setConf( ImmutableClassesGiraphConfiguration<I, Writable, E> conf) { super.setConf(conf); - HiveUtils.initializeHiveInput( - hiveInputFormat, + hiveInputFormat.initialize( GiraphHiveConstants.HIVE_EDGE_INPUT.makeInputDescription(conf), GiraphHiveConstants.HIVE_EDGE_INPUT.getProfileID(conf), conf); http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java index d5c1279..499d839 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java @@ -20,7 +20,6 @@ package org.apache.giraph.hive.input.vertex; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.hive.common.GiraphHiveConstants; -import org.apache.giraph.hive.common.HiveUtils; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; import org.apache.giraph.io.iterables.VertexReaderWrapper; @@ -72,11 +71,9 @@ public class HiveVertexInputFormat<I extends WritableComparable, } @Override - public 
void setConf( - ImmutableClassesGiraphConfiguration<I, V, E> conf) { + public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) { super.setConf(conf); - HiveUtils.initializeHiveInput( - hiveInputFormat, + hiveInputFormat.initialize( GiraphHiveConstants.HIVE_VERTEX_INPUT.makeInputDescription(conf), GiraphHiveConstants.HIVE_VERTEX_INPUT.getProfileID(conf), conf); http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java index c4813fb..97f61bd 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java @@ -19,7 +19,6 @@ package org.apache.giraph.hive.output; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.hive.common.HiveUtils; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.VertexWriter; import org.apache.hadoop.io.Writable; @@ -84,11 +83,8 @@ public class HiveVertexOutputFormat<I extends WritableComparable, public void setConf( ImmutableClassesGiraphConfiguration<I, V, E> conf) { super.setConf(conf); - HiveUtils.initializeHiveOutput( - hiveOutputFormat, - makeOutputDesc(), - HIVE_VERTEX_OUTPUT_PROFILE_ID.get(conf), - conf); + hiveOutputFormat.initialize(makeOutputDesc(), + HIVE_VERTEX_OUTPUT_PROFILE_ID.get(conf), conf); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f2981ff..43804f6 100644 --- a/pom.xml +++ b/pom.xml @@ -272,7 +272,7 @@ under the License. 
<dep.guava.version>12.0</dep.guava.version> <dep.hcatalog.version>0.5.0-incubating</dep.hcatalog.version> <dep.hive.version>0.11.0</dep.hive.version> - <dep.hiveio.version>0.19</dep.hiveio.version> + <dep.hiveio.version>0.20</dep.hiveio.version> <dep.json.version>20090211</dep.json.version> <dep.junit.version>4.8</dep.junit.version> <dep.jython.version>2.5.3</dep.jython.version>
