GIRAPH-575: Update hive-io dependency (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/799711e1 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/799711e1 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/799711e1 Branch: refs/heads/partition-values-575 Commit: 799711e1d8ae6ae1bac523d0368f8b80106dee8f Parents: 42b5ec9 Author: Nitay Joffe <[email protected]> Authored: Tue Mar 19 15:54:52 2013 -0400 Committer: Nitay Joffe <[email protected]> Committed: Tue Mar 19 15:54:52 2013 -0400 ---------------------------------------------------------------------- .../hive/input/edge/HiveEdgeInputFormat.java | 6 +- .../input/vertex/AbstractHiveToVertexEdges.java | 19 ++++++++- .../input/vertex/AbstractHiveToVertexValue.java | 19 ++++++++- .../giraph/hive/input/vertex/HiveToRecord.java | 33 +++++++++++++++ .../hive/input/vertex/HiveToVertexEdges.java | 13 ++++-- .../hive/input/vertex/HiveToVertexValue.java | 2 +- .../hive/input/vertex/HiveVertexInputFormat.java | 9 ++-- .../giraph/hive/input/vertex/HiveVertexReader.java | 15 ++++--- pom.xml | 2 +- 9 files changed, 95 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java index 3f40763..18b40c2 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java @@ -66,17 +66,17 @@ public class HiveEdgeInputFormat<I extends WritableComparable, throws IOException { Configuration conf = context.getConfiguration(); - RecordReader<WritableComparable, HiveRecord> baseReader; HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>(); reader.setTableSchema(hiveInputFormat.getTableSchema(conf)); + RecordReader<WritableComparable, HiveRecord> baseReader; try { baseReader = hiveInputFormat.createRecordReader(split, context); - reader.setHiveRecordReader(baseReader); - reader.initialize(split, context); } catch (InterruptedException e) { throw new IllegalStateException("Could not create edge record reader", e); } + + reader.setHiveRecordReader(baseReader); return reader; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java index 7b01dac..cb67749 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexEdges.java @@ -21,8 +21,11 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import com.facebook.giraph.hive.HiveTableSchema; import com.facebook.giraph.hive.HiveTableSchemaAware; +import java.util.Map; + /** * Base class for HiveToVertexEdges implementations * @@ -34,4 +37,18 @@ import com.facebook.giraph.hive.HiveTableSchemaAware; public abstract class AbstractHiveToVertexEdges<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M> - implements HiveTableSchemaAware, HiveToVertexEdges<I, E> { } + implements HiveTableSchemaAware, HiveToVertexEdges<I, E> { + /** Schema stored here */ + private HiveTableSchema tableSchema; + + @Override public void setTableSchema(HiveTableSchema tableSchema) { + this.tableSchema = tableSchema; + } + + @Override public HiveTableSchema getTableSchema() { + return tableSchema; + } + + @Override + public void readingPartition(Map<String, String> partitionValues) { } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java index 5c279b5..7707cd9 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertexValue.java @@ -21,8 +21,11 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import com.facebook.giraph.hive.HiveTableSchema; import com.facebook.giraph.hive.HiveTableSchemaAware; +import java.util.Map; + /** * Base class for HiveToVertex implementations * @@ -34,4 +37,18 @@ import com.facebook.giraph.hive.HiveTableSchemaAware; public abstract class AbstractHiveToVertexValue<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M> - implements HiveTableSchemaAware, HiveToVertexValue<I, V> { } + implements HiveTableSchemaAware, HiveToVertexValue<I, V> { + /** Schema stored here */ + private HiveTableSchema tableSchema; + + @Override public void setTableSchema(HiveTableSchema tableSchema) { + this.tableSchema = tableSchema; + } + + @Override public HiveTableSchema getTableSchema() { + return tableSchema; + } + + @Override + public void readingPartition(Map<String, String> partitionValues) { } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java new file mode 100644 index 0000000..afcf4ad --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToRecord.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.hive.input.vertex; + +import java.util.Map; + +/** + * Base interface for HiveTo{X} classes. Holds API common to both. + */ +public interface HiveToRecord { + /** + * Notification that we start reading a split. + * + * @param partitionValues Map of partition data. + */ + void readingPartition(Map<String, String> partitionValues); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java index cf7ea33..0d303d9 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexEdges.java @@ -25,6 +25,7 @@ import org.apache.hadoop.io.WritableComparable; import com.facebook.giraph.hive.HiveReadableRecord; import java.util.Collections; +import java.util.Map; /** * Interface for creating edges for a vertex from a Hive record. @@ -35,7 +36,7 @@ import java.util.Collections; * @param <E> extends Writable */ public interface HiveToVertexEdges<I extends WritableComparable, - E extends Writable> { + E extends Writable> extends HiveToRecord { /** * Read Vertex's edges from the HiveRecord given. * @@ -51,8 +52,8 @@ public interface HiveToVertexEdges<I extends WritableComparable, /** Singleton */ private static final Empty INSTANCE = new Empty(); - /** Don't construct */ - private Empty() { } + /** Don't construct, allow inheritance */ + protected Empty() { } /** * Get singleton instance @@ -60,7 +61,11 @@ public interface HiveToVertexEdges<I extends WritableComparable, */ public static Empty get() { return INSTANCE; } - @Override public Iterable getEdges(HiveReadableRecord record) { + @Override + public void readingPartition(Map<String, String> partitionValues) { } + + @Override + public Iterable getEdges(HiveReadableRecord record) { return Collections.emptyList(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java index 593eb9a..382e295 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertexValue.java @@ -30,7 +30,7 @@ import com.facebook.giraph.hive.HiveReadableRecord; * @param <V> Vertex Value */ public interface HiveToVertexValue<I extends WritableComparable, - V extends Writable> { + V extends Writable> extends HiveToRecord { /** * Read the Vertex's ID from the HiveRecord given. * http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java index fb3b123..25c7a26 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java @@ -26,10 +26,9 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.facebook.giraph.hive.HiveRecord; +import com.facebook.giraph.hive.impl.input.HiveApiRecordReader; import com.facebook.giraph.hive.input.HiveApiInputFormat; import java.io.IOException; @@ -68,17 +67,17 @@ public class HiveVertexInputFormat<I extends WritableComparable, TaskAttemptContext context) throws IOException { Configuration conf = context.getConfiguration(); - RecordReader<WritableComparable, HiveRecord> baseReader; HiveVertexReader reader = new HiveVertexReader(); reader.setTableSchema(hiveInputFormat.getTableSchema(conf)); + HiveApiRecordReader baseReader; try { baseReader = hiveInputFormat.createRecordReader(split, context); - reader.setHiveRecordReader(baseReader); - reader.initialize(split, context); } catch (InterruptedException e) { throw new IOException("Could not create vertex reader", e); } + + reader.setHiveRecordReader(baseReader); return reader; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java index 2311e72..da6e426 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java @@ -26,13 +26,13 @@ import org.apache.giraph.utils.ReflectionUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import com.facebook.giraph.hive.HiveRecord; import com.facebook.giraph.hive.HiveTableSchema; import com.facebook.giraph.hive.HiveTableSchemaAware; import com.facebook.giraph.hive.HiveTableSchemas; +import com.facebook.giraph.hive.impl.input.HiveApiRecordReader; import java.io.IOException; @@ -57,7 +57,7 @@ public class HiveVertexReader<I extends WritableComparable, public static final String REUSE_VERTEX_KEY = "giraph.hive.reuse.vertex"; /** Underlying Hive RecordReader used */ - private RecordReader<WritableComparable, HiveRecord> hiveRecordReader; + private HiveApiRecordReader hiveRecordReader; /** Schema for table in Hive */ private HiveTableSchema tableSchema; @@ -80,7 +80,7 @@ public class HiveVertexReader<I extends WritableComparable, * * @return RecordReader from Hive. */ - public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() { + public HiveApiRecordReader getHiveRecordReader() { return hiveRecordReader; } @@ -89,8 +89,7 @@ public class HiveVertexReader<I extends WritableComparable, * * @param hiveRecordReader RecordReader to read from Hive. */ - public void setHiveRecordReader( - RecordReader<WritableComparable, HiveRecord> hiveRecordReader) { + public void setHiveRecordReader(HiveApiRecordReader hiveRecordReader) { this.hiveRecordReader = hiveRecordReader; } @@ -118,11 +117,13 @@ public class HiveVertexReader<I extends WritableComparable, throws IOException, InterruptedException { hiveRecordReader.initialize(inputSplit, context); conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration()); - instantiateHiveToVertexFromConf(); + instantiateHiveToVertexValueFromConf(); instantiateHiveToVertexEdgesFromConf(); if (conf.getBoolean(REUSE_VERTEX_KEY, false)) { vertexToReuse = conf.createVertex(); } + hiveToVertexEdges.readingPartition(hiveRecordReader.getPartitionValues()); + hiveToVertexValue.readingPartition(hiveRecordReader.getPartitionValues()); } /** @@ -130,7 +131,7 @@ public class HiveVertexReader<I extends WritableComparable, * * @throws IOException if anything goes wrong reading from Configuration. */ - private void instantiateHiveToVertexFromConf() throws IOException { + private void instantiateHiveToVertexValueFromConf() throws IOException { Class<? extends HiveToVertexValue> klass = conf.getClass(HIVE_TO_VERTEX_KEY, null, HiveToVertexValue.class); if (klass == null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/799711e1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 53c57ca..e576e4b 100644 --- a/pom.xml +++ b/pom.xml @@ -881,7 +881,7 @@ under the License. <dependency> <groupId>com.facebook.giraph.hive</groupId> <artifactId>hive-io-experimental</artifactId> - <version>0.3</version> + <version>0.4-SNAPSHOT</version> </dependency> <dependency> <groupId>com.google.guava</groupId>
