Updated Branches: refs/heads/trunk 974c30b74 -> a802fef1c
GIRAPH-575: update hive-io (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a802fef1 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a802fef1 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a802fef1 Branch: refs/heads/trunk Commit: a802fef1c25096ced056ffb67a75ffec3d2965a5 Parents: 974c30b Author: Nitay Joffe <[email protected]> Authored: Thu Mar 21 14:17:08 2013 -0400 Committer: Nitay Joffe <[email protected]> Committed: Thu Mar 21 14:36:53 2013 -0400 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../hive/input/edge/HiveEdgeInputFormat.java | 6 +++--- .../input/vertex/AbstractHiveToVertexEdges.java | 14 +++++++++++++- .../input/vertex/AbstractHiveToVertexValue.java | 14 +++++++++++++- .../hive/input/vertex/HiveToVertexEdges.java | 7 ++++--- .../hive/input/vertex/HiveVertexInputFormat.java | 9 ++++----- .../giraph/hive/input/vertex/HiveVertexReader.java | 13 ++++++------- .../giraph/hive/output/HiveVertexWriter.java | 4 +++- pom.xml | 2 +- 9 files changed, 49 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/a802fef1/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 765dc37..3032873 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-575: update hive-io (nitay) + GIRAPH-576: BspServiceMaster.failureCleanup() shouldn't pass null in observers' applicationFailed() method (jgarms via nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/a802fef1/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java index 3f40763..18b40c2 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java @@ -66,17 +66,17 @@ public class HiveEdgeInputFormat<I extends WritableComparable, throws IOException { Configuration conf = context.getConfiguration(); - RecordReader<WritableComparable, HiveRecord> baseReader; HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>(); reader.setTableSchema(hiveInputFormat.getTableSchema(conf)); + RecordReader<WritableComparable, HiveRecord> baseReader; try { baseReader = hiveInputFormat.createRecordReader(split, context); - reader.setHiveRecordReader(baseReader); - reader.initialize(split, context); } catch (InterruptedException e) { throw new IllegalStateException("Could not create edge record reader", e); } + + reader.setHiveRecordReader(baseReader); return reader; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/a802fef1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java index 7b01dac..d0668f6 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java @@ -21,6 +21,7 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import com.facebook.giraph.hive.HiveTableSchema; import com.facebook.giraph.hive.HiveTableSchemaAware; /** @@ -34,4 +35,15 @@ import com.facebook.giraph.hive.HiveTableSchemaAware; public abstract class AbstractHiveToVertexEdges<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M> - implements HiveTableSchemaAware, HiveToVertexEdges<I, E> { } + implements HiveTableSchemaAware, HiveToVertexEdges<I, E> { + /** Schema stored here */ + private HiveTableSchema tableSchema; + + @Override public void setTableSchema(HiveTableSchema tableSchema) { + this.tableSchema = tableSchema; + } + + @Override public HiveTableSchema getTableSchema() { + return tableSchema; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a802fef1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java index 5c279b5..9ab316f 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java @@ -21,6 +21,7 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import com.facebook.giraph.hive.HiveTableSchema; import com.facebook.giraph.hive.HiveTableSchemaAware; /** @@ -34,4 +35,15 @@ import com.facebook.giraph.hive.HiveTableSchemaAware; public abstract class AbstractHiveToVertexValue<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M> - implements HiveTableSchemaAware, HiveToVertexValue<I, V> { } + implements HiveTableSchemaAware, HiveToVertexValue<I, V> { + /** Schema stored here */ + private HiveTableSchema tableSchema; + + @Override public void setTableSchema(HiveTableSchema tableSchema) { + this.tableSchema = tableSchema; + } + + @Override public HiveTableSchema getTableSchema() { + return tableSchema; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a802fef1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java index cf7ea33..8076a8a 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java @@ -51,8 +51,8 @@ public interface HiveToVertexEdges<I extends WritableComparable, /** Singleton */ private static final Empty INSTANCE = new Empty(); - /** Don't construct */ - private Empty() { } + /** Don't construct, allow inheritance */ + protected Empty() { } /** * Get singleton instance @@ -60,7 +60,8 @@ public interface HiveToVertexEdges<I extends WritableComparable, */ public static Empty get() { return INSTANCE; } - @Override public Iterable getEdges(HiveReadableRecord record) { + @Override + public Iterable getEdges(HiveReadableRecord record) { return Collections.emptyList(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/a802fef1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java index fb3b123..25c7a26 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java @@ -26,10 +26,9 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.facebook.giraph.hive.HiveRecord; +import com.facebook.giraph.hive.impl.input.HiveApiRecordReader; import com.facebook.giraph.hive.input.HiveApiInputFormat; import java.io.IOException; @@ -68,17 +67,17 @@ public class HiveVertexInputFormat<I extends WritableComparable, TaskAttemptContext context) throws IOException { Configuration conf = context.getConfiguration(); - RecordReader<WritableComparable, HiveRecord> baseReader; HiveVertexReader reader = new HiveVertexReader(); reader.setTableSchema(hiveInputFormat.getTableSchema(conf)); + HiveApiRecordReader baseReader; try { baseReader = hiveInputFormat.createRecordReader(split, context); - reader.setHiveRecordReader(baseReader); - reader.initialize(split, context); } catch (InterruptedException e) { throw new IOException("Could not create vertex reader", e); } + + reader.setHiveRecordReader(baseReader); return reader; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/a802fef1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java index 2311e72..541176f 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java @@ -26,13 +26,13 @@ import org.apache.giraph.utils.ReflectionUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import com.facebook.giraph.hive.HiveRecord; import com.facebook.giraph.hive.HiveTableSchema; import com.facebook.giraph.hive.HiveTableSchemaAware; import com.facebook.giraph.hive.HiveTableSchemas; +import com.facebook.giraph.hive.impl.input.HiveApiRecordReader; import java.io.IOException; @@ -57,7 +57,7 @@ public class HiveVertexReader<I extends WritableComparable, public static final String REUSE_VERTEX_KEY = "giraph.hive.reuse.vertex"; /** Underlying Hive RecordReader used */ - private RecordReader<WritableComparable, HiveRecord> hiveRecordReader; + private HiveApiRecordReader hiveRecordReader; /** Schema for table in Hive */ private HiveTableSchema tableSchema; @@ -80,7 +80,7 @@ public class HiveVertexReader<I extends WritableComparable, * * @return RecordReader from Hive. */ - public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() { + public HiveApiRecordReader getHiveRecordReader() { return hiveRecordReader; } @@ -89,8 +89,7 @@ public class HiveVertexReader<I extends WritableComparable, * * @param hiveRecordReader RecordReader to read from Hive. */ - public void setHiveRecordReader( - RecordReader<WritableComparable, HiveRecord> hiveRecordReader) { + public void setHiveRecordReader(HiveApiRecordReader hiveRecordReader) { this.hiveRecordReader = hiveRecordReader; } @@ -118,7 +117,7 @@ public class HiveVertexReader<I extends WritableComparable, throws IOException, InterruptedException { hiveRecordReader.initialize(inputSplit, context); conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration()); - instantiateHiveToVertexFromConf(); + instantiateHiveToVertexValueFromConf(); instantiateHiveToVertexEdgesFromConf(); if (conf.getBoolean(REUSE_VERTEX_KEY, false)) { vertexToReuse = conf.createVertex(); @@ -130,7 +129,7 @@ public class HiveVertexReader<I extends WritableComparable, * * @throws IOException if anything goes wrong reading from Configuration. */ - private void instantiateHiveToVertexFromConf() throws IOException { + private void instantiateHiveToVertexValueFromConf() throws IOException { Class<? extends HiveToVertexValue> klass = conf.getClass(HIVE_TO_VERTEX_KEY, null, HiveToVertexValue.class); if (klass == null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/a802fef1/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java index 47d096b..a97d40a 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java @@ -35,6 +35,7 @@ import com.facebook.giraph.hive.HiveTableSchemas; import com.facebook.giraph.hive.impl.HiveApiRecord; import java.io.IOException; +import java.util.Collections; /** * Vertex writer using Hive. @@ -124,7 +125,8 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable, @Override public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException { - HiveRecord record = new HiveApiRecord(tableSchema.numColumns()); + HiveRecord record = new HiveApiRecord(tableSchema.numColumns(), + Collections.<String>emptyList()); vertexToHive.fillRecord(vertex, record); hiveRecordWriter.write(NullWritable.get(), record); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a802fef1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8d29304..e576e4b 100644 --- a/pom.xml +++ b/pom.xml @@ -881,7 +881,7 @@ under the License. <dependency> <groupId>com.facebook.giraph.hive</groupId> <artifactId>hive-io-experimental</artifactId> - <version>0.2</version> + <version>0.4-SNAPSHOT</version> </dependency> <dependency> <groupId>com.google.guava</groupId>
