Updated Branches: refs/heads/trunk 2a6c9d563 -> a68f2bada
GIRAPH-593: Update Hive IO performance improvements (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a68f2bad Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a68f2bad Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a68f2bad Branch: refs/heads/trunk Commit: a68f2badaa216125106bcc1e9397edf43623a9ec Parents: 2a6c9d5 Author: Nitay Joffe <[email protected]> Authored: Thu Mar 28 15:37:14 2013 -0400 Committer: Nitay Joffe <[email protected]> Committed: Thu Mar 28 15:37:14 2013 -0400 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/hive/HiveGiraphRunner.java | 39 ++++++++---- .../DefaultConfigurableAndTableSchemaAware.java | 4 +- .../giraph/hive/common/GiraphHiveConstants.java | 47 +++++++++++++++ .../apache/giraph/hive/common/HiveProfiles.java | 3 - .../hive/input/edge/HiveEdgeInputFormat.java | 4 +- .../giraph/hive/input/edge/HiveEdgeReader.java | 26 ++++---- .../apache/giraph/hive/input/edge/HiveToEdge.java | 4 +- .../giraph/hive/input/edge/SimpleHiveToEdge.java | 17 ++---- .../giraph/hive/input/vertex/HiveToVertex.java | 4 +- .../hive/input/vertex/HiveVertexInputFormat.java | 5 +- .../giraph/hive/input/vertex/HiveVertexReader.java | 32 +++++----- .../hive/input/vertex/SimpleHiveToVertex.java | 9 +-- .../input/vertex/SimpleNoEdgesHiveToVertex.java | 2 +- .../apache/giraph/hive/output/HiveRecordSaver.java | 4 +- .../giraph/hive/output/HiveVertexOutputFormat.java | 4 +- .../giraph/hive/output/HiveVertexWriter.java | 24 ++++---- .../giraph/hive/output/SimpleVertexToHive.java | 4 +- .../apache/giraph/hive/output/VertexToHive.java | 2 +- pom.xml | 2 +- 20 files changed, 146 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index ab59833..4e2036f 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-593: Update Hive IO performance improvements (nitay) + GIRAPH-594: auto set reusing objects (nitay) GIRAPH-597: Don't reuse vertex by default in SimpleHiveToVertex (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java index 63e9f95..712134e 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java @@ -27,13 +27,10 @@ import org.apache.commons.cli.ParseException; import org.apache.giraph.conf.GiraphClasses; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.graph.Vertex; -import org.apache.giraph.hive.common.HiveProfiles; import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat; -import org.apache.giraph.hive.input.edge.HiveEdgeReader; import org.apache.giraph.hive.input.edge.HiveToEdge; import org.apache.giraph.hive.input.vertex.HiveToVertex; import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat; -import org.apache.giraph.hive.input.vertex.HiveVertexReader; import org.apache.giraph.hive.output.HiveVertexOutputFormat; import org.apache.giraph.hive.output.HiveVertexWriter; import org.apache.giraph.hive.output.VertexToHive; @@ -49,6 +46,7 @@ import com.facebook.giraph.hive.input.HiveApiInputFormat; import com.facebook.giraph.hive.input.HiveInputDescription; import com.facebook.giraph.hive.output.HiveApiOutputFormat; import com.facebook.giraph.hive.output.HiveOutputDescription; +import com.facebook.giraph.hive.schema.HiveTableSchemas; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -59,6 +57,14 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_SPLITS; +import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_EDGE_CLASS; +import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_VERTEX_CLASS; +import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_SPLITS; +import static org.apache.giraph.hive.common.HiveProfiles.EDGE_INPUT_PROFILE_ID; +import static org.apache.giraph.hive.common.HiveProfiles.VERTEX_INPUT_PROFILE_ID; +import static org.apache.giraph.hive.common.HiveProfiles.VERTEX_OUTPUT_PROFILE_ID; + /** * Hive Giraph Runner */ @@ -136,8 +142,7 @@ public class HiveGiraphRunner implements Tool { public void setHiveToVertexClass( Class<? extends HiveToVertex> hiveToVertexClass) { this.hiveToVertexClass = hiveToVertexClass; - conf.setClass(HiveVertexReader.HIVE_TO_VERTEX_KEY, hiveToVertexClass, - HiveToVertex.class); + HIVE_TO_VERTEX_CLASS.set(conf, hiveToVertexClass); } /** @@ -169,8 +174,7 @@ public class HiveGiraphRunner implements Tool { */ public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) { this.hiveToEdgeClass = hiveToEdgeClass; - conf.setClass(HiveEdgeReader.HIVE_TO_EDGE_KEY, hiveToEdgeClass, - HiveToEdge.class); + HIVE_TO_EDGE_CLASS.set(conf, hiveToEdgeClass); } public Class<? extends VertexToHive> getVertexToHiveClass() { @@ -247,15 +251,21 @@ public class HiveGiraphRunner implements Tool { */ private void setupHiveInputs(GiraphConfiguration conf) throws TException { if (hiveToVertexClass != null) { - HiveApiInputFormat.initProfile(conf, hiveVertexInputDescription, - HiveProfiles.VERTEX_INPUT_PROFILE_ID); + hiveVertexInputDescription.setNumSplits(HIVE_VERTEX_SPLITS.get(conf)); + HiveApiInputFormat.setProfileInputDesc(conf, hiveVertexInputDescription, + VERTEX_INPUT_PROFILE_ID); conf.setVertexInputFormatClass(HiveVertexInputFormat.class); + HiveTableSchemas.put(conf, VERTEX_INPUT_PROFILE_ID, + hiveVertexInputDescription.hiveTableName()); } if (hiveToEdgeClass != null) { - HiveApiInputFormat.initProfile(conf, hiveEdgeInputDescription, - HiveProfiles.EDGE_INPUT_PROFILE_ID); + hiveEdgeInputDescription.setNumSplits(HIVE_EDGE_SPLITS.get(conf)); + HiveApiInputFormat.setProfileInputDesc(conf, hiveEdgeInputDescription, + EDGE_INPUT_PROFILE_ID); conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class); + HiveTableSchemas.put(conf, EDGE_INPUT_PROFILE_ID, + hiveEdgeInputDescription.hiveTableName()); } } @@ -270,8 +280,10 @@ public class HiveGiraphRunner implements Tool { LOG.warn("run: Warning - Output will be skipped!"); } else if (vertexToHiveClass != null) { HiveApiOutputFormat.initProfile(conf, hiveOutputDescription, - HiveProfiles.VERTEX_OUTPUT_PROFILE_ID); + VERTEX_OUTPUT_PROFILE_ID); conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class); + HiveTableSchemas.put(conf, VERTEX_OUTPUT_PROFILE_ID, + hiveOutputDescription.hiveTableName()); } else { LOG.fatal("output requested but " + VertexToHive.class.getSimpleName() + " not set"); @@ -336,8 +348,7 @@ public class HiveGiraphRunner implements Tool { String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass"); if (hiveToVertexClassStr != null) { - setHiveToVertexClass(findClass(hiveToVertexClassStr, - HiveToVertex.class)); + setHiveToVertexClass(findClass(hiveToVertexClassStr, HiveToVertex.class)); } String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass"); http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java index c8b201f..ae32b71 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java @@ -22,8 +22,8 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.facebook.giraph.hive.HiveTableSchema; -import com.facebook.giraph.hive.HiveTableSchemaAware; +import com.facebook.giraph.hive.schema.HiveTableSchema; +import com.facebook.giraph.hive.schema.HiveTableSchemaAware; /** * Default implementation of {@link HiveTableSchemaAware} and http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java new file mode 100644 index 0000000..f8363b1 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.hive.common; + +import org.apache.giraph.conf.ClassConfOption; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.hive.input.edge.HiveToEdge; +import org.apache.giraph.hive.input.vertex.HiveToVertex; + +/** + * Constants for giraph-hive + */ +public class GiraphHiveConstants { + /** Number of edge splits */ + public static final IntConfOption HIVE_EDGE_SPLITS = + new IntConfOption("giraph.hive.input.edge.splits", 0); + /** Number of vertex splits */ + public static final IntConfOption HIVE_VERTEX_SPLITS = + new IntConfOption("giraph.hive.input.vertex.splits", 0); + /** Class for converting hive records to edges */ + public static final ClassConfOption<HiveToEdge> HIVE_TO_EDGE_CLASS = + ClassConfOption.create("giraph.hive.to.edge.class", null, + HiveToEdge.class); + /** Class for converting hive records to vertices */ + public static final ClassConfOption<HiveToVertex> HIVE_TO_VERTEX_CLASS = + ClassConfOption.create("giraph.hive.to.vertex.class", null, + HiveToVertex.class); + + /** Don't construct */ + protected GiraphHiveConstants() { } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java index b0ddc48..892d443 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java @@ -26,9 +26,6 @@ public class HiveProfiles { public static final String VERTEX_INPUT_PROFILE_ID = "vertex_input_profile"; /** name of edge input profile */ public static final String EDGE_INPUT_PROFILE_ID = "edge_input_profile"; - /** name of vertex value input profile */ - public static final String VERTEX_VALUE_INPUT_PROFILE_ID = - "vertex_value_input_profile"; /** Name of vertex output profile */ public static final String VERTEX_OUTPUT_PROFILE_ID = "vertex_output_profile"; http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java index 68edbfc..041e331 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java @@ -30,8 +30,8 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.facebook.giraph.hive.HiveRecord; import com.facebook.giraph.hive.input.HiveApiInputFormat; +import com.facebook.giraph.hive.record.HiveReadableRecord; import java.io.IOException; import java.util.List; @@ -70,7 +70,7 @@ public class HiveEdgeInputFormat<I extends WritableComparable, HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>(); reader.setTableSchema(hiveInputFormat.getTableSchema(conf)); - RecordReader<WritableComparable, HiveRecord> baseReader; + RecordReader<WritableComparable, HiveReadableRecord> baseReader; try { baseReader = hiveInputFormat.createRecordReader(split, context); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java index 275f8f7..e1a69cf 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java @@ -19,9 +19,9 @@ package org.apache.giraph.hive.input.edge; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.hive.input.RecordReaderWrapper; import org.apache.giraph.io.iterables.EdgeWithSource; import org.apache.giraph.io.iterables.GiraphReader; -import org.apache.giraph.hive.input.RecordReaderWrapper; import org.apache.giraph.utils.ReflectionUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -29,13 +29,15 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.facebook.giraph.hive.HiveRecord; -import com.facebook.giraph.hive.HiveTableSchema; -import com.facebook.giraph.hive.HiveTableSchemaAware; -import com.facebook.giraph.hive.HiveTableSchemas; +import com.facebook.giraph.hive.record.HiveReadableRecord; +import com.facebook.giraph.hive.schema.HiveTableSchema; +import com.facebook.giraph.hive.schema.HiveTableSchemaAware; +import com.facebook.giraph.hive.schema.HiveTableSchemas; import java.io.IOException; +import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_EDGE_CLASS; + /** * A reader for reading edges from Hive. * @@ -48,7 +50,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable> public static final String HIVE_TO_EDGE_KEY = "giraph.hive.to.edge.class"; /** Underlying Hive RecordReader used */ - private RecordReader<WritableComparable, HiveRecord> hiveRecordReader; + private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader; /** Schema for table in Hive */ private HiveTableSchema tableSchema; @@ -63,7 +65,8 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable> * * @return RecordReader from Hive */ - public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() { + public RecordReader<WritableComparable, HiveReadableRecord> + getHiveRecordReader() { return hiveRecordReader; } @@ -73,7 +76,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable> * @param hiveRecordReader RecordReader to read from Hive. */ public void setHiveRecordReader( - RecordReader<WritableComparable, HiveRecord> hiveRecordReader) { + RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) { this.hiveRecordReader = hiveRecordReader; } @@ -103,7 +106,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable> conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration()); instantiateHiveToEdgeFromConf(); hiveToEdge.initializeRecords( - new RecordReaderWrapper<HiveRecord>(hiveRecordReader)); + new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader)); } /** @@ -112,10 +115,9 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable> * @throws IOException if anything goes wrong reading from Configuration */ private void instantiateHiveToEdgeFromConf() throws IOException { - Class<? extends HiveToEdge> klass = conf.getClass(HIVE_TO_EDGE_KEY, - null, HiveToEdge.class); + Class<? extends HiveToEdge> klass = HIVE_TO_EDGE_CLASS.get(conf); if (klass == null) { - throw new IOException(HIVE_TO_EDGE_KEY + " not set in conf"); + throw new IOException(HIVE_TO_EDGE_CLASS.getKey() + " not set in conf"); } hiveToEdge = ReflectionUtils.newInstance(klass, conf); HiveTableSchemas.configure(hiveToEdge, tableSchema); http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java index 8b22a8f..515f1da 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java @@ -22,7 +22,7 @@ import org.apache.giraph.io.iterables.EdgeWithSource; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.facebook.giraph.hive.HiveRecord; +import com.facebook.giraph.hive.record.HiveReadableRecord; import java.util.Iterator; @@ -43,5 +43,5 @@ public interface HiveToEdge<I extends WritableComparable, * * @param records Hive records */ - void initializeRecords(Iterator<HiveRecord> records); + void initializeRecords(Iterator<HiveReadableRecord> records); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java index 7aa8721..94a811e 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java @@ -18,13 +18,11 @@ package org.apache.giraph.hive.input.edge; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.io.iterables.EdgeWithSource; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.facebook.giraph.hive.HiveReadableRecord; -import com.facebook.giraph.hive.HiveRecord; +import com.facebook.giraph.hive.record.HiveReadableRecord; import java.util.Iterator; @@ -41,7 +39,7 @@ public abstract class SimpleHiveToEdge<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends AbstractHiveToEdge<I, V, E, M> { /** Iterator over input records */ - private Iterator<HiveRecord> records; + private Iterator<HiveReadableRecord> records; /** Reusable {@link EdgeWithSource} object */ private EdgeWithSource<I, E> reusableEdge = new EdgeWithSource<I, E>(); @@ -70,14 +68,9 @@ public abstract class SimpleHiveToEdge<I extends WritableComparable, public abstract E getEdgeValue(HiveReadableRecord hiveRecord); @Override - public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) { - super.setConf(conf); - reusableEdge.setEdge(getConf().createReusableEdge()); - } - - @Override - public final void initializeRecords(Iterator<HiveRecord> records) { + public final void initializeRecords(Iterator<HiveReadableRecord> records) { this.records = records; + reusableEdge.setEdge(getConf().createReusableEdge()); } @Override @@ -87,7 +80,7 @@ public abstract class SimpleHiveToEdge<I extends WritableComparable, @Override public EdgeWithSource<I, E> next() { - HiveRecord record = records.next(); + HiveReadableRecord record = records.next(); reusableEdge.setSourceVertexId(getSourceVertexId(record)); reusableEdge.setTargetVertexId(getTargetVertexId(record)); reusableEdge.setEdgeValue(getEdgeValue(record)); http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java index 1179961..f57dc4e 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java @@ -22,7 +22,7 @@ import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.facebook.giraph.hive.HiveRecord; +import com.facebook.giraph.hive.record.HiveReadableRecord; import java.util.Iterator; @@ -46,5 +46,5 @@ public interface HiveToVertex<I extends WritableComparable, * * @param records Hive records */ - void initializeRecords(Iterator<HiveRecord> records); + void initializeRecords(Iterator<HiveReadableRecord> records); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java index 4f70750..3b25444 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java @@ -27,10 +27,11 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.facebook.giraph.hive.impl.input.HiveApiRecordReader; import com.facebook.giraph.hive.input.HiveApiInputFormat; +import com.facebook.giraph.hive.record.HiveReadableRecord; import java.io.IOException; import java.util.List; @@ -71,7 +72,7 @@ public class HiveVertexInputFormat<I extends WritableComparable, HiveVertexReader<I, V, E, M> reader = new HiveVertexReader<I, V, E, M>(); reader.setTableSchema(hiveInputFormat.getTableSchema(conf)); - HiveApiRecordReader baseReader; + RecordReader<WritableComparable, HiveReadableRecord> baseReader; try { baseReader = hiveInputFormat.createRecordReader(split, context); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java index 442c796..9c172be 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java @@ -26,16 +26,18 @@ import org.apache.giraph.utils.ReflectionUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.facebook.giraph.hive.HiveRecord; -import com.facebook.giraph.hive.HiveTableSchema; -import com.facebook.giraph.hive.HiveTableSchemaAware; -import com.facebook.giraph.hive.HiveTableSchemas; -import com.facebook.giraph.hive.impl.input.HiveApiRecordReader; +import com.facebook.giraph.hive.record.HiveReadableRecord; +import com.facebook.giraph.hive.schema.HiveTableSchema; +import com.facebook.giraph.hive.schema.HiveTableSchemaAware; +import com.facebook.giraph.hive.schema.HiveTableSchemas; import java.io.IOException; +import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_VERTEX_CLASS; + /** * VertexReader using Hive * @@ -47,11 +49,8 @@ import java.io.IOException; public class HiveVertexReader<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements GiraphReader<Vertex<I, V, E, M>>, HiveTableSchemaAware { - /** Configuration key for {@link HiveToVertex} class */ - public static final String HIVE_TO_VERTEX_KEY = - "giraph.hive.to.vertex.class"; /** Underlying Hive RecordReader used */ - private HiveApiRecordReader hiveRecordReader; + private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader; /** Schema for table in Hive */ private HiveTableSchema tableSchema; @@ -69,7 +68,8 @@ public class HiveVertexReader<I extends WritableComparable, * * @return RecordReader from Hive. */ - public HiveApiRecordReader getHiveRecordReader() { + public RecordReader<WritableComparable, HiveReadableRecord> + getHiveRecordReader() { return hiveRecordReader; } @@ -78,7 +78,8 @@ public class HiveVertexReader<I extends WritableComparable, * * @param hiveRecordReader RecordReader to read from Hive. */ - public void setHiveRecordReader(HiveApiRecordReader hiveRecordReader) { + public void setHiveRecordReader( + RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) { this.hiveRecordReader = hiveRecordReader; } @@ -105,14 +106,13 @@ public class HiveVertexReader<I extends WritableComparable, public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { hiveRecordReader.initialize(inputSplit, context); - conf = new ImmutableClassesGiraphConfiguration<I, V, E, - M>(context.getConfiguration()); - Class<? extends HiveToVertex> klass = conf.getClass(HIVE_TO_VERTEX_KEY, - SimpleHiveToVertex.class, HiveToVertex.class); + conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>( + context.getConfiguration()); + Class<? extends HiveToVertex> klass = HIVE_TO_VERTEX_CLASS.get(conf); hiveToVertex = ReflectionUtils.newInstance(klass, conf); HiveTableSchemas.configure(hiveToVertex, tableSchema); hiveToVertex.initializeRecords( - new RecordReaderWrapper<HiveRecord>(hiveRecordReader)); + new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader)); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java index a4acd2f..59c10be 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java @@ -24,8 +24,7 @@ import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.facebook.giraph.hive.HiveReadableRecord; -import com.facebook.giraph.hive.HiveRecord; +import com.facebook.giraph.hive.record.HiveReadableRecord; import java.util.Iterator; @@ -42,7 +41,7 @@ public abstract class SimpleHiveToVertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends AbstractHiveToVertex<I, V, E, M> { /** Hive records which we are reading from */ - private Iterator<HiveRecord> records; + private Iterator<HiveReadableRecord> records; /** Reusable vertex object */ private Vertex<I, V, E, M> reusableVertex = null; @@ -80,7 +79,7 @@ public abstract class SimpleHiveToVertex<I extends WritableComparable, } @Override - public void initializeRecords(Iterator<HiveRecord> records) { + public void initializeRecords(Iterator<HiveReadableRecord> records) { this.records = records; } @@ -91,7 +90,7 @@ public abstract class SimpleHiveToVertex<I extends WritableComparable, @Override public Vertex<I, V, E, M> next() { - HiveRecord record = records.next(); + HiveReadableRecord record = records.next(); I id = getVertexId(record); V value = getVertexValue(record); Iterable<Edge<I, E>> edges = getEdges(record); http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java index 195a69e..ca427a0 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java @@ -22,7 +22,7 @@ import org.apache.giraph.edge.Edge; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.facebook.giraph.hive.HiveReadableRecord; +import com.facebook.giraph.hive.record.HiveReadableRecord; import com.google.common.collect.ImmutableList; /** http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java index 70de517..a0a4207 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java @@ -18,7 +18,7 @@ package org.apache.giraph.hive.output; -import com.facebook.giraph.hive.HiveRecord; +import com.facebook.giraph.hive.record.HiveWritableRecord; import java.io.IOException; @@ -33,5 +33,5 @@ public interface HiveRecordSaver { * @throws IOException * @throws InterruptedException */ - void save(HiveRecord record) throws IOException, InterruptedException; + void save(HiveWritableRecord record) throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java index 641a298..9ff74d6 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java @@ -29,8 +29,8 @@ import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.facebook.giraph.hive.HiveRecord; import com.facebook.giraph.hive.output.HiveApiOutputFormat; +import com.facebook.giraph.hive.record.HiveWritableRecord; import java.io.IOException; @@ -60,7 +60,7 @@ public class HiveVertexOutputFormat<I extends WritableComparable, throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); - RecordWriter<WritableComparable, HiveRecord> baseWriter = + RecordWriter<WritableComparable, HiveWritableRecord> baseWriter = hiveOutputFormat.getRecordWriter(context); HiveVertexWriter writer = new HiveVertexWriter(); writer.setBaseWriter(baseWriter); http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java index 3eef1f4..45d0226 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java @@ -29,13 +29,14 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; -import com.facebook.giraph.hive.HiveRecord; -import com.facebook.giraph.hive.HiveTableSchema; -import com.facebook.giraph.hive.HiveTableSchemas; -import com.facebook.giraph.hive.impl.HiveApiRecord; +import com.facebook.giraph.hive.input.parser.hive.DefaultRecord; +import com.facebook.giraph.hive.record.HiveRecord; +import com.facebook.giraph.hive.record.HiveWritableRecord; +import com.facebook.giraph.hive.schema.HiveTableSchema; +import com.facebook.giraph.hive.schema.HiveTableSchemas; import java.io.IOException; -import java.util.Collections; + /** * Vertex writer using Hive. @@ -53,7 +54,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable, private static final Logger LOG = Logger.getLogger(HiveVertexWriter.class); /** Underlying Hive RecordWriter used */ - private RecordWriter<WritableComparable, HiveRecord> hiveRecordWriter; + private RecordWriter<WritableComparable, HiveWritableRecord> hiveRecordWriter; /** Schema for table in Hive */ private HiveTableSchema tableSchema; @@ -71,7 +72,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable, * * @return RecordWriter for Hive. */ - public RecordWriter<WritableComparable, HiveRecord> getBaseWriter() { + public RecordWriter<WritableComparable, HiveWritableRecord> getBaseWriter() { return hiveRecordWriter; } @@ -81,7 +82,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable, * @param hiveRecordWriter RecordWriter to write to Hive. */ public void setBaseWriter( - RecordWriter<WritableComparable, HiveRecord> hiveRecordWriter) { + RecordWriter<WritableComparable, HiveWritableRecord> hiveRecordWriter) { this.hiveRecordWriter = hiveRecordWriter; } @@ -101,8 +102,8 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable, */ public void setTableSchema(HiveTableSchema tableSchema) { this.tableSchema = tableSchema; - reusableRecord = new HiveApiRecord(tableSchema.numColumns(), - Collections.<String>emptyList()); + reusableRecord = new DefaultRecord(tableSchema.numColumns(), + new String[0]); } @Override @@ -140,7 +141,8 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable, } @Override - public void save(HiveRecord record) throws IOException, InterruptedException { + public void save(HiveWritableRecord record) throws IOException, + InterruptedException { hiveRecordWriter.write(NullWritable.get(), record); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java index 3e76d87..61de791 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java @@ -22,8 +22,8 @@ import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.facebook.giraph.hive.HiveRecord; -import com.facebook.giraph.hive.HiveWritableRecord; +import com.facebook.giraph.hive.record.HiveRecord; +import com.facebook.giraph.hive.record.HiveWritableRecord; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java index ff5869d..05e81ab 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java @@ -22,7 +22,7 @@ import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import com.facebook.giraph.hive.HiveRecord; +import com.facebook.giraph.hive.record.HiveRecord; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 799bb0a..1e321b8 100644 --- a/pom.xml +++ b/pom.xml @@ -881,7 +881,7 @@ under the License. <dependency> <groupId>com.facebook.giraph.hive</groupId> <artifactId>hive-io-experimental</artifactId> - <version>0.4</version> + <version>0.5</version> </dependency> <dependency> <groupId>com.google.guava</groupId>
