http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java deleted file mode 100644 index 321c945..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.vertex; - -import org.apache.giraph.edge.Edge; -import org.apache.giraph.edge.OutEdges; -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.record.HiveReadableRecord; - -import java.util.Iterator; - -/** - * Simple implementation of {@link HiveToVertex} when each vertex is in the one - * row of the input. 
- * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - */ -public abstract class SimpleHiveToVertex<I extends WritableComparable, - V extends Writable, E extends Writable> - extends AbstractHiveToVertex<I, V, E> { - /** Hive records which we are reading from */ - private Iterator<HiveReadableRecord> records; - - /** Reusable vertex object */ - private Vertex<I, V, E> reusableVertex; - - /** Reusable vertex id */ - private I reusableVertexId; - /** Reusable vertex value */ - private V reusableVertexValue; - /** Reusable edges */ - private OutEdges<I, E> reusableOutEdges; - - /** - * Read the Vertex's ID from the HiveRecord given. - * - * @param record HiveRecord to read from. - * @return Vertex ID - */ - public abstract I getVertexId(HiveReadableRecord record); - - /** - * Read the Vertex's Value from the HiveRecord given. - * - * @param record HiveRecord to read from. - * @return Vertex Value - */ - public abstract V getVertexValue(HiveReadableRecord record); - - /** - * Read Vertex's edges from the HiveRecord given. - * - * @param record HiveRecord to read from. 
- * @return iterable of edges - */ - public abstract Iterable<Edge<I, E>> getEdges(HiveReadableRecord record); - - @Override - public void initializeRecords(Iterator<HiveReadableRecord> records) { - this.records = records; - reusableVertex = getConf().createVertex(); - reusableVertexId = getConf().createVertexId(); - reusableVertexValue = getConf().createVertexValue(); - reusableOutEdges = getConf().createOutEdges(); - } - - @Override - public boolean hasNext() { - return records.hasNext(); - } - - @Override - public Vertex<I, V, E> next() { - HiveReadableRecord record = records.next(); - I id = getVertexId(record); - V value = getVertexValue(record); - Iterable<Edge<I, E>> edges = getEdges(record); - reusableVertex.initialize(id, value, edges); - return reusableVertex; - } - - protected I getReusableVertexId() { - return reusableVertexId; - } - - protected V getReusableVertexValue() { - return reusableVertexValue; - } - - /** - * Get reusable OutEdges object - * - * @param <OE> Type of OutEdges - * @return Reusable OutEdges object - */ - protected <OE extends OutEdges<I, E>> OE getReusableOutEdges() { - return (OE) reusableOutEdges; - } -}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java deleted file mode 100644 index 21cc6c4..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.vertex; - -import org.apache.giraph.edge.Edge; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.record.HiveReadableRecord; -import com.google.common.collect.ImmutableList; - -/** - * Simple implementation of {@link HiveToVertex} when each vertex is in the one - * row of the input, and there are no edges in vertex input. 
- * - * @param <I> Vertex ID - * @param <V> Vertex Value - */ -public abstract class SimpleNoEdgesHiveToVertex<I extends WritableComparable, - V extends Writable> extends SimpleHiveToVertex<I, V, Writable> { - @Override - public final Iterable<Edge<I, Writable>> getEdges(HiveReadableRecord record) { - return ImmutableList.of(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java deleted file mode 100644 index cae3e01..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.input.vertex.examples; - -import org.apache.giraph.edge.Edge; -import org.apache.giraph.hive.common.HiveParsing; -import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.IntWritable; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.input.parser.Records; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -/** - * Simple HiveToVertex that reads vertices with integer IDs, Double vertex - * values, and edges with Double values. - */ -public class HiveIntDoubleDoubleVertex extends SimpleHiveToVertex<IntWritable, - DoubleWritable, DoubleWritable> { - @Override public void checkInput(HiveInputDescription inputDesc, - HiveTableSchema schema) { - Records.verifyType(0, HiveType.INT, schema); - Records.verifyType(1, HiveType.DOUBLE, schema); - Records.verifyType(2, HiveType.MAP, schema); - } - - @Override public Iterable<Edge<IntWritable, DoubleWritable>> getEdges( - HiveReadableRecord record) { - return HiveParsing.parseIntDoubleEdges(record, 2); - } - - @Override - public IntWritable getVertexId(HiveReadableRecord record) { - return HiveParsing.parseIntID(record, 0, getReusableVertexId()); - } - - @Override - public DoubleWritable getVertexValue(HiveReadableRecord record) { - return HiveParsing.parseDoubleWritable(record, 1, getReusableVertexValue()); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java deleted file mode 100644 index 
526a9a7..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.input.vertex.examples; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.input.parser.Records; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; -import org.apache.giraph.edge.Edge; -import org.apache.giraph.hive.common.HiveParsing; -import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.NullWritable; - -/** - * Simple HiveToVertex that reads vertices with integer IDs, no vertex values, - * and edges with no values. 
- */ -public class HiveIntIntNullVertex - extends SimpleHiveToVertex<IntWritable, IntWritable, NullWritable> { - @Override public void checkInput(HiveInputDescription inputDesc, - HiveTableSchema schema) { - Records.verifyType(0, HiveType.INT, schema); - Records.verifyType(1, HiveType.LIST, schema); - } - - @Override - public Iterable<Edge<IntWritable, NullWritable>> getEdges( - HiveReadableRecord record) { - return HiveParsing.parseIntNullEdges(record, 1); - } - - @Override - public IntWritable getVertexId(HiveReadableRecord record) { - return HiveParsing.parseIntID(record, 0, getReusableVertexId()); - } - - @Override - public IntWritable getVertexValue(HiveReadableRecord record) { - return getReusableVertexValue(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java deleted file mode 100644 index 9148c57..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.input.vertex.examples; - -import org.apache.giraph.edge.Edge; -import org.apache.giraph.hive.common.HiveParsing; -import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.NullWritable; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.input.parser.Records; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -/** - * Simple HiveToVertex that reads vertices with integer IDs, no vertex values, - * and edges with no values. 
- */ -public class HiveIntNullNullVertex - extends SimpleHiveToVertex<IntWritable, NullWritable, NullWritable> { - @Override public void checkInput(HiveInputDescription inputDesc, - HiveTableSchema schema) { - Records.verifyType(0, HiveType.INT, schema); - Records.verifyType(1, HiveType.LIST, schema); - } - - @Override - public Iterable<Edge<IntWritable, NullWritable>> getEdges( - HiveReadableRecord record) { - return HiveParsing.parseIntNullEdges(record, 1); - } - - @Override - public IntWritable getVertexId(HiveReadableRecord record) { - return HiveParsing.parseIntID(record, 0, getReusableVertexId()); - } - - @Override - public NullWritable getVertexValue(HiveReadableRecord record) { - return NullWritable.get(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java deleted file mode 100644 index ea15393..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Hive input vertex examples. - */ -package org.apache.giraph.hive.input.vertex.examples; http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java deleted file mode 100644 index 9027962..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Hive vertex input related things. 
- */ -package org.apache.giraph.hive.input.vertex; http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonRunner.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonRunner.java deleted file mode 100644 index 3d7afec..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonRunner.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.jython; - -import org.apache.giraph.graph.Language; -import org.apache.giraph.job.GiraphJob; -import org.apache.giraph.jython.JythonJob; -import org.apache.giraph.scripting.DeployType; -import org.apache.giraph.scripting.ScriptLoader; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.log4j.Logger; -import org.python.util.PythonInterpreter; - -import com.facebook.hiveio.HiveIO; - -import java.util.Arrays; - -import static org.apache.giraph.hive.jython.HiveJythonUtils.parseJythonFiles; -import static org.apache.giraph.utils.DistributedCacheUtils.copyAndAdd; - -/** - * Runner for jobs written in Jython - */ -public class HiveJythonRunner implements Tool { - /** Logger */ - private static final Logger LOG = Logger.getLogger(HiveJythonRunner.class); - /** Configuration */ - private static HiveConf CONF = new HiveConf(); - - @Override public int run(String[] args) throws Exception { - args = HiveJythonUtils.processArgs(args, CONF); - LOG.info("Processed hive options now have args: " + Arrays.toString(args)); - - HiveIO.init(CONF, false); - - PythonInterpreter interpreter = new PythonInterpreter(); - - JythonJob jythonJob = parseJythonFiles(interpreter, args); - - logOptions(); - - for (String arg : args) { - Path remoteScriptPath = copyAndAdd(new Path(arg), CONF); - ScriptLoader.addScriptToLoad(CONF, remoteScriptPath.toString(), - DeployType.DISTRIBUTED_CACHE, Language.JYTHON); - } - - String name = HiveJythonUtils.writeJythonJobToConf(jythonJob, CONF, - interpreter); - - GiraphJob job = new GiraphJob(CONF, name); - return job.run(true) ? 
0 : -1; - } - - /** - * Log options used - */ - private static void logOptions() { - StringBuilder sb = new StringBuilder(100); - appendEnvVars(sb, "JAVA_HOME", "MAPRED_POOL_NAME"); - appendEnvVars(sb, "HADOOP_HOME", "HIVE_HOME"); - LOG.info("Environment:\n" + sb); - } - - /** - * Append environment variables to StringBuilder - * - * @param sb StringBuilder - * @param names vararg of env keys - */ - private static void appendEnvVars(StringBuilder sb, String ... names) { - for (String name : names) { - sb.append(name).append("=").append(System.getenv(name)).append("\n"); - } - } - - /** - * Set the static configuration stored - * - * @param conf Configuration - */ - public static void setStaticConf(Configuration conf) { - if (conf instanceof HiveConf) { - HiveJythonRunner.CONF = (HiveConf) conf; - } else { - HiveJythonRunner.CONF = new HiveConf(conf, HiveJythonRunner.class); - } - } - - @Override public void setConf(Configuration conf) { - setStaticConf(conf); - } - - @Override public Configuration getConf() { - return CONF; - } - - /** - * Entry point - * - * @param args command line args - * @throws Exception - */ - public static void main(String[] args) throws Exception { - System.exit(ToolRunner.run(new HiveJythonRunner(), args)); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java deleted file mode 100644 index ba3e8cc..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java +++ /dev/null @@ -1,910 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.jython; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.giraph.conf.GiraphConstants.EDGE_INPUT_FORMAT_CLASS; -import static org.apache.giraph.conf.GiraphConstants.GRAPH_TYPE_LANGUAGES; -import static org.apache.giraph.conf.GiraphConstants.MAX_WORKERS; -import static org.apache.giraph.conf.GiraphConstants.MESSAGE_COMBINER_CLASS; -import static org.apache.giraph.conf.GiraphConstants.MIN_WORKERS; -import static org.apache.giraph.conf.GiraphConstants.VERTEX_INPUT_FORMAT_CLASS; -import static org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE; -import static 
org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS; -import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_VALUE_READER_JYTHON_NAME; -import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_VALUE_WRITER_JYTHON_NAME; -import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_SOURCE_ID_COLUMN; -import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_TARGET_ID_COLUMN; -import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_VALUE_COLUMN; -import static org.apache.giraph.hive.jython.JythonVertexToHive.VERTEX_VALUE_COLUMN; - -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.conf.GiraphTypes; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.graph.GraphType; -import org.apache.giraph.graph.Language; -import org.apache.giraph.hive.common.GiraphHiveConstants; -import org.apache.giraph.hive.common.HiveUtils; -import org.apache.giraph.hive.common.LanguageAndType; -import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat; -import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat; -import org.apache.giraph.hive.output.HiveVertexOutputFormat; -import org.apache.giraph.hive.primitives.PrimitiveValueReader; -import org.apache.giraph.hive.primitives.PrimitiveValueWriter; -import org.apache.giraph.hive.values.HiveValueReader; -import org.apache.giraph.hive.values.HiveValueWriter; -import org.apache.giraph.io.formats.multi.MultiEdgeInputFormat; -import org.apache.giraph.io.formats.multi.MultiVertexInputFormat; -import org.apache.giraph.jython.JythonJob; -import org.apache.giraph.jython.JythonUtils; -import org.apache.giraph.jython.factories.JythonEdgeValueFactory; -import 
org.apache.giraph.jython.factories.JythonFactoryBase; -import org.apache.giraph.jython.factories.JythonOutgoingMessageValueFactory; -import org.apache.giraph.jython.factories.JythonVertexIdFactory; -import org.apache.giraph.jython.factories.JythonVertexValueFactory; -import org.apache.giraph.jython.wrappers.JythonWritableWrapper; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.log4j.Logger; -import org.python.core.Py; -import org.python.core.PyClass; -import org.python.core.PyObject; -import org.python.core.PyType; -import org.python.util.PythonInterpreter; - -import com.facebook.hiveio.schema.HiveTableSchema; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.google.common.io.Closeables; - -/** - * Plugin to {@link HiveJythonRunner} to use Hive. 
- */ -public class HiveJythonUtils { - /** Logger */ - private static final Logger LOG = Logger.getLogger(HiveJythonUtils.class); - - /** Don't construct */ - private HiveJythonUtils() { } - - /** - * Process command line arguments - * - * @param args cmdline args - * @param conf {@link Configuration} - * @return remaining cmdline args to process - */ - public static String[] processArgs(String[] args, Configuration conf) { - HiveUtils.addHadoopClasspathToTmpJars(conf); - HiveUtils.addHiveSiteXmlToTmpFiles(conf); - HiveUtils.addHiveSiteCustomXmlToTmpFiles(conf); - return moveHiveconfOptionsToConf(args, conf); - } - - /** - * Remove -hiveconf options from cmdline - * - * @param args cmdline args - * @param conf Configuration - * @return cmdline args without -hiveconf options - */ - private static String[] moveHiveconfOptionsToConf(String[] args, - Configuration conf) { - int start = 0; - while (start < args.length) { - if (args[start].endsWith("hiveconf")) { - HiveUtils.processHiveconfOption(conf, args[start + 1]); - start += 2; - } else { - break; - } - } - return Arrays.copyOfRange(args, start, args.length); - } - - /** - * Parse set of Jython scripts from local files - * - * @param interpreter PythonInterpreter to use - * @param paths Jython files to parse - * @return JythonJob - * @throws java.io.IOException - */ - public static JythonJob parseJythonFiles(PythonInterpreter interpreter, - String ... 
paths) throws IOException { - return parseJythonFiles(interpreter, Arrays.asList(paths)); - } - - /** - * Parse set of Jython scripts from local files - * - * @param interpreter PythonInterpreter to use - * @param paths Jython files to parse - * @return JythonJob - * @throws IOException - */ - public static JythonJob parseJythonFiles(PythonInterpreter interpreter, - List<String> paths) throws IOException { - InputStream[] streams = new InputStream[paths.size()]; - for (int i = 0; i < paths.size(); ++i) { - LOG.info("Reading jython file " + paths.get(i)); - streams[i] = new FileInputStream(paths.get(i)); - } - - JythonJob jythonJob; - try { - jythonJob = parseJythonStreams(interpreter, streams); - } finally { - for (InputStream stream : streams) { - Closeables.close(stream, true); - } - } - return jythonJob; - } - - /** - * Parse scripts from Jython InputStreams - * - * @param interpreter PythonInterpreter - * @param streams InputStreams to parse - * @return JythonJob - */ - public static JythonJob parseJythonStreams(PythonInterpreter interpreter, - InputStream ... 
streams) { - for (InputStream stream : streams) { - readJythonStream(interpreter, stream); - } - - PyObject pyPrepare = interpreter.get("prepare"); - - JythonJob jythonJob = new JythonJob(); - pyPrepare._jcall(new Object[]{jythonJob}); - - return jythonJob; - } - - /** - * Execute a Jython script - * - * @param interpreter Jython interpreter to use - * @param jythonStream {@link java.io.InputStream} with Jython code - * @throws java.io.IOException - */ - private static void readJythonStream(PythonInterpreter interpreter, - InputStream jythonStream) { - try { - interpreter.execfile(jythonStream); - } finally { - try { - jythonStream.close(); - } catch (IOException e) { - LOG.error("Failed to close jython stream " + jythonStream); - } - } - } - - /** - * Set arbitrary option of unknown type in Configuration - * - * @param conf Configuration - * @param key String key - * @param value Object to set - */ - private static void setOption(Configuration conf, String key, - Object value) { - if (value instanceof Boolean) { - conf.getBoolean(key, (Boolean) value); - } else if (value instanceof Byte || value instanceof Short || - value instanceof Integer) { - conf.setInt(key, ((Number) value).intValue()); - } else if (value instanceof Long) { - conf.setLong(key, (Long) value); - } else if (value instanceof Float || value instanceof Double) { - conf.setFloat(key, ((Number) value).floatValue()); - } else if (value instanceof String) { - conf.set(key, value.toString()); - } else if (value instanceof Class) { - conf.set(key, ((Class) value).getName()); - } else { - throw new IllegalArgumentException( - "Don't know how to handle option key: " + key + - ", value: " + value + ", value type: " + value.getClass()); - } - } - - /** - * Write JythonJob to Configuration - * - * @param jythonJob JythonJob - * @param conf Configuration - * @param interpreter PythonInterpreter - * @return name of Job - */ - public static String writeJythonJobToConf(JythonJob jythonJob, - Configuration conf, 
PythonInterpreter interpreter) { - checkJob(jythonJob); - - JythonUtils.init(conf, jythonJob.getComputation_name()); - - if (jythonJob.getMessageCombiner() != null) { - MESSAGE_COMBINER_CLASS.set(conf, jythonJob.getMessageCombiner()); - } - - conf.setInt(MIN_WORKERS, jythonJob.getWorkers()); - conf.setInt(MAX_WORKERS, jythonJob.getWorkers()); - - String javaOptions = Joiner.on(' ').join(jythonJob.getJava_options()); - conf.set("mapred.child.java.opts", javaOptions); - - Map<String, Object> options = jythonJob.getGiraph_options(); - for (Map.Entry<String, Object> entry : options.entrySet()) { - setOption(conf, entry.getKey(), entry.getValue()); - } - - setPool(conf, jythonJob); - - initHiveReadersWriters(conf, jythonJob, interpreter); - initGraphTypes(conf, jythonJob, interpreter); - initOutput(conf, jythonJob); - initVertexInputs(conf, jythonJob); - initEdgeInputs(conf, jythonJob); - - String name = jythonJob.getName(); - if (name == null) { - name = jythonJob.getComputation_name(); - } - return name; - } - - /** - * Set the hadoop mapreduce pool - * - * @param conf Configuration - * @param job the job info - */ - private static void setPool(Configuration conf, JythonJob job) { - if (job.getPool() == null) { - if (job.getWorkers() < 50) { - job.setPool("graph.test"); - } else { - job.setPool("graph.production"); - } - } - conf.set("mapred.fairscheduler.pool", job.getPool()); - } - - /** - * Check that the job is valid - * - * @param jythonJob JythonJob - */ - private static void checkJob(JythonJob jythonJob) { - checkNotNull(jythonJob.getComputation_name(), - "computation_name cannot be null"); - checkTypeNotNull(jythonJob.getVertex_id(), GraphType.VERTEX_ID); - checkTypeNotNull(jythonJob.getVertex_value(), GraphType.VERTEX_VALUE); - checkTypeNotNull(jythonJob.getEdge_value(), GraphType.EDGE_VALUE); - checkMessageTypes(jythonJob); - } - - /** - * Check if job has edge inputs - * - * @param jythonJob JythonJob - * @return true if job has edge inputs, false otherwise 
- */ - private static boolean hasEdgeInputs(JythonJob jythonJob) { - return !jythonJob.getEdge_inputs().isEmpty(); - } - - /** - * Check if job has vertex inputs - * - * @param jythonJob JythonJob - * @return true if job has vertex inputs, false otherwise - */ - private static boolean hasVertexInputs(JythonJob jythonJob) { - return !jythonJob.getVertex_inputs().isEmpty(); - } - - /** - * Check that type is present - * - * @param typeHolder TypeHolder - * @param graphType GraphType - */ - private static void checkTypeNotNull(JythonJob.TypeHolder typeHolder, - GraphType graphType) { - checkNotNull(typeHolder.getType(), graphType + ".type not present"); - } - - /** - * Initialize the job types - * - * @param conf Configuration - * @param jythonJob the job info - * @param interpreter PythonInterpreter to use - */ - private static void initGraphTypes(Configuration conf, - JythonJob jythonJob, PythonInterpreter interpreter) { - GiraphTypes types = new GiraphTypes(); - types.setVertexIdClass(initValueType(conf, GraphType.VERTEX_ID, - jythonJob.getVertex_id().getType(), new JythonVertexIdFactory(), - interpreter)); - types.setVertexValueClass(initValueType(conf, GraphType.VERTEX_VALUE, - jythonJob.getVertex_value().getType(), new JythonVertexValueFactory(), - interpreter)); - types.setEdgeValueClass(initValueType(conf, GraphType.EDGE_VALUE, - jythonJob.getEdge_value().getType(), new JythonEdgeValueFactory(), - interpreter)); - types.setOutgoingMessageValueClass( - initValueType(conf, GraphType.OUTGOING_MESSAGE_VALUE, - jythonJob.getOutgoing_message_value().getType(), - new JythonOutgoingMessageValueFactory(), interpreter)); - types.writeTo(conf); - } - - /** - * Initialize a graph type (IVEMM) - * - * @param conf Configuration - * @param graphType GraphType - * @param jythonOrJavaClass jython or java class given by user - * @param jythonFactory Jactory for making Jython types - * @param interpreter PythonInterpreter - * @return Class for Configuration - */ - private static 
Class initValueType(Configuration conf, GraphType graphType, - Object jythonOrJavaClass, JythonFactoryBase jythonFactory, - PythonInterpreter interpreter) { - Class<? extends Writable> writableClass = graphType.interfaceClass(); - LanguageAndType langType = processUserType(jythonOrJavaClass, interpreter); - - switch (langType.getLanguage()) { - case JAVA: - GRAPH_TYPE_LANGUAGES.set(conf, graphType, Language.JAVA); - checkImplements(langType, writableClass, interpreter); - return langType.getJavaClass(); - case JYTHON: - GRAPH_TYPE_LANGUAGES.set(conf, graphType, Language.JYTHON); - String jythonClassName = langType.getJythonClassName(); - PyObject jythonClass = interpreter.get(jythonClassName); - if (jythonClass == null) { - throw new IllegalArgumentException("Could not find Jython class " + - jythonClassName + " for parameter " + graphType); - } - PyObject valuePyObj = jythonClass.__call__(); - - // Check if the Jython type implements Writable. If so, just use it - // directly. Otherwise, wrap it in a class that does using pickle. - Object pyWritable = valuePyObj.__tojava__(writableClass); - if (pyWritable.equals(Py.NoConversion)) { - GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS.set(conf, graphType, true); - jythonFactory.useThisFactory(conf, jythonClassName); - return JythonWritableWrapper.class; - } else { - GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS.set(conf, graphType, false); - jythonFactory.useThisFactory(conf, jythonClassName); - return writableClass; - } - default: - throw new IllegalArgumentException("Don't know how to handle " + - LanguageAndType.class.getSimpleName() + " with language " + - langType.getLanguage()); - } - } - - /** - * Check that the incoming / outgoing message value types are present. 
- * - * @param jythonJob JythonJob - */ - private static void checkMessageTypes(JythonJob jythonJob) { - checkMessageType(jythonJob.getOutgoing_message_value(), - GraphType.OUTGOING_MESSAGE_VALUE, jythonJob); - } - - /** - * Check that given message value type is present. - * - * @param msgTypeHolder Incoming or outgoing message type holder - * @param graphType The graph type - * @param jythonJob JythonJob - */ - private static void checkMessageType(JythonJob.TypeHolder msgTypeHolder, - GraphType graphType, JythonJob jythonJob) { - if (msgTypeHolder.getType() == null) { - Object msgValueType = jythonJob.getMessage_value().getType(); - checkNotNull(msgValueType, graphType + ".type and " + - "message_value.type cannot both be empty"); - msgTypeHolder.setType(msgValueType); - } - } - - /** - * Check that the vertex ID, vertex value, and edge value Hive info is valid. - * - * @param conf Configuration - * @param jythonJob JythonJob - * @param interpreter PythonInterpreter - */ - private static void initHiveReadersWriters(Configuration conf, - JythonJob jythonJob, PythonInterpreter interpreter) { - if (!userTypeIsJavaPrimitiveWritable(jythonJob.getVertex_id())) { - checkTypeWithHive(jythonJob.getVertex_id(), GraphType.VERTEX_ID); - - LanguageAndType idReader = processUserType( - jythonJob.getVertex_id().getHive_reader(), interpreter); - checkImplements(idReader, JythonHiveReader.class, interpreter); - checkArgument(idReader.getLanguage() == Language.JYTHON); - GiraphHiveConstants.VERTEX_ID_READER_JYTHON_NAME.set(conf, - idReader.getJythonClassName()); - - LanguageAndType idWriter = processUserType( - jythonJob.getVertex_id().getHive_writer(), interpreter); - checkImplements(idWriter, JythonHiveWriter.class, interpreter); - checkArgument(idWriter.getLanguage() == Language.JYTHON); - GiraphHiveConstants.VERTEX_ID_WRITER_JYTHON_NAME.set(conf, - idWriter.getJythonClassName()); - } - - if (hasVertexInputs(jythonJob) && - 
!userTypeIsJavaPrimitiveWritable(jythonJob.getVertex_value())) { - checkTypeWithHive(jythonJob.getVertex_value(), GraphType.VERTEX_VALUE); - - LanguageAndType valueReader = processUserType( - jythonJob.getVertex_value().getHive_reader(), interpreter); - checkImplements(valueReader, JythonHiveReader.class, interpreter); - checkArgument(valueReader.getLanguage() == Language.JYTHON); - VERTEX_VALUE_READER_JYTHON_NAME.set(conf, - valueReader.getJythonClassName()); - - LanguageAndType valueWriter = processUserType( - jythonJob.getVertex_value().getHive_writer(), interpreter); - checkImplements(valueWriter, JythonHiveWriter.class, interpreter); - checkArgument(valueWriter.getLanguage() == Language.JYTHON); - VERTEX_VALUE_WRITER_JYTHON_NAME.set(conf, - valueWriter.getJythonClassName()); - } - - if (hasEdgeInputs(jythonJob) && - !userTypeIsJavaPrimitiveWritable(jythonJob.getEdge_value())) { - checkNotNull(jythonJob.getEdge_value().getHive_reader(), - "edge_value.hive_reader cannot be null"); - - LanguageAndType edgeReader = processUserType( - jythonJob.getEdge_value().getHive_reader(), interpreter); - checkImplements(edgeReader, JythonHiveReader.class, interpreter); - checkArgument(edgeReader.getLanguage() == Language.JYTHON); - GiraphHiveConstants.EDGE_VALUE_READER_JYTHON_NAME.set(conf, - edgeReader.getJythonClassName()); - } - } - - /** - * Verify Jython class is present and implements the Java type - * - * @param interpreter PythonInterpreter - * @param valueFromUser Jython class or name of class - * @return name of Jython class - */ - private static LanguageAndType processUserType(Object valueFromUser, - PythonInterpreter interpreter) { - // user gave a Class object, should be either Java or Jython class name - if (valueFromUser instanceof Class) { - Class valueClass = (Class) valueFromUser; - String jythonClassName = extractJythonClass(valueClass); - if (jythonClassName != null) { - // Jython class - return processJythonType(jythonClassName, interpreter); - } else { - 
// Java class - return LanguageAndType.java(valueClass); - } - - // user gave a string, should be either Java or Jython class name - } else if (valueFromUser instanceof String) { - String valueStr = (String) valueFromUser; - Class valueClass; - try { - // Try to find Java class with name - valueClass = Class.forName(valueStr); - return LanguageAndType.java(valueClass); - } catch (ClassNotFoundException e) { - // Java class not found, try to find a Jython one - return processJythonType(valueStr, interpreter); - } - - // user gave a PyClass, process as a Jython class - } else if (valueFromUser instanceof PyClass) { - PyClass userPyClass = (PyClass) valueFromUser; - return processJythonType(userPyClass.__name__, interpreter); - - // user gave a PyType, process as Jython class - } else if (valueFromUser instanceof PyType) { - PyType userPyType = (PyType) valueFromUser; - return processJythonType(userPyType.getName(), interpreter); - - // Otherwise, don't know how to handle this, so error - } else { - throw new IllegalArgumentException("Don't know how to handle " + - valueFromUser + " of class " + valueFromUser.getClass() + - ", needs to be Class or String"); - } - } - - /** - * Check that a type implements a Java interface - * - * @param langType type with langauge - * @param interfaceClass java interface class - * @param interpreter PythonInterpreter - */ - private static void checkImplements(LanguageAndType langType, - Class interfaceClass, PythonInterpreter interpreter) { - switch (langType.getLanguage()) { - case JAVA: - checkArgument(interfaceClass.isAssignableFrom(langType.getJavaClass()), - langType.getJavaClass().getSimpleName() + " needs to implement " + - interfaceClass.getSimpleName()); - break; - case JYTHON: - PyObject pyClass = interpreter.get(langType.getJythonClassName()); - PyObject pyObj = pyClass.__call__(); - Object converted = pyObj.__tojava__(interfaceClass); - checkArgument(!Py.NoConversion.equals(converted), - "Jython class " + 
langType.getJythonClassName() + - " does not implement " + interfaceClass.getSimpleName() + - " interface"); - break; - default: - throw new IllegalArgumentException("Don't know how to handle " + - "language " + langType.getLanguage()); - } - } - - /** - * Verify Jython class is present and implements specified type - * - * @param jythonName Jython class name - * @param interpreter PythonInterpreter - * @return language and type specification - */ - private static LanguageAndType processJythonType(String jythonName, - PythonInterpreter interpreter) { - PyObject pyClass = interpreter.get(jythonName); - checkNotNull(pyClass, "Jython class " + jythonName + " not found"); - return LanguageAndType.jython(jythonName); - } - - /** - * Check that the given value type is valid - * - * @param typeWithHive value type - * @param graphType GraphType - */ - private static void checkTypeWithHive(JythonJob.TypeWithHive typeWithHive, - GraphType graphType) { - if (typeWithHive.getHive_reader() == null) { - checkNotNull(typeWithHive.getHive_io(), graphType + ".hive_reader and " + - graphType + ".hive_io cannot both be empty"); - typeWithHive.setHive_reader(typeWithHive.getHive_io()); - } - if (typeWithHive.getHive_writer() == null) { - checkNotNull(typeWithHive.getHive_io(), graphType + ".hive_writer and " + - graphType + ".hive_io cannot both be empty"); - typeWithHive.setHive_writer(typeWithHive.getHive_io()); - } - } - - /** - * Create a graph value (IVEMM) reader - * - * @param <T> graph value type - * @param schema {@link com.facebook.hiveio.schema.HiveTableSchema} - * @param columnOption option for column name - * @param conf {@link ImmutableClassesGiraphConfiguration} - * @param graphType GraphType creating a reader for - * @param jythonClassNameOption option for jython class option - * @return {@link org.apache.giraph.hive.values.HiveValueReader} - */ - public static <T extends Writable> HiveValueReader<T> newValueReader( - HiveTableSchema schema, StrConfOption columnOption, 
- ImmutableClassesGiraphConfiguration conf, GraphType graphType, - StrConfOption jythonClassNameOption) { - HiveValueReader<T> reader; - if (HiveJythonUtils.isPrimitiveWritable(graphType.get(conf))) { - reader = PrimitiveValueReader.create(conf, graphType, columnOption, - schema); - } else if (jythonClassNameOption.contains(conf)) { - reader = JythonColumnReader.create(conf, jythonClassNameOption, - columnOption, schema); - } else { - throw new IllegalArgumentException("Don't know how to read " + graphType + - " of class " + graphType.get(conf) + " which is not primitive and" + - " no " + JythonHiveReader.class.getSimpleName() + " is set"); - } - return reader; - } - - /** - * Create a graph value (IVEMM) writer - * - * @param <T> writable type - * @param schema {@link HiveTableSchema} - * @param columnOption option for column - * @param conf {@link ImmutableClassesGiraphConfiguration} - * @param graphType {@link GraphType} - * @param jythonClassNameOption option for name of jython class - * @return {@link HiveValueWriter} - */ - public static <T extends Writable> HiveValueWriter<T> - newValueWriter(HiveTableSchema schema, StrConfOption columnOption, - ImmutableClassesGiraphConfiguration conf, GraphType graphType, - StrConfOption jythonClassNameOption) { - HiveValueWriter<T> writer; - if (HiveJythonUtils.isPrimitiveWritable(graphType.get(conf))) { - writer = PrimitiveValueWriter.create(conf, columnOption, - schema, graphType); - } else if (jythonClassNameOption.contains(conf)) { - writer = JythonColumnWriter.create(conf, jythonClassNameOption, - columnOption, schema); - } else { - throw new IllegalArgumentException("Don't know how to write " + - graphType + " of class " + graphType.get(conf) + - " which is not primitive and no " + - JythonHiveWriter.class.getSimpleName() + " is set"); - } - return writer; - } - - /** - * Extract Jython class name from a user set proxy Jython class. 
- * - * For example: - * job.vertex_value_type = FakeLPVertexValue - * Yields: - * org.python.proxies.__main__$FakeLPVertexValue$0 - * This method extracts: - * FakeLPVertexValue - * - * @param klass Jython proxy class - * @return Jython class name - */ - private static String extractJythonClass(Class klass) { - if (!isJythonClass(klass)) { - return null; - } - Iterable<String> parts = Splitter.on('$').split(klass.getSimpleName()); - if (Iterables.size(parts) != 3) { - return null; - } - Iterator<String> partsIter = parts.iterator(); - partsIter.next(); - return partsIter.next(); - } - - /** - * Check if passed in class is a Jython class - * - * @param klass to check - * @return true if Jython class, false otherwise - */ - private static boolean isJythonClass(Class klass) { - return klass.getCanonicalName().startsWith("org.python.proxies"); - } - - /** - * Initialize edge input - * - * @param conf Configuration - * @param jythonJob data to initialize - */ - private static void initEdgeInputs(Configuration conf, JythonJob jythonJob) { - List<JythonJob.EdgeInput> edgeInputs = jythonJob.getEdge_inputs(); - if (!edgeInputs.isEmpty()) { - if (edgeInputs.size() == 1) { - EDGE_INPUT_FORMAT_CLASS.set(conf, HiveEdgeInputFormat.class); - JythonJob.EdgeInput edgeInput = edgeInputs.get(0); - checkEdgeInput(edgeInput); - LOG.info("Setting edge input using: " + edgeInput); - - HIVE_EDGE_INPUT.getDatabaseOpt().set(conf, - jythonJob.getHive_database()); - HIVE_EDGE_INPUT.getTableOpt().set(conf, edgeInput.getTable()); - if (edgeInput.getPartition_filter() != null) { - HIVE_EDGE_INPUT.getPartitionOpt().set(conf, - edgeInput.getPartition_filter()); - } - HIVE_EDGE_INPUT.getClassOpt().set(conf, JythonHiveToEdge.class); - - EDGE_SOURCE_ID_COLUMN.set(conf, edgeInput.getSource_id_column()); - EDGE_TARGET_ID_COLUMN.set(conf, edgeInput.getTarget_id_column()); - if (edgeInput.getValue_column() != null) { - EDGE_VALUE_COLUMN.set(conf, edgeInput.getValue_column()); - } - } else { - 
EDGE_INPUT_FORMAT_CLASS.set(conf, MultiEdgeInputFormat.class); - throw new IllegalArgumentException( - "Multiple edge inputs not supported yet: " + edgeInputs); - } - } - } - - /** - * Check that the edge input is valid - * - * @param edgeInput data to check - */ - private static void checkEdgeInput(JythonJob.EdgeInput edgeInput) { - checkNotNull(edgeInput.getTable(), "EdgeInput table name needs to be set"); - checkNotNull(edgeInput.getSource_id_column(), - "EdgeInput source ID column needs to be set"); - checkNotNull(edgeInput.getTarget_id_column(), - "EdgeInput target ID column needs to be set"); - } - - /** - * Initialize vertex output info - * - * @param conf Configuration - * @param jythonJob the job info - */ - private static void initVertexInputs(Configuration conf, - JythonJob jythonJob) { - List<JythonJob.VertexInput> vertexInputs = jythonJob.getVertex_inputs(); - if (!vertexInputs.isEmpty()) { - if (vertexInputs.size() == 1) { - VERTEX_INPUT_FORMAT_CLASS.set(conf, HiveVertexInputFormat.class); - JythonJob.VertexInput vertexInput = vertexInputs.get(0); - checkVertexInput(vertexInput); - LOG.info("Setting vertex input using: " + vertexInput); - - HIVE_VERTEX_INPUT.getDatabaseOpt().set(conf, - jythonJob.getHive_database()); - HIVE_VERTEX_INPUT.getTableOpt().set(conf, vertexInput.getTable()); - if (vertexInput.getPartition_filter() != null) { - HIVE_VERTEX_INPUT.getPartitionOpt().set(conf, - vertexInput.getPartition_filter()); - } - HIVE_VERTEX_INPUT.getClassOpt().set(conf, JythonHiveToVertex.class); - - JythonHiveToVertex.VERTEX_ID_COLUMN.set(conf, - vertexInput.getId_column()); - if (vertexInput.getValue_column() != null) { - JythonHiveToVertex.VERTEX_VALUE_COLUMN.set(conf, - vertexInput.getValue_column()); - } - } else { - VERTEX_INPUT_FORMAT_CLASS.set(conf, MultiVertexInputFormat.class); - throw new IllegalArgumentException( - "Multiple vertex inputs not supported yet: " + vertexInputs); - } - } - } - - /** - * Check that the vertex input info is valid - 
* - * @param vertexInput data to check - */ - private static void checkVertexInput(JythonJob.VertexInput vertexInput) { - checkNotNull(vertexInput.getTable(), - "VertexInput table name needs to be set"); - checkNotNull(vertexInput.getId_column(), - "VertexInput ID column needs to be set"); - } - - /** - * Check if the writable is holding a primitive type - * - * @param klass Writable class - * @return true if writable is holding primitive - */ - public static boolean isPrimitiveWritable(Class klass) { - return NullWritable.class.equals(klass) || - BooleanWritable.class.equals(klass) || - ByteWritable.class.equals(klass) || - IntWritable.class.equals(klass) || - LongWritable.class.equals(klass) || - FloatWritable.class.equals(klass) || - DoubleWritable.class.equals(klass); - } - - /** - * Tell whether the user type given is a primitive writable - * - * @param typeHolder TypeHolder - * @return true if type is a Java primitive writable - */ - public static boolean userTypeIsJavaPrimitiveWritable( - JythonJob.TypeHolder typeHolder) { - Object type = typeHolder.getType(); - if (type instanceof Class) { - return isPrimitiveWritable((Class) type); - } else if (type instanceof String) { - try { - Class klass = Class.forName((String) type); - return isPrimitiveWritable(klass); - } catch (ClassNotFoundException e) { - return false; - } - } else { - return false; - } - } - - /** - * Initialize output info - * - * @param conf Configuration - * @param jythonJob the job info - */ - private static void initOutput(Configuration conf, JythonJob jythonJob) { - JythonJob.VertexOutput vertexOutput = jythonJob.getVertex_output(); - if (vertexOutput.getTable() != null) { - LOG.info("Setting vertex output using: " + vertexOutput); - VERTEX_OUTPUT_FORMAT_CLASS.set(conf, HiveVertexOutputFormat.class); - VERTEX_TO_HIVE_CLASS.set(conf, JythonVertexToHive.class); - JythonVertexToHive.VERTEX_ID_COLUMN.set(conf, - vertexOutput.getId_column()); - VERTEX_VALUE_COLUMN.set(conf, 
vertexOutput.getValue_column()); - HIVE_VERTEX_OUTPUT_DATABASE.set(conf, jythonJob.getHive_database()); - HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile"); - HIVE_VERTEX_OUTPUT_TABLE.set(conf, vertexOutput.getTable()); - if (vertexOutput.getPartition() != null) { - HIVE_VERTEX_OUTPUT_PARTITION.set(conf, - makePartitionString(vertexOutput.getPartition())); - } - } - } - - /** - * Create partition string - * - * @param parts partition pieces - * @return partition string - */ - private static String makePartitionString(Map<String, String> parts) { - return Joiner.on(",").withKeyValueSeparator("=").join(parts); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnReader.java deleted file mode 100644 index 18661dc..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnReader.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.jython; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.hive.common.HiveUtils; -import org.apache.giraph.hive.values.HiveValueReader; -import org.apache.giraph.jython.JythonUtils; -import org.apache.hadoop.io.Writable; -import org.python.core.PyObject; - -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -/** - * A Vertex ID reader from a single Hive column. - * - * @param <T> Vertex ID - */ -public class JythonColumnReader<T extends Writable> - implements HiveValueReader<T> { - /** The column to read from Hive */ - private JythonReadableColumn column = new JythonReadableColumn(); - /** User's jython column reader */ - private final JythonHiveReader jythonHiveReader; - - /** - * Constructor - * - * @param columnIndex index for column - * @param jythonHiveReader jython column reader - */ - public JythonColumnReader(int columnIndex, - JythonHiveReader jythonHiveReader) { - column.setIndex(columnIndex); - this.jythonHiveReader = jythonHiveReader; - } - - /** - * Create a new value reader - * - * @param <T> value type - * @param conf {@link ImmutableClassesGiraphConfiguration} - * @param jythonClassOption option for jython class name - * @param columnOption {@link StrConfOption} - * @param schema {@link HiveTableSchema} - * @return new {@link JythonColumnReader} - */ - public static <T extends Writable> JythonColumnReader<T> - create(ImmutableClassesGiraphConfiguration conf, - StrConfOption jythonClassOption, StrConfOption columnOption, - HiveTableSchema schema) { - PyObject pyClass = - JythonUtils.getInterpreter().get(jythonClassOption.get(conf)); - JythonHiveReader jythonHiveReader = (JythonHiveReader) - pyClass.__call__().__tojava__(JythonHiveReader.class); - int columnIndex = HiveUtils.columnIndexOrThrow(schema, conf, columnOption); - return new JythonColumnReader<T>(columnIndex, 
jythonHiveReader); - } - - @Override - public void readFields(T value, HiveReadableRecord record) { - column.setRecord(record); - jythonHiveReader.readFromHive(value, column); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnWriter.java deleted file mode 100644 index c4da691..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnWriter.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.jython; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.hive.column.HiveWritableColumn; -import org.apache.giraph.hive.common.HiveUtils; -import org.apache.giraph.hive.values.HiveValueWriter; -import org.apache.giraph.jython.JythonUtils; -import org.apache.hadoop.io.Writable; -import org.python.core.PyObject; - -import com.facebook.hiveio.record.HiveWritableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -/** - * A Hive value writer backed by a user's {@link JythonHiveWriter} - * - * @param <W> graph value type - */ -public class JythonColumnWriter<W extends Writable> - implements HiveValueWriter<W> { - /** The Hive column to write to */ - private HiveWritableColumn column = new HiveWritableColumn(); - /** The user's Jython column writer */ - private final JythonHiveWriter jythonHiveWriter; - - /** - * Constructor - * - * @param columnIndex index of the column this writer fills - * @param jythonHiveWriter user's Jython column writer - */ - public JythonColumnWriter(int columnIndex, - JythonHiveWriter jythonHiveWriter) { - column.setIndex(columnIndex); - this.jythonHiveWriter = jythonHiveWriter; - } - - /** - * Create a new column writer by instantiating the user's Jython class - * - * @param conf {@link ImmutableClassesGiraphConfiguration} - * @param jythonClassOption Option for name of Jython class - * @param columnOption Option naming the Hive column to write to - * @param schema {@link HiveTableSchema} used to resolve the column index - * @param <W> Graph value type - * @return new {@link JythonColumnWriter} - */ - public static <W extends Writable> JythonColumnWriter<W> - create(ImmutableClassesGiraphConfiguration conf, - StrConfOption jythonClassOption, StrConfOption columnOption, - HiveTableSchema schema) { - String className = jythonClassOption.get(conf); - PyObject pyClass = JythonUtils.getInterpreter().get(className); - JythonHiveWriter jythonColumnWritable = (JythonHiveWriter) - pyClass.__call__().__tojava__(JythonHiveWriter.class); - 
int columnIndex = HiveUtils.columnIndexOrThrow(schema, conf, columnOption); - return new JythonColumnWriter<W>(columnIndex, jythonColumnWritable); - } - - @Override - public void write(W value, HiveWritableRecord record) { - column.setRecord(record); /* bind the column to the record being filled */ - jythonHiveWriter.writeToHive(value, column); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveIO.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveIO.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveIO.java deleted file mode 100644 index c260fa0..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveIO.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.jython; - -/** - * A type that can read and write itself using a Hive column. 
- */ -public interface JythonHiveIO extends JythonHiveReader, JythonHiveWriter { /* No members of its own. NOTE(review): JythonHiveReader and JythonHiveWriter are generic but are extended here as raw types. */ -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveReader.java deleted file mode 100644 index 79416da..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveReader.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.jython; - -import org.apache.hadoop.io.Writable; - -/** - * Interface for reading user graph type (IVEMM) from Hive columns. 
- * - * @param <W> user graph type - */ -public interface JythonHiveReader<W extends Writable> { - /** - * Read data from a Hive column - * - * @param value user value to read (presumably filled in place, as nothing is returned - TODO confirm) - * @param column Hive column to read from - */ - void readFromHive(W value, JythonReadableColumn column); -} - http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java deleted file mode 100644 index 1fd5fd0..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.jython; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.graph.GraphType; -import org.apache.giraph.hive.common.GiraphHiveConstants; -import org.apache.giraph.hive.input.edge.SimpleHiveToEdge; -import org.apache.giraph.hive.values.HiveValueReader; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -import java.util.Iterator; - -/** - * A {@link org.apache.giraph.hive.input.edge.HiveToEdge} that reads each part - * (source ID, target ID, edge value) using separate readers. - * - * @param <I> Vertex ID - * @param <E> Edge Value - */ -public class JythonHiveToEdge<I extends WritableComparable, E extends Writable> - extends SimpleHiveToEdge<I, E> { - /** Source ID column name in Hive */ - public static final StrConfOption EDGE_SOURCE_ID_COLUMN = - new StrConfOption("hive.input.edge.source.id.column", null, - "Source Vertex ID column"); - /** Target ID column name in Hive */ - public static final StrConfOption EDGE_TARGET_ID_COLUMN = - new StrConfOption("hive.input.edge.target.id.column", null, - "Target Vertex ID column"); - /** Edge Value column name in Hive */ - public static final StrConfOption EDGE_VALUE_COLUMN = - new StrConfOption("hive.input.edge.value.column", null, - "Edge Value column"); - - /** Source ID reader, created in initializeRecords */ - private HiveValueReader<I> sourceIdReader; - /** Target ID reader, created in initializeRecords */ - private HiveValueReader<I> targetIdReader; - /** Edge Value reader, created in initializeRecords */ - private HiveValueReader<E> edgeValueReader; - - @Override - public void checkInput(HiveInputDescription inputDesc, - HiveTableSchema schema) { /* no checks are performed on the input description or schema */ } - - @Override - public void initializeRecords(Iterator<HiveReadableRecord> records) { - super.initializeRecords(records); - - HiveTableSchema 
schema = getTableSchema(); - ImmutableClassesGiraphConfiguration conf = getConf(); - - sourceIdReader = HiveJythonUtils.<I>newValueReader(schema, - EDGE_SOURCE_ID_COLUMN, conf, GraphType.VERTEX_ID, - GiraphHiveConstants.VERTEX_ID_READER_JYTHON_NAME); - targetIdReader = HiveJythonUtils.<I>newValueReader(schema, - EDGE_TARGET_ID_COLUMN, conf, GraphType.VERTEX_ID, - GiraphHiveConstants.VERTEX_ID_READER_JYTHON_NAME); /* same graph type and Jython reader name as the source ID; only the column differs */ - edgeValueReader = HiveJythonUtils.<E>newValueReader(schema, - EDGE_VALUE_COLUMN, conf, GraphType.EDGE_VALUE, - GiraphHiveConstants.EDGE_VALUE_READER_JYTHON_NAME); - } - - @Override - public I getSourceVertexId(HiveReadableRecord record) { - I sourceId = getReusableSourceVertexId(); - sourceIdReader.readFields(sourceId, record); - return sourceId; - } - - @Override - public I getTargetVertexId(HiveReadableRecord record) { - I targetId = getReusableTargetVertexId(); - targetIdReader.readFields(targetId, record); - return targetId; - } - - @Override - public E getEdgeValue(HiveReadableRecord record) { - E edgeValue = getReusableEdgeValue(); - edgeValueReader.readFields(edgeValue, record); - return edgeValue; - } -} - http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java deleted file mode 100644 index b24283e..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.jython; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.edge.Edge; -import org.apache.giraph.graph.GraphType; -import org.apache.giraph.hive.common.GiraphHiveConstants; -import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex; -import org.apache.giraph.hive.values.HiveValueReader; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; -import com.google.common.collect.ImmutableList; - -import java.util.Iterator; - -/** - * A {@link org.apache.giraph.hive.input.vertex.HiveToVertex} that reads each - * part (vertex ID, vertex value) using separate readers; no edges are read. 
- * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - */ -public class JythonHiveToVertex<I extends WritableComparable, - V extends Writable, E extends Writable> - extends SimpleHiveToVertex<I, V, E> { - /** Vertex ID column name in Hive */ - public static final StrConfOption VERTEX_ID_COLUMN = - new StrConfOption("hive.input.vertex.id.column", null, - "Vertex ID column"); - /** Vertex Value column name in Hive */ - public static final StrConfOption VERTEX_VALUE_COLUMN = - new StrConfOption("hive.input.vertex.value.column", null, - "Vertex Value column"); - - /** Vertex ID reader, created in initializeRecords */ - private HiveValueReader<I> vertexIdReader; - /** Vertex Value reader, created in initializeRecords */ - private HiveValueReader<V> vertexValueReader; - - @Override - public void checkInput(HiveInputDescription inputDesc, - HiveTableSchema schema) { /* no checks are performed on the input description or schema */ } - - @Override - public void initializeRecords(Iterator<HiveReadableRecord> records) { - super.initializeRecords(records); - - HiveTableSchema schema = getTableSchema(); - ImmutableClassesGiraphConfiguration<I, V, E> conf = getConf(); - - vertexIdReader = HiveJythonUtils.newValueReader(schema, VERTEX_ID_COLUMN, - conf, GraphType.VERTEX_ID, - GiraphHiveConstants.VERTEX_ID_READER_JYTHON_NAME); - vertexValueReader = HiveJythonUtils.newValueReader(schema, - VERTEX_VALUE_COLUMN, conf, GraphType.VERTEX_VALUE, - GiraphHiveConstants.VERTEX_VALUE_READER_JYTHON_NAME); - } - - @Override - public Iterable<Edge<I, E>> getEdges(HiveReadableRecord record) { - return ImmutableList.of(); /* always empty: the record is not consulted for edges */ - } - - @Override - public I getVertexId(HiveReadableRecord record) { - I vertexId = getReusableVertexId(); - vertexIdReader.readFields(vertexId, record); - return vertexId; - } - - @Override - public V getVertexValue(HiveReadableRecord record) { - V vertexValue = getReusableVertexValue(); - vertexValueReader.readFields(vertexValue, record); - return vertexValue; - } -} 
http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveWriter.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveWriter.java deleted file mode 100644 index 4efe434..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveWriter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.jython; - -import org.apache.giraph.hive.column.HiveWritableColumn; -import org.apache.hadoop.io.Writable; - -/** - * Interface for writing user graph types to Hive columns - * - * @param <W> writable value type - */ -public interface JythonHiveWriter<W extends Writable> { - /** - * Write the given value's data to a Hive column - * - * @param value object to write - * @param column Hive column to write to - */ - void writeToHive(W value, HiveWritableColumn column); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonReadableColumn.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonReadableColumn.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonReadableColumn.java deleted file mode 100644 index 704a459..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonReadableColumn.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.jython; - -import org.apache.giraph.hive.column.HiveReadableColumn; -import org.python.core.Py; -import org.python.core.PyBoolean; -import org.python.core.PyDictionary; -import org.python.core.PyFloat; -import org.python.core.PyInteger; -import org.python.core.PyList; -import org.python.core.PyLong; -import org.python.core.PyObject; -import org.python.core.PyString; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.record.HiveReadableRecord; - -/** - * A single column from a Hive record, read as Jython (Py*) types - */ -public class JythonReadableColumn { - /** Hive column that all getters delegate to */ - private final HiveReadableColumn column = new HiveReadableColumn(); - - /** - * Set Hive record - * - * @param record Hive record - */ - public void setRecord(HiveReadableRecord record) { - column.setRecord(record); - } - - /** - * Set column index - * - * @param index column index - */ - public void setIndex(int index) { - column.setIndex(index); - } - - /** - * Get PyBoolean from a boolean column - * - * @return PyBoolean - */ - public PyBoolean getBoolean() { - return new PyBoolean(column.getBoolean()); - } - - /** - * Get PyInteger from a byte, short, or integer column; same as getInt() - * - * @return PyInteger - */ - public PyInteger getByte() { - return getInt(); - } - - /** - * Get PyInteger from a byte, short, or integer column; same as getInt() - * - * @return PyInteger - */ - public PyInteger getShort() { - return getInt(); - } - - /** - * Get PyInteger from a byte, short, or integer column; throws IllegalArgumentException for any other type - * - * @return PyInteger - */ - public PyInteger getInt() { - int value; - if (column.hiveType() == HiveType.BYTE) { - value = column.getByte(); - } else if (column.hiveType() == HiveType.SHORT) { - value = column.getShort(); - } else if (column.hiveType() == HiveType.INT) { - value = column.getInt(); - } else { - throw new IllegalArgumentException( - "Column is not a byte/short/int, is " + column.hiveType()); - } - return new PyInteger(value); - } - - /** - * Get 
PyLong from a long column - * - * @return PyLong - */ - public PyLong getLong() { - return new PyLong(column.getLong()); - } - - /** - * Get PyFloat from a double or float column; same as getFloat() - * - * @return PyFloat - */ - public PyFloat getDouble() { - return getFloat(); - } - - /** - * Get double or float column as PyFloat; throws IllegalArgumentException for any other type - * - * @return PyFloat - */ - public PyFloat getFloat() { - double value; - if (column.hiveType() == HiveType.FLOAT) { - value = column.getFloat(); - } else if (column.hiveType() == HiveType.DOUBLE) { - value = column.getDouble(); - } else { - throw new IllegalArgumentException("Column is not a float/double, is " + - column.hiveType()); - } - return new PyFloat(value); - } - - /** - * Get PyString from a string column - * - * @return PyString - */ - public PyString getString() { - return new PyString(column.getString()); - } - - /** - * Get PyList from a list column - * - * @return PyList - */ - public PyList getList() { - return new PyList(column.getList()); - } - - /** - * Get PyDictionary from a map column - * - * @return new PyDictionary holding the map's entries - */ - public PyDictionary getMap() { - PyDictionary dict = new PyDictionary(); - dict.putAll(column.getMap()); - return dict; - } - - /** - * Get arbitrary PyObject, converted with Py.java2py - * - * @return PyObject - */ - public PyObject get() { - return Py.java2py(column.get()); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonVertexToHive.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonVertexToHive.java deleted file mode 100644 index 6d9d030..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonVertexToHive.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.jython; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.graph.GraphType; -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.hive.common.GiraphHiveConstants; -import org.apache.giraph.hive.output.SimpleVertexToHive; -import org.apache.giraph.hive.values.HiveValueWriter; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.output.HiveOutputDescription; -import com.facebook.hiveio.record.HiveWritableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -/** - * A {@link org.apache.giraph.hive.output.VertexToHive} that writes each part - * (vertex ID, vertex value, edge value) using separate writers. 
- * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - */ -public class JythonVertexToHive<I extends WritableComparable, - V extends Writable, E extends Writable> - extends SimpleVertexToHive<I, V, E> { - /** Source ID column name in Hive */ - public static final StrConfOption VERTEX_ID_COLUMN = - new StrConfOption("hive.output.vertex.id.column", null, - "Source Vertex ID column"); - /** Target ID column name in Hive */ - public static final StrConfOption VERTEX_VALUE_COLUMN = - new StrConfOption("hive.output.vertex.value.column", null, - "Target Vertex ID column"); - - /** Vertex ID writer */ - private HiveValueWriter<I> vertexIdWriter; - /** Vertex Value writer */ - private HiveValueWriter<V> vertexValueWriter; - - @Override - public void checkOutput(HiveOutputDescription outputDesc, - HiveTableSchema schema, HiveWritableRecord emptyRecord) { } - - @Override - public void initialize() { - HiveTableSchema schema = getTableSchema(); - ImmutableClassesGiraphConfiguration conf = getConf(); - - vertexIdWriter = HiveJythonUtils.newValueWriter(schema, - VERTEX_ID_COLUMN, conf, GraphType.VERTEX_ID, - GiraphHiveConstants.VERTEX_ID_WRITER_JYTHON_NAME); - vertexValueWriter = HiveJythonUtils.newValueWriter(schema, - VERTEX_VALUE_COLUMN, conf, GraphType.VERTEX_VALUE, - GiraphHiveConstants.VERTEX_VALUE_WRITER_JYTHON_NAME); - } - - @Override - public void fillRecord(Vertex<I, V, E> vertex, HiveWritableRecord record) { - vertexIdWriter.write(vertex.getId(), record); - vertexValueWriter.write(vertex.getValue(), record); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/package-info.java deleted file mode 100644 index 558d828..0000000 --- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Hive Jython things. - */ -package org.apache.giraph.hive.jython;
