Updated Branches: refs/heads/trunk 8df1027b4 -> d419f8f4f
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java deleted file mode 100644 index 33196f5..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.types; - -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.record.HiveWritableRecord; - -/** - * Interface for writing Vertex IDs from Hive records - * - * @param <I> Vertex ID - */ -public interface HiveVertexIdWriter<I extends WritableComparable> { - /** - * Write Vertex ID to record - * - * @param id Vertex ID - * @param record Hive record - */ - void writeId(I id, HiveWritableRecord record); - - /** - * Null implementation that does nothing - * - * @param <W> Writable type - */ - public static class Null<W extends WritableComparable> - implements HiveVertexIdWriter<W> { - /** Singleton */ - private static final Null INSTANCE = new Null(); - - /** - * Get singleton - * - * @return singleton instance - */ - public static Null get() { - return INSTANCE; - } - - @Override - public void writeId(W id, HiveWritableRecord record) { } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java deleted file mode 100644 index e9b0687..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.types; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.types.WritableWrapper; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; - -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow; -import static org.apache.giraph.types.WritableWrappers.lookup; - -/** - * Reader for Vertex/Edge Values from Hive with known types - * - * @param <W> Vertex/Edge Value - */ -public class TypedValueReader<W extends Writable> - implements HiveValueReader<W> { - /** Hive column index */ - private final int columnIndex; - /** {@link WritableWrapper} for Hive column to Giraph Writable */ - private final WritableWrapper<W, Object> writableWrapper; - - /** - * Constructor - * - * @param columnIndex column index - * @param writableWrapper {@link WritableWrapper} - */ - public TypedValueReader(int columnIndex, - WritableWrapper<W, Object> writableWrapper) { - this.columnIndex = columnIndex; - this.writableWrapper = writableWrapper; - } - - /** - * Create from Configuration with column name and Schema - * - * @param conf Configuration - * @param columnOption StrConfOption for column name - * @param schema HiveTableSchema - * @param <V> Vertex/Edge Value - * @return TypedVertexValueReader - */ - public static <V extends Writable> HiveValueReader<V> - createValueReader( - 
ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption, - HiveTableSchema schema) { - Class<V> vertexValueClass = conf.getVertexValueClass(); - if (NullWritable.class.isAssignableFrom(vertexValueClass)) { - return HiveValueReader.Null.get(); - } - int columnIndex = columnIndexOrThrow(schema, conf, columnOption); - Class hiveClass = schema.columnType(columnIndex).javaClass(); - WritableWrapper wrapper = lookup(vertexValueClass, hiveClass); - return new TypedValueReader(columnIndex, wrapper); - } - - @Override - public W readValue(HiveReadableRecord record) { - Object object = record.get(columnIndex); - return writableWrapper.wrap(object); - } - - public int getColumnIndex() { - return columnIndex; - } - - public WritableWrapper<W, Object> getWritableWrapper() { - return writableWrapper; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java deleted file mode 100644 index 8841bda..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.types; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.types.WritableUnwrapper; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; - -import com.facebook.hiveio.record.HiveWritableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow; -import static org.apache.giraph.types.WritableUnwrappers.lookup; - -/** - * Writer for Vertex/Edge Values from Hive with known types - * - * @param <W> Vertex/Edge Value - */ -public class TypedValueWriter<W extends Writable> - implements HiveValueWriter<W> { - /** Hive column index */ - private final int columnIndex; - /** {@link WritableUnwrapper} for Hive column to Giraph Writable */ - private final WritableUnwrapper<W, Object> writableUnwrapper; - - /** - * Constructor - * - * @param columnIndex column index - * @param writableUnwrapper JavaWritableConverter - */ - public TypedValueWriter(int columnIndex, - WritableUnwrapper<W, Object> writableUnwrapper) { - this.columnIndex = columnIndex; - this.writableUnwrapper = writableUnwrapper; - } - - /** - * Create from Configuration with column name and Schema - * - * @param conf Configuration - * @param columnOption StrConfOption for column name - * @param schema HiveTableSchema - * @param <V> Vertex/Edge Value - * @return TypedVertexValueReader - */ - public static <V extends Writable> HiveValueWriter<V> - createValueWriter( - 
ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption, - HiveTableSchema schema) { - Class<V> vertexValueClass = conf.getVertexValueClass(); - if (NullWritable.class.isAssignableFrom(vertexValueClass)) { - return HiveValueWriter.Null.get(); - } - int columnIndex = columnIndexOrThrow(schema, conf, columnOption); - Class hiveClass = schema.columnType(columnIndex).javaClass(); - WritableUnwrapper unwrapper = lookup(vertexValueClass, hiveClass); - return new TypedValueWriter(columnIndex, unwrapper); - } - - @Override - public void writeValue(W value, HiveWritableRecord record) { - Object object = writableUnwrapper.unwrap(value); - record.set(columnIndex, object); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java deleted file mode 100644 index 6d80d37..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.types; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.types.WritableWrapper; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow; -import static org.apache.giraph.types.WritableWrappers.lookup; - -/** - * Reader for Vertex IDs from Hive with known types - * - * @param <I> Vertex ID - */ -public class TypedVertexIdReader<I extends WritableComparable> - implements HiveVertexIdReader<I> { - /** Hive column index */ - private final int columnIndex; - /** {@link WritableWrapper} for Hive column to Giraph Writable */ - private final WritableWrapper<I, Object> writableWrapper; - - /** - * Constructor - * - * @param columnIndex column index - * @param writableWrapper {@link WritableWrapper} - */ - public TypedVertexIdReader(int columnIndex, - WritableWrapper<I, Object> writableWrapper) { - this.columnIndex = columnIndex; - this.writableWrapper = writableWrapper; - } - - /** - * Create from Configuration with column name and Schema - * - * @param conf {@link org.apache.hadoop.conf.Configuration} - * @param columnOption {@link StrConfOption} for column name - * @param schema {@link HiveTableSchema} - * @param <I> Vertex ID - * @return {@link TypedVertexIdReader} - */ - public static <I extends WritableComparable> HiveVertexIdReader<I> - createIdReader( - ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption, - HiveTableSchema schema) { - Class<I> vertexIdClass = conf.getVertexIdClass(); - if (NullWritable.class.isAssignableFrom(vertexIdClass)) { - return HiveVertexIdReader.Null.get(); - } - int 
columnIndex = columnIndexOrThrow(schema, conf, columnOption); - Class hiveClass = schema.columnType(columnIndex).javaClass(); - WritableWrapper wrapper = lookup(vertexIdClass, hiveClass); - return new TypedVertexIdReader<I>(columnIndex, wrapper); - } - - @Override - public I readId(HiveReadableRecord record) { - Object object = record.get(columnIndex); - return writableWrapper.wrap(object); - } - - public int getColumnIndex() { - return columnIndex; - } - - public WritableWrapper<I, Object> getWritableWrapper() { - return writableWrapper; - } -} - http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java deleted file mode 100644 index aece54b..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.types; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.types.WritableUnwrapper; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.record.HiveWritableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow; -import static org.apache.giraph.types.WritableUnwrappers.lookup; - -/** - * Writer for Vertex IDs from Hive with known types - * - * @param <I> Vertex ID - */ -public class TypedVertexIdWriter<I extends WritableComparable> - implements HiveVertexIdWriter<I> { - /** Hive column index */ - private final int columnIndex; - /** {@link WritableUnwrapper} for Hive column to Giraph Writable */ - private final WritableUnwrapper<I, Object> writableUnwrapper; - - /** - * Constructor - * - * @param columnIndex column index - * @param writableUnwrapper {@link WritableUnwrapper} - */ - public TypedVertexIdWriter(int columnIndex, - WritableUnwrapper<I, Object> writableUnwrapper) { - this.columnIndex = columnIndex; - this.writableUnwrapper = writableUnwrapper; - } - - /** - * Create from Configuration with column name and Schema - * - * @param conf {@link org.apache.hadoop.conf.Configuration} - * @param columnOption {@link StrConfOption} for column name - * @param schema {@link HiveTableSchema} - * @param <I> Vertex ID - * @return {@link TypedVertexIdWriter} - */ - public static <I extends WritableComparable> HiveVertexIdWriter<I> - createIdWriter( - ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption, - HiveTableSchema schema) { - Class<I> vertexIdClass = conf.getVertexIdClass(); - if (NullWritable.class.isAssignableFrom(vertexIdClass)) { - return HiveVertexIdWriter.Null.get(); - } - int columnIndex = columnIndexOrThrow(schema, conf, columnOption); - Class hiveClass = 
schema.columnType(columnIndex).javaClass(); - WritableUnwrapper unwrapper = lookup(vertexIdClass, hiveClass); - return new TypedVertexIdWriter<I>(columnIndex, unwrapper); - } - - @Override - public void writeId(I value, HiveWritableRecord record) { - Object object = writableUnwrapper.unwrap(value); - record.set(columnIndex, object); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java deleted file mode 100644 index 7004637..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * HiveIO type-based things. 
- */ -package org.apache.giraph.hive.types; http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java new file mode 100644 index 0000000..e989e28 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueReader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.hive.values; + +import org.apache.hadoop.io.Writable; + +import com.facebook.hiveio.record.HiveReadableRecord; + +/** + * Interface for reading Vertex / Edge values from Hive records + * + * @param <T> Value type + */ +public interface HiveValueReader<T extends Writable> { + /** + * Read value from record + * + * @param value graph value to read into + * @param record Hive record + */ + void readFields(T value, HiveReadableRecord record); + + /** + * Null implementation that return NullWritable + * + * @param <W> Writable type + */ + public class Null<W extends Writable> implements HiveValueReader<W> { + /** Singleton */ + private static final Null INSTANCE = new Null(); + + /** + * Get singleton + * + * @return singleton instance + */ + public static Null get() { + return INSTANCE; + } + + @Override + public void readFields(W value, HiveReadableRecord record) { } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java new file mode 100644 index 0000000..4b624f6 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/values/HiveValueWriter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.hive.values; + +import org.apache.hadoop.io.Writable; + +import com.facebook.hiveio.record.HiveWritableRecord; + +/** + * Interface for writing Vertex / Edge values from Hive records + * + * @param <T> Value type + */ +public interface HiveValueWriter<T extends Writable> { + /** + * Write value to record + * + * @param value the value + * @param record Hive record + */ + void write(T value, HiveWritableRecord record); + + /** + * Null implementation that does nothing + * + * @param <W> Writable type + */ + public static class Null<W extends Writable> implements HiveValueWriter<W> { + /** Singleton */ + private static final Null INSTANCE = new Null(); + + /** + * Get singleton + * + * @return singleton instance + */ + public static Null get() { + return INSTANCE; + } + + @Override + public void write(W value, HiveWritableRecord record) { } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java new file mode 100644 index 0000000..1e8f497 --- /dev/null +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/values/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Hive type (vertex ID, vertex value, edge value, etc) related things. + */ +package org.apache.giraph.hive.values; http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java index 49a36d0..ff932b3 100644 --- a/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java +++ b/giraph-hive/src/test/java/org/apache/giraph/hive/GiraphHiveTestBase.java @@ -2,9 +2,11 @@ package org.apache.giraph.hive; import org.junit.BeforeClass; +import com.facebook.hiveio.log.LogHelpers; + public class GiraphHiveTestBase { @BeforeClass public static void silenceLoggers() { - com.facebook.hiveio.testing.Helpers.silenceLoggers(); + LogHelpers.silenceLoggers(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java 
b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java index 00c00ca..d2103a7 100644 --- a/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java +++ b/giraph-hive/src/test/java/org/apache/giraph/hive/Helpers.java @@ -32,11 +32,14 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import com.google.common.collect.Maps; import java.io.IOException; +import java.io.InputStream; import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; public class Helpers { + public static InputStream getResource(String name) { + return Helpers.class.getClassLoader().getResourceAsStream(name); + } + public static Map<Integer, Double> parseIntDoubleResults(Iterable<String> results) { Map<Integer, Double> values = Maps.newHashMap(); for (String line : results) { http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java index d5bbb95..bf5af0f 100644 --- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java +++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveEdgeInputTest.java @@ -29,7 +29,6 @@ import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; import org.apache.giraph.utils.InternalVertexRunner; import org.apache.thrift.TException; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import com.facebook.hiveio.common.HiveMetastores; http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java 
b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java index af850d5..c75652c 100644 --- a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java +++ b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java @@ -29,7 +29,6 @@ import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; import org.apache.giraph.utils.InternalVertexRunner; import org.apache.thrift.TException; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import com.facebook.hiveio.common.HiveMetastores; http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java new file mode 100644 index 0000000..b664d7d --- /dev/null +++ b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonComplexTypes.java @@ -0,0 +1,139 @@ +package org.apache.giraph.hive.jython; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.graph.Language; +import org.apache.giraph.hive.GiraphHiveTestBase; +import org.apache.giraph.hive.Helpers; +import org.apache.giraph.jython.JythonJob; +import org.apache.giraph.scripting.DeployType; +import org.apache.giraph.scripting.ScriptLoader; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; +import org.python.util.PythonInterpreter; + +import com.facebook.hiveio.common.HiveMetastores; +import com.facebook.hiveio.input.HiveInput; +import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.record.HiveReadableRecord; +import com.facebook.hiveio.testing.LocalHiveServer; +import 
com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import junit.framework.Assert; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.Set; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static org.apache.giraph.hive.Helpers.getResource; + +public class TestHiveJythonComplexTypes extends GiraphHiveTestBase { + private LocalHiveServer hiveServer = new LocalHiveServer("jython-test"); + + @Before + public void setUp() throws IOException, TException { + hiveServer.init(); + HiveMetastores.setTestClient(hiveServer.getClient()); + } + + @Test + public void testFakeLabelPropagation() throws Exception { + String edgesTable = "flp_edges"; + hiveServer.createTable("CREATE TABLE " + edgesTable + + " (source_id INT, " + + " target_id INT," + + " value FLOAT) " + + " ROW FORMAT DELIMITED " + + " FIELDS TERMINATED BY '\t'"); + String[] edges = new String[] { + "1\t2\t0.2", + "2\t3\t0.3", + "3\t4\t0.4", + "4\t1\t0.1", + }; + hiveServer.loadData(edgesTable, edges); + + String vertexesTable = "flp_vertexes"; + hiveServer.createTable("CREATE TABLE " + vertexesTable + + " (id INT, " + + " value MAP<INT,FLOAT>) " + + " ROW FORMAT DELIMITED " + + " FIELDS TERMINATED BY '\t' " + + " COLLECTION ITEMS TERMINATED BY ',' " + + " MAP KEYS TERMINATED BY ':' "); + String[] vertexes = new String[] { + "1\t11:0.8,12:0.1", + "2\t13:0.3,14:0.2", + "3\t15:0.4,16:0.7", + "4\t17:0.1,18:0.6", + }; + hiveServer.loadData(vertexesTable, vertexes); + + String outputTable = "flp_output"; + hiveServer.createTable("CREATE TABLE " + outputTable + + " (id INT," + + " value MAP<INT,DOUBLE>) " + + " ROW FORMAT DELIMITED " + + " FIELDS TERMINATED BY '\t'"); + + String workerJythonPath = + "org/apache/giraph/jython/fake-label-propagation-worker.py"; + + InputStream 
launcher = getResource( + "org/apache/giraph/jython/fake-label-propagation-launcher.py"); + assertNotNull(launcher); + InputStream worker = getResource(workerJythonPath); + assertNotNull(worker); + + PythonInterpreter interpreter = new PythonInterpreter(); + + JythonJob jythonJob = + HiveJythonUtils.parseJythonStreams(interpreter, launcher, worker); + + GiraphConfiguration conf = new GiraphConfiguration(); + + ScriptLoader.setScriptsToLoad(conf, workerJythonPath, DeployType.RESOURCE, + Language.JYTHON); + + HiveJythonUtils.writeJythonJobToConf(jythonJob, conf, interpreter); + + InternalVertexRunner.run(conf, new String[0], new String[0]); + + Helpers.commitJob(conf); + + HiveInputDescription inputDesc = new HiveInputDescription(); + inputDesc.getTableDesc().setTableName(outputTable); + + Iterator<HiveReadableRecord> records = HiveInput.readTable(inputDesc).iterator(); + + printRecords(HiveInput.readTable(inputDesc)); + + final int rows = 4; + + Set<Integer>[] expected = new Set[rows+1]; + expected[1] = ImmutableSet.of(11,12,15,16,17,18); + expected[2] = ImmutableSet.of(13,14,17,18,11,12); + expected[3] = ImmutableSet.of(15,16,11,12,13,14); + expected[4] = ImmutableSet.of(17,18,13,14,15,16); + + for (int i = 0; i < rows; ++i) { + assertTrue(records.hasNext()); + HiveReadableRecord record = records.next(); + assertEquals(expected[record.getInt(0)], record.getMap(1).keySet()); + } + + assertFalse(records.hasNext()); + } + + private void printRecords(Iterable<HiveReadableRecord> records) { + for (HiveReadableRecord record : records) { + System.out.println("record: " + record); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java 
b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java new file mode 100644 index 0000000..c1db151 --- /dev/null +++ b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestHiveJythonPrimitives.java @@ -0,0 +1,110 @@ +package org.apache.giraph.hive.jython; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.graph.Language; +import org.apache.giraph.hive.GiraphHiveTestBase; +import org.apache.giraph.hive.Helpers; +import org.apache.giraph.jython.JythonJob; +import org.apache.giraph.scripting.DeployType; +import org.apache.giraph.scripting.ScriptLoader; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; +import org.python.util.PythonInterpreter; + +import com.facebook.hiveio.common.HiveMetastores; +import com.facebook.hiveio.input.HiveInput; +import com.facebook.hiveio.input.HiveInputDescription; +import com.facebook.hiveio.record.HiveReadableRecord; +import com.facebook.hiveio.testing.LocalHiveServer; +import junit.framework.Assert; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; + +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static org.apache.giraph.hive.Helpers.getResource; +import static org.junit.Assert.assertEquals; + +public class TestHiveJythonPrimitives extends GiraphHiveTestBase { + private LocalHiveServer hiveServer = new LocalHiveServer("jython-test"); + + @Before + public void setUp() throws IOException, TException { + hiveServer.init(); + HiveMetastores.setTestClient(hiveServer.getClient()); + } + + @Test + public void testCountEdges() throws Exception { + String edgesTable = "count_edges_edge_input"; + hiveServer.createTable("CREATE TABLE " + edgesTable + + " (source_edge_id INT, " + + " target_edge_id INT) " + + " ROW FORMAT DELIMITED " + + " 
FIELDS TERMINATED BY '\t'"); + String[] edges = new String[] { + "1\t2", + "2\t3", + "2\t4", + "4\t1" + }; + hiveServer.loadData(edgesTable, edges); + + String outputTable = "count_edges_output"; + hiveServer.createTable("CREATE TABLE " + outputTable + + " (vertex_id INT," + + " num_edges INT) " + + " ROW FORMAT DELIMITED " + + " FIELDS TERMINATED BY '\t'"); + + String workerJythonPath = "org/apache/giraph/jython/count-edges.py"; + + InputStream launcher = getResource( + "org/apache/giraph/jython/count-edges-launcher.py"); + assertNotNull(launcher); + InputStream worker = getResource(workerJythonPath); + assertNotNull(worker); + + PythonInterpreter interpreter = new PythonInterpreter(); + + JythonJob jythonJob = + HiveJythonUtils.parseJythonStreams(interpreter, launcher, worker); + + GiraphConfiguration conf = new GiraphConfiguration(); + + ScriptLoader.setScriptsToLoad(conf, workerJythonPath, + DeployType.RESOURCE, Language.JYTHON); + + HiveJythonUtils.writeJythonJobToConf(jythonJob, conf, interpreter); + + InternalVertexRunner.run(conf, new String[0], new String[0]); + + Helpers.commitJob(conf); + + HiveInputDescription inputDesc = new HiveInputDescription(); + inputDesc.getTableDesc().setTableName(outputTable); + + Iterator<HiveReadableRecord> records = HiveInput.readTable(inputDesc).iterator(); + + int expected[] = { -1, 1, 2, -1, 1 }; + + assertTrue(records.hasNext()); + HiveReadableRecord record = records.next(); + Assert.assertEquals(expected[record.getInt(0)], record.getInt(1)); + + assertTrue(records.hasNext()); + record = records.next(); + Assert.assertEquals(expected[record.getInt(0)], record.getInt(1)); + + assertTrue(records.hasNext()); + record = records.next(); + Assert.assertEquals(expected[record.getInt(0)], record.getInt(1)); + + assertFalse(records.hasNext()); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestJythonLabelInfluence.java 
---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestJythonLabelInfluence.java b/giraph-hive/src/test/java/org/apache/giraph/hive/jython/TestJythonLabelInfluence.java new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java index 4d4d976..39390f1 100644 --- a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java +++ b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java @@ -29,7 +29,6 @@ import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat; import org.apache.giraph.utils.InternalVertexRunner; import org.apache.thrift.TException; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import com.facebook.hiveio.common.HiveMetastores; http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py new file mode 100644 index 0000000..a87f030 --- /dev/null +++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from org.apache.giraph.combiner import DoubleSumCombiner +from org.apache.giraph.edge import ByteArrayEdges +from org.apache.giraph.jython import JythonJob +from org.apache.hadoop.io import IntWritable +from org.apache.hadoop.io import NullWritable + + +def prepare(job): + job.hive_database = "default" + job.workers = 3 + + job.computation_name = "CountEdges" + + job.vertex_id.type = IntWritable + job.vertex_value.type = IntWritable + job.edge_value.type = NullWritable + job.message_value.type = NullWritable + + edge_input = JythonJob.EdgeInput() + edge_input.table = "count_edges_edge_input" + edge_input.source_id_column = "source_edge_id" + edge_input.target_id_column = "target_edge_id" + job.edge_inputs.add(edge_input) + + job.vertex_output.table = "count_edges_output" + job.vertex_output.id_column = "vertex_id" + job.vertex_output.value_column = "num_edges" http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py new file mode 100644 index 0000000..2d1c381 --- /dev/null +++ 
b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-launcher.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from org.apache.hadoop.io import IntWritable +from org.apache.giraph.jython import JythonJob + + +def prepare(job): + job.hive_database = "default" + job.workers = 5 + + job.computation_name = "FakeLabelPropagation" + + job.vertex_id.type = IntWritable + + job.vertex_value.type = FakeLPVertexValue + job.vertex_value.hive_io = FakeLPVertexValueHive + + job.edge_value.type = FakeLPEdgeValue + job.edge_value.hive_reader = FakeLPEdgeReader + + job.message_value.type = "FakeLPMessageValue" + + edge_input = JythonJob.EdgeInput() + edge_input.table = "flp_edges" + edge_input.source_id_column = "source_id" + edge_input.target_id_column = "target_id" + edge_input.value_column = "value" + job.edge_inputs.add(edge_input) + + vertex_input = JythonJob.VertexInput() + vertex_input.table = "flp_vertexes" + vertex_input.id_column = "id" + vertex_input.value_column = "value" + job.vertex_inputs.add(vertex_input) + + job.vertex_output.table = "flp_output" + job.vertex_output.id_column = "id" + job.vertex_output.value_column = "value" 
http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py ---------------------------------------------------------------------- diff --git a/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py new file mode 100644 index 0000000..9499a35 --- /dev/null +++ b/giraph-hive/src/test/resources/org/apache/giraph/jython/fake-label-propagation-worker.py @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from org.apache.hadoop.io import Writable +from org.apache.giraph.jython import JythonComputation +from org.apache.giraph.hive.jython import JythonHiveIO +from org.apache.giraph.hive.jython import JythonHiveReader + + +# Plain vertex value class; Hive reads/writes are handled by FakeLPVertexValueHive +class FakeLPVertexValue: + def __init__(self): + self.labels = {} + self.dog = 'cat' + + def add(self, message): + for label, weight in message.labels.iteritems(): + if label in self.labels: + self.labels[label] += weight + else: + self.labels[label] = weight + + +# Hive reader/writer for vertexes +class FakeLPVertexValueHive(JythonHiveIO): + def readFromHive(self, vertex_value, column): + vertex_value.labels = column.getMap() + + def writeToHive(self, vertex_value, column): + column.setMap(vertex_value.labels) + + +# Implements Writable to override default Jython serialization which grabs all +# of the data in an object. +# Reading from Hive is handled separately by FakeLPEdgeReader. +class FakeLPEdgeValue(Writable): + def __init__(self): + self.value = 2.13 + self.foo = "bar" + + def readFields(self, data_input): + self.value = data_input.readFloat() + self.foo = "read_in" + + def write(self, data_output): + data_output.writeFloat(self.value) + self.foo = "wrote_out" + + +# Hive reader for edges +class FakeLPEdgeReader(JythonHiveReader): + def readFromHive(self, edge_value, column): + edge_value.value = column.getFloat() + + +# Doesn't implement anything. Use default Jython serialization. 
+# No need for I/O with Hive +class FakeLPMessageValue: + def __init__(self): + self.labels = {} + + +# Extends JythonComputation to be a Computation Giraph can use +class FakeLabelPropagation(JythonComputation): + def compute(self, vertex, messages): + if self.superstep == 0: + self.send_msg(vertex) + elif self.superstep < self.conf.getInt("supersteps", 3): + for message in messages: + vertex.value.add(message) + self.send_msg(vertex) + else: + vertex.voteToHalt() + + def send_msg(self, vertex): + msg = FakeLPMessageValue() + msg.labels = vertex.value.labels + self.sendMessageToAllEdges(vertex, msg) http://git-wip-us.apache.org/repos/asf/giraph/blob/d419f8f4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8aa1f45..41b6bb1 100644 --- a/pom.xml +++ b/pom.xml @@ -254,6 +254,7 @@ under the License. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <dep.accumulo.version>1.4.0</dep.accumulo.version> + <dep.airline.version>0.5</dep.airline.version> <dep.base64.version>2.3.8</dep.base64.version> <dep.cli-parser.version>1.1</dep.cli-parser.version> <dep.codehaus-jackson.version>1.8.0</dep.codehaus-jackson.version> @@ -271,7 +272,7 @@ under the License. <dep.guava.version>12.0</dep.guava.version> <dep.hcatalog.version>0.5.0-incubating</dep.hcatalog.version> <dep.hive.version>0.11.0</dep.hive.version> - <dep.hiveio.version>0.15</dep.hiveio.version> + <dep.hiveio.version>0.16</dep.hiveio.version> <dep.json.version>20090211</dep.json.version> <dep.junit.version>4.8</dep.junit.version> <dep.jython.version>2.5.3</dep.jython.version> @@ -1049,6 +1050,11 @@ under the License. <version>${dep.yourkit-api.version}</version> </dependency> <dependency> + <groupId>io.airlift</groupId> + <artifactId>airline</artifactId> + <version>${dep.airline.version}</version> + </dependency> + <dependency> <groupId>io.netty</groupId> <artifactId>netty</artifactId> <version>${dep.netty.version}</version>
