Updated Branches: refs/heads/trunk 49dbdf72e -> c2af20485
http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/TypedHiveToEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/TypedHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/TypedHiveToEdge.java new file mode 100644 index 0000000..ac62827 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/TypedHiveToEdge.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.hive.input.edge; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.hive.types.HiveValueReader; +import org.apache.giraph.hive.types.HiveVertexIdReader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.record.HiveReadableRecord; +import com.facebook.hiveio.schema.HiveTableSchema; + +import java.util.Iterator; + +import static org.apache.giraph.hive.types.TypedValueReader.createValueReader; +import static org.apache.giraph.hive.types.TypedVertexIdReader.createIdReader; + +/** + * A {@link HiveToEdge} using {@link org.apache.giraph.types.WritableWrapper}s + * + * @param <I> Vertex ID + * @param <E> Edge Value + */ +public class TypedHiveToEdge<I extends WritableComparable, E extends Writable> + extends SimpleHiveToEdge<I, E> { + /** Source ID column name in Hive */ + public static final StrConfOption EDGE_SOURCE_ID_COLUMN = + new StrConfOption("hive.input.edge.source.id.column", null, + "Source Vertex ID column"); + /** Target ID column name in Hive */ + public static final StrConfOption EDGE_TARGET_ID_COLUMN = + new StrConfOption("hive.input.edge.target.id.column", null, + "Target Vertex ID column"); + /** Edge Value column name in Hive */ + public static final StrConfOption EDGE_VALUE_COLUMN = + new StrConfOption("hive.input.edge.value.column", null, + "Edge Value column"); + + /** Source ID reader */ + private HiveVertexIdReader<I> sourceIdReader; + /** Target ID reader */ + private HiveVertexIdReader<I> targetIdReader; + /** Edge Value reader */ + private HiveValueReader<E> vertexValueReader; + + @Override + public void checkInput(HiveInputDescription inputDesc, + HiveTableSchema schema) { } + + @Override + public void initializeRecords(Iterator<HiveReadableRecord> records) { + super.initializeRecords(records); + + 
HiveTableSchema schema = getTableSchema(); + ImmutableClassesGiraphConfiguration conf = getConf(); + + sourceIdReader = createIdReader(conf, EDGE_SOURCE_ID_COLUMN, schema); + targetIdReader = createIdReader(conf, EDGE_TARGET_ID_COLUMN, schema); + vertexValueReader = createValueReader(conf, EDGE_VALUE_COLUMN, schema); + } + + @Override + public E getEdgeValue(HiveReadableRecord record) { + return vertexValueReader.readValue(record); + } + + @Override + public I getSourceVertexId( + HiveReadableRecord record) { + return sourceIdReader.readId(record); + } + + @Override + public I getTargetVertexId( + HiveReadableRecord record) { + return targetIdReader.readId(record); + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/TypedHiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/TypedHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/TypedHiveToVertex.java new file mode 100644 index 0000000..5ca7d34 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/TypedHiveToVertex.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.hive.input.vertex; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.hive.types.HiveValueReader; +import org.apache.giraph.hive.types.HiveVertexIdReader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.record.HiveReadableRecord; +import com.facebook.hiveio.schema.HiveTableSchema; +import com.google.common.collect.ImmutableList; + +import java.util.Iterator; + +import static org.apache.giraph.hive.types.TypedValueReader.createValueReader; +import static org.apache.giraph.hive.types.TypedVertexIdReader.createIdReader; + +/** + * A {@link HiveToVertex} using {@link org.apache.giraph.types.WritableWrapper} + * + * @param <I> Vertex ID + * @param <V> Vertex Value + * @param <E> Edge Value + */ +public class TypedHiveToVertex<I extends WritableComparable, V extends Writable, + E extends Writable> extends SimpleHiveToVertex<I, V, E> { + /** Vertex ID column name in Hive */ + public static final StrConfOption VERTEX_ID_COLUMN = + new StrConfOption("hive.input.vertex.id.column", null, + "Vertex ID column"); + /** Vertex Value column name in Hive */ + public static final StrConfOption VERTEX_VALUE_COLUMN = + new StrConfOption("hive.input.vertex.value.column", null, + "Vertex Value column"); + /** Edges column name in Hive */ + public static final StrConfOption EDGES_COLUMN = + new StrConfOption("hive.input.vertex.edges.column", null, + "Edges column"); + + /** Vertex ID reader */ + private HiveVertexIdReader<I> vertexIdReader; + /** Vertex Value reader */ + private HiveValueReader<V> vertexValueReader; + + @Override + public void checkInput(HiveInputDescription inputDesc, + 
HiveTableSchema schema) { } + + @Override + public void initializeRecords(Iterator<HiveReadableRecord> records) { + super.initializeRecords(records); + + HiveTableSchema schema = getTableSchema(); + ImmutableClassesGiraphConfiguration conf = getConf(); + + vertexIdReader = createIdReader(conf, VERTEX_ID_COLUMN, schema); + vertexValueReader = createValueReader(conf, VERTEX_VALUE_COLUMN, schema); + } + + @Override + public Iterable<Edge<I, E>> getEdges(HiveReadableRecord record) { + return ImmutableList.of(); + } + + @Override + public I getVertexId(HiveReadableRecord record) { + return vertexIdReader.readId(record); + } + + @Override + public V getVertexValue(HiveReadableRecord record) { + return vertexValueReader.readValue(record); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java index 477ce6e..4046bb2 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java @@ -31,4 +31,7 @@ import org.apache.hadoop.io.WritableComparable; public abstract class AbstractVertexToHive<I extends WritableComparable, V extends Writable, E extends Writable> extends DefaultConfigurableAndTableSchemaAware<I, V, E> - implements VertexToHive<I, V, E> { } + implements VertexToHive<I, V, E> { + @Override + public void initialize() { } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java ---------------------------------------------------------------------- diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java index bb27f25..796bc8f 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java @@ -97,6 +97,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable, public void initialize(TaskAttemptContext context) throws IOException, InterruptedException { vertexToHive = HiveUtils.newVertexToHive(getConf(), tableSchema); + vertexToHive.initialize(); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/output/TypedVertexToHive.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/TypedVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/TypedVertexToHive.java new file mode 100644 index 0000000..8d6fd9a --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/TypedVertexToHive.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.hive.output; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.hive.types.HiveValueWriter; +import org.apache.giraph.hive.types.HiveVertexIdWriter; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.facebook.hiveio.output.HiveOutputDescription; +import com.facebook.hiveio.record.HiveWritableRecord; +import com.facebook.hiveio.schema.HiveTableSchema; + +import static org.apache.giraph.hive.types.TypedValueWriter.createValueWriter; +import static org.apache.giraph.hive.types.TypedVertexIdWriter.createIdWriter; + +/** + * A {@link VertexToHive} using + * {@link org.apache.giraph.types.WritableUnwrapper}s. + * + * @param <I> Vertex ID + * @param <V> Vertex Value + * @param <E> Edge Value + */ +public class TypedVertexToHive<I extends WritableComparable, V extends Writable, + E extends Writable> extends SimpleVertexToHive<I, V, E> { + /** Vertex ID column name in Hive */ + public static final StrConfOption VERTEX_ID_COLUMN = + new StrConfOption("hive.output.vertex.id.column", null, + "Source Vertex ID column"); + /** Vertex Value column name in Hive */ + public static final StrConfOption VERTEX_VALUE_COLUMN = + new StrConfOption("hive.output.vertex.value.column", null, + "Target Vertex ID column"); + + /** Vertex ID writer */ + private HiveVertexIdWriter<I> vertexIdWriter; + /** Vertex Value writer */ + private HiveValueWriter<V> vertexValueWriter; + + @Override + public void checkOutput(HiveOutputDescription outputDesc, + HiveTableSchema schema, HiveWritableRecord emptyRecord) { } + + @Override + public void initialize() { + HiveTableSchema schema = getTableSchema(); + ImmutableClassesGiraphConfiguration conf = getConf(); + + vertexIdWriter = 
createIdWriter(conf, VERTEX_ID_COLUMN, schema); + vertexValueWriter = createValueWriter(conf, VERTEX_VALUE_COLUMN, schema); + } + + @Override + public void fillRecord(Vertex<I, V, E> vertex, HiveWritableRecord record) { + vertexIdWriter.writeId(vertex.getId(), record); + vertexValueWriter.writeValue(vertex.getValue(), record); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java index f9537a7..bf13ec1 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java @@ -38,6 +38,12 @@ import java.io.IOException; public interface VertexToHive<I extends WritableComparable, V extends Writable, E extends Writable> { /** + * User initialization before any saveVertex() calls but after Configuration + * and HiveTableSchema are guaranteed to be set. + */ + void initialize(); + + /** * Check the output is valid. 
This method provides information to the user as * early as possible so that they may validate they are using the correct * input and fail the job early rather than getting into it and waiting a long http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueReader.java new file mode 100644 index 0000000..c0fad12 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueReader.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.hive.types; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; + +import com.facebook.hiveio.record.HiveReadableRecord; + +/** + * Interface for reading Vertex / Edge values from Hive records + * + * @param <T> Value type + */ +public interface HiveValueReader<T extends Writable> { + /** + * Read value from record + * + * @param record Hive record + * @return value + */ + T readValue(HiveReadableRecord record); + + /** + * Null implementation that returns NullWritable + * + * @param <W> Writable type + */ + public class Null<W extends Writable> implements HiveValueReader<W> { + /** Singleton */ + private static final Null INSTANCE = new Null(); + + /** + * Get singleton + * + * @return singleton instance + */ + public static Null get() { + return INSTANCE; + } + + @Override + public W readValue(HiveReadableRecord record) { + return (W) NullWritable.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueWriter.java new file mode 100644 index 0000000..f38f79a --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueWriter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.hive.types; + +import org.apache.hadoop.io.Writable; + +import com.facebook.hiveio.record.HiveWritableRecord; + +/** + * Interface for writing Vertex / Edge values to Hive records + * + * @param <T> Value type + */ +public interface HiveValueWriter<T extends Writable> { + /** + * Write value to record + * + * @param value the value + * @param record Hive record + */ + void writeValue(T value, HiveWritableRecord record); + + /** + * Null implementation that does nothing + * + * @param <W> Writable type + */ + public static class Null<W extends Writable> implements HiveValueWriter<W> { + /** Singleton */ + private static final Null INSTANCE = new Null(); + + /** + * Get singleton + * + * @return singleton instance + */ + public static Null get() { + return INSTANCE; + } + + @Override + public void writeValue(W value, HiveWritableRecord record) { } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdReader.java new file mode 100644 index 0000000..1ad6f7f --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdReader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.hive.types; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; + +import com.facebook.hiveio.record.HiveReadableRecord; + +/** + * Interface for reading Vertex IDs from Hive records + * + * @param <I> Vertex ID + */ +public interface HiveVertexIdReader<I extends WritableComparable> { + /** + * Read Vertex ID from Hive record + * + * @param record Hive record + * @return Vertex ID + */ + I readId(HiveReadableRecord record); + + /** + * Null implementation that returns NullWritable + * + * @param <W> Writable type + */ + public static class Null<W extends WritableComparable> + implements HiveVertexIdReader<W> { + /** Singleton */ + private static final Null INSTANCE = new Null(); + + /** + * Get singleton + * + * @return singleton instance + */ + public static Null get() { + return INSTANCE; + } + + @Override + public W readId(HiveReadableRecord record) { + return (W) NullWritable.get(); + } + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java new file mode 100644 index 0000000..33196f5 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.hive.types; + +import org.apache.hadoop.io.WritableComparable; + +import com.facebook.hiveio.record.HiveWritableRecord; + +/** + * Interface for writing Vertex IDs to Hive records + * + * @param <I> Vertex ID + */ +public interface HiveVertexIdWriter<I extends WritableComparable> { + /** + * Write Vertex ID to record + * + * @param id Vertex ID + * @param record Hive record + */ + void writeId(I id, HiveWritableRecord record); + + /** + * Null implementation that does nothing + * + * @param <W> Writable type + */ + public static class Null<W extends WritableComparable> + implements HiveVertexIdWriter<W> { + /** Singleton */ + private static final Null INSTANCE = new Null(); + + /** + * Get singleton + * + * @return singleton instance + */ + public static Null get() { + return INSTANCE; + } + + @Override + public void writeId(W id, HiveWritableRecord record) { } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java new file mode 100644 index 0000000..e9b0687 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.hive.types; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.types.WritableWrapper; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; + +import com.facebook.hiveio.record.HiveReadableRecord; +import com.facebook.hiveio.schema.HiveTableSchema; + +import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow; +import static org.apache.giraph.types.WritableWrappers.lookup; + +/** + * Reader for Vertex/Edge Values from Hive with known types + * + * @param <W> Vertex/Edge Value + */ +public class TypedValueReader<W extends Writable> + implements HiveValueReader<W> { + /** Hive column index */ + private final int columnIndex; + /** {@link WritableWrapper} for Hive column to Giraph Writable */ + private final WritableWrapper<W, Object> writableWrapper; + + /** + * Constructor + * + * @param columnIndex column index + * @param writableWrapper {@link WritableWrapper} + */ + public TypedValueReader(int columnIndex, + WritableWrapper<W, Object> writableWrapper) { + this.columnIndex = columnIndex; + this.writableWrapper = writableWrapper; + } + + /** + * Create from Configuration with column name and Schema + * + * @param conf Configuration + * @param columnOption StrConfOption for column name + * @param schema HiveTableSchema + * @param <V> Vertex/Edge Value + * @return TypedVertexValueReader + */ + public static <V extends Writable> HiveValueReader<V> + createValueReader( + 
ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption, + HiveTableSchema schema) { + Class<V> vertexValueClass = conf.getVertexValueClass(); + if (NullWritable.class.isAssignableFrom(vertexValueClass)) { + return HiveValueReader.Null.get(); + } + int columnIndex = columnIndexOrThrow(schema, conf, columnOption); + Class hiveClass = schema.columnType(columnIndex).javaClass(); + WritableWrapper wrapper = lookup(vertexValueClass, hiveClass); + return new TypedValueReader(columnIndex, wrapper); + } + + @Override + public W readValue(HiveReadableRecord record) { + Object object = record.get(columnIndex); + return writableWrapper.wrap(object); + } + + public int getColumnIndex() { + return columnIndex; + } + + public WritableWrapper<W, Object> getWritableWrapper() { + return writableWrapper; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java new file mode 100644 index 0000000..8841bda --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.hive.types; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.types.WritableUnwrapper; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; + +import com.facebook.hiveio.record.HiveWritableRecord; +import com.facebook.hiveio.schema.HiveTableSchema; + +import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow; +import static org.apache.giraph.types.WritableUnwrappers.lookup; + +/** + * Writer for Vertex/Edge Values to Hive with known types + * + * @param <W> Vertex/Edge Value + */ +public class TypedValueWriter<W extends Writable> + implements HiveValueWriter<W> { + /** Hive column index */ + private final int columnIndex; + /** {@link WritableUnwrapper} for Giraph Writable to Hive column */ + private final WritableUnwrapper<W, Object> writableUnwrapper; + + /** + * Constructor + * + * @param columnIndex column index + * @param writableUnwrapper {@link WritableUnwrapper} + */ + public TypedValueWriter(int columnIndex, + WritableUnwrapper<W, Object> writableUnwrapper) { + this.columnIndex = columnIndex; + this.writableUnwrapper = writableUnwrapper; + } + + /** + * Create from Configuration with column name and Schema + * + * @param conf Configuration + * @param columnOption StrConfOption for column name + * @param schema HiveTableSchema + * @param <V> Vertex/Edge Value + * @return {@link HiveValueWriter} + */ + public static <V extends Writable> HiveValueWriter<V> + createValueWriter( + 
ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption, + HiveTableSchema schema) { + Class<V> vertexValueClass = conf.getVertexValueClass(); + if (NullWritable.class.isAssignableFrom(vertexValueClass)) { + return HiveValueWriter.Null.get(); + } + int columnIndex = columnIndexOrThrow(schema, conf, columnOption); + Class hiveClass = schema.columnType(columnIndex).javaClass(); + WritableUnwrapper unwrapper = lookup(vertexValueClass, hiveClass); + return new TypedValueWriter(columnIndex, unwrapper); + } + + @Override + public void writeValue(W value, HiveWritableRecord record) { + Object object = writableUnwrapper.unwrap(value); + record.set(columnIndex, object); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java new file mode 100644 index 0000000..6d80d37 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.hive.types; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.types.WritableWrapper; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; + +import com.facebook.hiveio.record.HiveReadableRecord; +import com.facebook.hiveio.schema.HiveTableSchema; + +import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow; +import static org.apache.giraph.types.WritableWrappers.lookup; + +/** + * Reader for Vertex IDs from Hive with known types. + * Reads one Hive column and wraps its value as a Giraph Writable. + * + * @param <I> Vertex ID + */ +public class TypedVertexIdReader<I extends WritableComparable> + implements HiveVertexIdReader<I> { + /** Hive column index */ + private final int columnIndex; + /** {@link WritableWrapper} for Hive column to Giraph Writable */ + private final WritableWrapper<I, Object> writableWrapper; + + /** + * Constructor + * + * @param columnIndex column index + * @param writableWrapper {@link WritableWrapper} + */ + public TypedVertexIdReader(int columnIndex, + WritableWrapper<I, Object> writableWrapper) { + this.columnIndex = columnIndex; + this.writableWrapper = writableWrapper; + } + + /** + * Create from Configuration with column name and Schema. + * Returns the Null reader when the configured vertex ID class is + * assignable to {@link NullWritable}. + * + * @param conf {@link ImmutableClassesGiraphConfiguration} + * @param columnOption {@link StrConfOption} for column name + * @param schema {@link HiveTableSchema} + * @param <I> Vertex ID + * @return {@link HiveVertexIdReader} for the configured column + */ + public static <I extends WritableComparable> HiveVertexIdReader<I> + createIdReader( + ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption, + HiveTableSchema schema) { + Class<I> vertexIdClass = conf.getVertexIdClass(); + if (NullWritable.class.isAssignableFrom(vertexIdClass)) { + return HiveVertexIdReader.Null.get(); + } + int 
columnIndex = columnIndexOrThrow(schema, conf, columnOption); + Class hiveClass = schema.columnType(columnIndex).javaClass(); + WritableWrapper wrapper = lookup(vertexIdClass, hiveClass); + return new TypedVertexIdReader<I>(columnIndex, wrapper); + } + + /** + * Read the configured column from the record and wrap it as a vertex ID. + * + * @param record Hive record to read from + * @return wrapped vertex ID + */ + @Override + public I readId(HiveReadableRecord record) { + Object object = record.get(columnIndex); + return writableWrapper.wrap(object); + } + + /** + * Get the Hive column index this reader reads from. + * + * @return column index + */ + public int getColumnIndex() { + return columnIndex; + } + + /** + * Get the wrapper used to turn column values into vertex IDs. + * + * @return {@link WritableWrapper} + */ + public WritableWrapper<I, Object> getWritableWrapper() { + return writableWrapper; + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java new file mode 100644 index 0000000..aece54b --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.hive.types; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.types.WritableUnwrapper; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; + +import com.facebook.hiveio.record.HiveWritableRecord; +import com.facebook.hiveio.schema.HiveTableSchema; + +import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow; +import static org.apache.giraph.types.WritableUnwrappers.lookup; + +/** + * Writer of Vertex IDs to Hive with known types. + * Unwraps each Giraph vertex ID and stores the result in one Hive column. + * + * @param <I> Vertex ID + */ +public class TypedVertexIdWriter<I extends WritableComparable> + implements HiveVertexIdWriter<I> { + /** Hive column index */ + private final int columnIndex; + /** {@link WritableUnwrapper} from Giraph Writable to Hive column value */ + private final WritableUnwrapper<I, Object> writableUnwrapper; + + /** + * Constructor + * + * @param columnIndex column index + * @param writableUnwrapper {@link WritableUnwrapper} + */ + public TypedVertexIdWriter(int columnIndex, + WritableUnwrapper<I, Object> writableUnwrapper) { + this.columnIndex = columnIndex; + this.writableUnwrapper = writableUnwrapper; + } + + /** + * Create from Configuration with column name and Schema. + * Returns the Null writer when the configured vertex ID class is + * assignable to {@link NullWritable}. + * + * @param conf {@link ImmutableClassesGiraphConfiguration} + * @param columnOption {@link StrConfOption} for column name + * @param schema {@link HiveTableSchema} + * @param <I> Vertex ID + * @return {@link HiveVertexIdWriter} for the configured column + */ + public static <I extends WritableComparable> HiveVertexIdWriter<I> + createIdWriter( + ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption, + HiveTableSchema schema) { + Class<I> vertexIdClass = conf.getVertexIdClass(); + if (NullWritable.class.isAssignableFrom(vertexIdClass)) { + return HiveVertexIdWriter.Null.get(); + } + int columnIndex = columnIndexOrThrow(schema, conf, columnOption); + Class hiveClass = 
schema.columnType(columnIndex).javaClass(); + WritableUnwrapper unwrapper = lookup(vertexIdClass, hiveClass); + return new TypedVertexIdWriter<I>(columnIndex, unwrapper); + } + + /** + * Unwrap the vertex ID and set it in the record at the configured column. + * + * @param value vertex ID to write + * @param record Hive record to write into + */ + @Override + public void writeId(I value, HiveWritableRecord record) { + Object object = writableUnwrapper.unwrap(value); + record.set(columnIndex, object); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java new file mode 100644 index 0000000..7004637 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Typed readers and writers between Hive columns and Giraph Writables. + */ +package org.apache.giraph.hive.types;
