http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
deleted file mode 100644
index 321c945..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.record.HiveReadableRecord;
-
-import java.util.Iterator;
-
-/**
- * Simple implementation of {@link HiveToVertex} when each vertex is in the one
- * row of the input.
- *
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- * @param <E> Edge Value
- */
-public abstract class SimpleHiveToVertex<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends AbstractHiveToVertex<I, V, E> {
-  /** Hive records which we are reading from */
-  private Iterator<HiveReadableRecord> records;
-
-  /** Reusable vertex object */
-  private Vertex<I, V, E> reusableVertex;
-
-  /** Reusable vertex id */
-  private I reusableVertexId;
-  /** Reusable vertex value */
-  private V reusableVertexValue;
-  /** Reusable edges */
-  private OutEdges<I, E> reusableOutEdges;
-
-  /**
-   * Read the Vertex's ID from the HiveRecord given.
-   *
-   * @param record HiveRecord to read from.
-   * @return Vertex ID
-   */
-  public abstract I getVertexId(HiveReadableRecord record);
-
-  /**
-   * Read the Vertex's Value from the HiveRecord given.
-   *
-   * @param record HiveRecord to read from.
-   * @return Vertex Value
-   */
-  public abstract V getVertexValue(HiveReadableRecord record);
-
-  /**
-   * Read Vertex's edges from the HiveRecord given.
-   *
-   * @param record HiveRecord to read from.
-   * @return iterable of edges
-   */
-  public abstract Iterable<Edge<I, E>> getEdges(HiveReadableRecord record);
-
-  @Override
-  public void initializeRecords(Iterator<HiveReadableRecord> records) {
-    this.records = records;
-    reusableVertex = getConf().createVertex();
-    reusableVertexId = getConf().createVertexId();
-    reusableVertexValue = getConf().createVertexValue();
-    reusableOutEdges = getConf().createOutEdges();
-  }
-
-  @Override
-  public boolean hasNext() {
-    return records.hasNext();
-  }
-
-  @Override
-  public Vertex<I, V, E> next() {
-    HiveReadableRecord record = records.next();
-    I id = getVertexId(record);
-    V value = getVertexValue(record);
-    Iterable<Edge<I, E>> edges = getEdges(record);
-    reusableVertex.initialize(id, value, edges);
-    return reusableVertex;
-  }
-
-  protected I getReusableVertexId() {
-    return reusableVertexId;
-  }
-
-  protected V getReusableVertexValue() {
-    return reusableVertexValue;
-  }
-
-  /**
-   * Get reusable OutEdges object
-   *
-   * @param <OE> Type of OutEdges
-   * @return Reusable OutEdges object
-   */
-  protected <OE extends OutEdges<I, E>> OE getReusableOutEdges() {
-    return (OE) reusableOutEdges;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
deleted file mode 100644
index 21cc6c4..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.hive.input.vertex;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Simple implementation of {@link HiveToVertex} when each vertex is in the one
- * row of the input, and there are no edges in vertex input.
- *
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- */
-public abstract class SimpleNoEdgesHiveToVertex<I extends WritableComparable,
-    V extends Writable> extends SimpleHiveToVertex<I, V, Writable> {
-  @Override
-  public final Iterable<Edge<I, Writable>> getEdges(HiveReadableRecord record) 
{
-    return ImmutableList.of();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
deleted file mode 100644
index cae3e01..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntDoubleDoubleVertex.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.input.vertex.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.hive.common.HiveParsing;
-import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
-
-import com.facebook.hiveio.common.HiveType;
-import com.facebook.hiveio.input.HiveInputDescription;
-import com.facebook.hiveio.input.parser.Records;
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-/**
- * Simple HiveToVertex that reads vertices with integer IDs, Double vertex
- * values, and edges with Double values.
- */
-public class HiveIntDoubleDoubleVertex extends SimpleHiveToVertex<IntWritable,
-    DoubleWritable, DoubleWritable> {
-  @Override public void checkInput(HiveInputDescription inputDesc,
-      HiveTableSchema schema) {
-    Records.verifyType(0, HiveType.INT, schema);
-    Records.verifyType(1, HiveType.DOUBLE, schema);
-    Records.verifyType(2, HiveType.MAP, schema);
-  }
-
-  @Override public Iterable<Edge<IntWritable, DoubleWritable>> getEdges(
-      HiveReadableRecord record) {
-    return HiveParsing.parseIntDoubleEdges(record, 2);
-  }
-
-  @Override
-  public IntWritable getVertexId(HiveReadableRecord record) {
-    return HiveParsing.parseIntID(record, 0, getReusableVertexId());
-  }
-
-  @Override
-  public DoubleWritable getVertexValue(HiveReadableRecord record) {
-    return HiveParsing.parseDoubleWritable(record, 1, 
getReusableVertexValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
deleted file mode 100644
index 526a9a7..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.input.vertex.examples;
-
-import com.facebook.hiveio.common.HiveType;
-import com.facebook.hiveio.input.HiveInputDescription;
-import com.facebook.hiveio.input.parser.Records;
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.hive.common.HiveParsing;
-import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-/**
- * Simple HiveToVertex that reads vertices with integer IDs, no vertex values,
- * and edges with no values.
- */
-public class HiveIntIntNullVertex
-    extends SimpleHiveToVertex<IntWritable, IntWritable, NullWritable> {
-  @Override public void checkInput(HiveInputDescription inputDesc,
-      HiveTableSchema schema) {
-    Records.verifyType(0, HiveType.INT, schema);
-    Records.verifyType(1, HiveType.LIST, schema);
-  }
-
-  @Override
-  public Iterable<Edge<IntWritable, NullWritable>> getEdges(
-      HiveReadableRecord record) {
-    return HiveParsing.parseIntNullEdges(record, 1);
-  }
-
-  @Override
-  public IntWritable getVertexId(HiveReadableRecord record) {
-    return HiveParsing.parseIntID(record, 0, getReusableVertexId());
-  }
-
-  @Override
-  public IntWritable getVertexValue(HiveReadableRecord record) {
-    return getReusableVertexValue();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
deleted file mode 100644
index 9148c57..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntNullNullVertex.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.input.vertex.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.hive.common.HiveParsing;
-import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.facebook.hiveio.common.HiveType;
-import com.facebook.hiveio.input.HiveInputDescription;
-import com.facebook.hiveio.input.parser.Records;
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-/**
- * Simple HiveToVertex that reads vertices with integer IDs, no vertex values,
- * and edges with no values.
- */
-public class HiveIntNullNullVertex
-    extends SimpleHiveToVertex<IntWritable, NullWritable, NullWritable> {
-  @Override public void checkInput(HiveInputDescription inputDesc,
-      HiveTableSchema schema) {
-    Records.verifyType(0, HiveType.INT, schema);
-    Records.verifyType(1, HiveType.LIST, schema);
-  }
-
-  @Override
-  public Iterable<Edge<IntWritable, NullWritable>> getEdges(
-      HiveReadableRecord record) {
-    return HiveParsing.parseIntNullEdges(record, 1);
-  }
-
-  @Override
-  public IntWritable getVertexId(HiveReadableRecord record) {
-    return HiveParsing.parseIntID(record, 0, getReusableVertexId());
-  }
-
-  @Override
-  public NullWritable getVertexValue(HiveReadableRecord record) {
-    return NullWritable.get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java
deleted file mode 100644
index ea15393..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Hive input vertex examples.
- */
-package org.apache.giraph.hive.input.vertex.examples;

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
deleted file mode 100644
index 9027962..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Hive vertex input related things.
- */
-package org.apache.giraph.hive.input.vertex;

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonRunner.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonRunner.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonRunner.java
deleted file mode 100644
index 3d7afec..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonRunner.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import org.apache.giraph.graph.Language;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.jython.JythonJob;
-import org.apache.giraph.scripting.DeployType;
-import org.apache.giraph.scripting.ScriptLoader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Logger;
-import org.python.util.PythonInterpreter;
-
-import com.facebook.hiveio.HiveIO;
-
-import java.util.Arrays;
-
-import static org.apache.giraph.hive.jython.HiveJythonUtils.parseJythonFiles;
-import static org.apache.giraph.utils.DistributedCacheUtils.copyAndAdd;
-
-/**
- * Runner for jobs written in Jython
- */
-public class HiveJythonRunner implements Tool {
-  /** Logger */
-  private static final Logger LOG = Logger.getLogger(HiveJythonRunner.class);
-  /** Configuration */
-  private static HiveConf CONF = new HiveConf();
-
-  @Override public int run(String[] args) throws Exception {
-    args = HiveJythonUtils.processArgs(args, CONF);
-    LOG.info("Processed hive options now have args: " + Arrays.toString(args));
-
-    HiveIO.init(CONF, false);
-
-    PythonInterpreter interpreter = new PythonInterpreter();
-
-    JythonJob jythonJob = parseJythonFiles(interpreter, args);
-
-    logOptions();
-
-    for (String arg : args) {
-      Path remoteScriptPath = copyAndAdd(new Path(arg), CONF);
-      ScriptLoader.addScriptToLoad(CONF, remoteScriptPath.toString(),
-          DeployType.DISTRIBUTED_CACHE, Language.JYTHON);
-    }
-
-    String name = HiveJythonUtils.writeJythonJobToConf(jythonJob, CONF,
-       interpreter);
-
-    GiraphJob job = new GiraphJob(CONF, name);
-    return job.run(true) ? 0 : -1;
-  }
-
-  /**
-   * Log options used
-   */
-  private static void logOptions() {
-    StringBuilder sb = new StringBuilder(100);
-    appendEnvVars(sb, "JAVA_HOME", "MAPRED_POOL_NAME");
-    appendEnvVars(sb, "HADOOP_HOME", "HIVE_HOME");
-    LOG.info("Environment:\n" + sb);
-  }
-
-  /**
-   * Append environment variables to StringBuilder
-   *
-   * @param sb StringBuilder
-   * @param names vararg of env keys
-   */
-  private static void appendEnvVars(StringBuilder sb, String ... names) {
-    for (String name : names) {
-      sb.append(name).append("=").append(System.getenv(name)).append("\n");
-    }
-  }
-
-  /**
-   * Set the static configuration stored
-   *
-   * @param conf Configuration
-   */
-  public static void setStaticConf(Configuration conf) {
-    if (conf instanceof HiveConf) {
-      HiveJythonRunner.CONF = (HiveConf) conf;
-    } else {
-      HiveJythonRunner.CONF = new HiveConf(conf, HiveJythonRunner.class);
-    }
-  }
-
-  @Override public void setConf(Configuration conf) {
-    setStaticConf(conf);
-  }
-
-  @Override public Configuration getConf() {
-    return CONF;
-  }
-
-  /**
-   * Entry point
-   *
-   * @param args command line args
-   * @throws Exception
-   */
-  public static void main(String[] args) throws Exception {
-    System.exit(ToolRunner.run(new HiveJythonRunner(), args));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
deleted file mode 100644
index ba3e8cc..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
+++ /dev/null
@@ -1,910 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.giraph.conf.GiraphConstants.EDGE_INPUT_FORMAT_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.GRAPH_TYPE_LANGUAGES;
-import static org.apache.giraph.conf.GiraphConstants.MAX_WORKERS;
-import static org.apache.giraph.conf.GiraphConstants.MESSAGE_COMBINER_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.MIN_WORKERS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_INPUT_FORMAT_CLASS;
-import static 
org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS;
-import static 
org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
-import static 
org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
-import static 
org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
-import static 
org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
-import static 
org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID;
-import static 
org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE;
-import static 
org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
-import static 
org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_VALUE_READER_JYTHON_NAME;
-import static 
org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_VALUE_WRITER_JYTHON_NAME;
-import static 
org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_SOURCE_ID_COLUMN;
-import static 
org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_TARGET_ID_COLUMN;
-import static org.apache.giraph.hive.jython.JythonHiveToEdge.EDGE_VALUE_COLUMN;
-import static 
org.apache.giraph.hive.jython.JythonVertexToHive.VERTEX_VALUE_COLUMN;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.GiraphTypes;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.graph.GraphType;
-import org.apache.giraph.graph.Language;
-import org.apache.giraph.hive.common.GiraphHiveConstants;
-import org.apache.giraph.hive.common.HiveUtils;
-import org.apache.giraph.hive.common.LanguageAndType;
-import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
-import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
-import org.apache.giraph.hive.output.HiveVertexOutputFormat;
-import org.apache.giraph.hive.primitives.PrimitiveValueReader;
-import org.apache.giraph.hive.primitives.PrimitiveValueWriter;
-import org.apache.giraph.hive.values.HiveValueReader;
-import org.apache.giraph.hive.values.HiveValueWriter;
-import org.apache.giraph.io.formats.multi.MultiEdgeInputFormat;
-import org.apache.giraph.io.formats.multi.MultiVertexInputFormat;
-import org.apache.giraph.jython.JythonJob;
-import org.apache.giraph.jython.JythonUtils;
-import org.apache.giraph.jython.factories.JythonEdgeValueFactory;
-import org.apache.giraph.jython.factories.JythonFactoryBase;
-import org.apache.giraph.jython.factories.JythonOutgoingMessageValueFactory;
-import org.apache.giraph.jython.factories.JythonVertexIdFactory;
-import org.apache.giraph.jython.factories.JythonVertexValueFactory;
-import org.apache.giraph.jython.wrappers.JythonWritableWrapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
-import org.python.core.Py;
-import org.python.core.PyClass;
-import org.python.core.PyObject;
-import org.python.core.PyType;
-import org.python.util.PythonInterpreter;
-
-import com.facebook.hiveio.schema.HiveTableSchema;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.google.common.io.Closeables;
-
-/**
- * Plugin to {@link HiveJythonRunner} to use Hive.
- */
-public class HiveJythonUtils {
-  /** Logger */
-  private static final Logger LOG = Logger.getLogger(HiveJythonUtils.class);
-
-  /** Don't construct */
-  private HiveJythonUtils() { }
-
-  /**
-   * Process command line arguments
-   *
-   * @param args cmdline args
-   * @param conf {@link Configuration}
-   * @return remaining cmdline args to process
-   */
-  public static String[] processArgs(String[] args, Configuration conf) {
-    HiveUtils.addHadoopClasspathToTmpJars(conf);
-    HiveUtils.addHiveSiteXmlToTmpFiles(conf);
-    HiveUtils.addHiveSiteCustomXmlToTmpFiles(conf);
-    return moveHiveconfOptionsToConf(args, conf);
-  }
-
-  /**
-   * Remove -hiveconf options from cmdline
-   *
-   * @param args cmdline args
-   * @param conf Configuration
-   * @return cmdline args without -hiveconf options
-   */
-  private static String[] moveHiveconfOptionsToConf(String[] args,
-      Configuration conf) {
-    int start = 0;
-    while (start < args.length) {
-      if (args[start].endsWith("hiveconf")) {
-        HiveUtils.processHiveconfOption(conf, args[start + 1]);
-        start += 2;
-      } else {
-        break;
-      }
-    }
-    return Arrays.copyOfRange(args, start, args.length);
-  }
-
-  /**
-   * Parse set of Jython scripts from local files
-   *
-   * @param interpreter PythonInterpreter to use
-   * @param paths Jython files to parse
-   * @return JythonJob
-   * @throws java.io.IOException
-   */
-  public static JythonJob parseJythonFiles(PythonInterpreter interpreter,
-      String ... paths) throws IOException {
-    return parseJythonFiles(interpreter, Arrays.asList(paths));
-  }
-
-  /**
-   * Parse set of Jython scripts from local files
-   *
-   * @param interpreter PythonInterpreter to use
-   * @param paths Jython files to parse
-   * @return JythonJob
-   * @throws IOException
-   */
-  public static JythonJob parseJythonFiles(PythonInterpreter interpreter,
-      List<String> paths) throws IOException {
-    InputStream[] streams = new InputStream[paths.size()];
-    for (int i = 0; i < paths.size(); ++i) {
-      LOG.info("Reading jython file " + paths.get(i));
-      streams[i] = new FileInputStream(paths.get(i));
-    }
-
-    JythonJob jythonJob;
-    try {
-      jythonJob = parseJythonStreams(interpreter, streams);
-    } finally {
-      for (InputStream stream : streams) {
-        Closeables.close(stream, true);
-      }
-    }
-    return jythonJob;
-  }
-
-  /**
-   * Parse scripts from Jython InputStreams
-   *
-   * @param interpreter PythonInterpreter
-   * @param streams InputStreams to parse
-   * @return JythonJob
-   */
-  public static JythonJob parseJythonStreams(PythonInterpreter interpreter,
-      InputStream ... streams) {
-    for (InputStream stream : streams) {
-      readJythonStream(interpreter, stream);
-    }
-
-    PyObject pyPrepare = interpreter.get("prepare");
-
-    JythonJob jythonJob = new JythonJob();
-    pyPrepare._jcall(new Object[]{jythonJob});
-
-    return jythonJob;
-  }
-
-  /**
-   * Execute a Jython script
-   *
-   * @param interpreter Jython interpreter to use
-   * @param jythonStream {@link java.io.InputStream} with Jython code
-   * @throws java.io.IOException
-   */
-  private static void readJythonStream(PythonInterpreter interpreter,
-      InputStream jythonStream) {
-    try {
-      interpreter.execfile(jythonStream);
-    } finally {
-      try {
-        jythonStream.close();
-      } catch (IOException e) {
-        LOG.error("Failed to close jython stream " + jythonStream);
-      }
-    }
-  }
-
-  /**
-   * Set arbitrary option of unknown type in Configuration
-   *
-   * @param conf Configuration
-   * @param key String key
-   * @param value Object to set
-   */
-  private static void setOption(Configuration conf, String key,
-      Object value) {
-    if (value instanceof Boolean) {
-      conf.getBoolean(key, (Boolean) value);
-    } else if (value instanceof Byte || value instanceof Short ||
-        value instanceof Integer) {
-      conf.setInt(key, ((Number) value).intValue());
-    } else if (value instanceof Long) {
-      conf.setLong(key, (Long) value);
-    } else if (value instanceof Float || value instanceof Double) {
-      conf.setFloat(key, ((Number) value).floatValue());
-    } else if (value instanceof String) {
-      conf.set(key, value.toString());
-    } else if (value instanceof Class) {
-      conf.set(key, ((Class) value).getName());
-    } else {
-      throw new IllegalArgumentException(
-          "Don't know how to handle option key: " + key +
-          ", value: " + value + ", value type: " + value.getClass());
-    }
-  }
-
-  /**
-   * Write JythonJob to Configuration
-   *
-   * @param jythonJob JythonJob
-   * @param conf Configuration
-   * @param interpreter PythonInterpreter
-   * @return name of Job
-   */
-  public static String writeJythonJobToConf(JythonJob jythonJob,
-      Configuration conf, PythonInterpreter interpreter) {
-    checkJob(jythonJob);
-
-    JythonUtils.init(conf, jythonJob.getComputation_name());
-
-    if (jythonJob.getMessageCombiner() != null) {
-      MESSAGE_COMBINER_CLASS.set(conf, jythonJob.getMessageCombiner());
-    }
-
-    conf.setInt(MIN_WORKERS, jythonJob.getWorkers());
-    conf.setInt(MAX_WORKERS, jythonJob.getWorkers());
-
-    String javaOptions = Joiner.on(' ').join(jythonJob.getJava_options());
-    conf.set("mapred.child.java.opts", javaOptions);
-
-    Map<String, Object> options = jythonJob.getGiraph_options();
-    for (Map.Entry<String, Object> entry : options.entrySet()) {
-      setOption(conf, entry.getKey(), entry.getValue());
-    }
-
-    setPool(conf, jythonJob);
-
-    initHiveReadersWriters(conf, jythonJob, interpreter);
-    initGraphTypes(conf, jythonJob, interpreter);
-    initOutput(conf, jythonJob);
-    initVertexInputs(conf, jythonJob);
-    initEdgeInputs(conf, jythonJob);
-
-    String name = jythonJob.getName();
-    if (name == null) {
-      name = jythonJob.getComputation_name();
-    }
-    return name;
-  }
-
-  /**
-   * Set the hadoop mapreduce pool
-   *
-   * @param conf Configuration
-   * @param job the job info
-   */
-  private static void setPool(Configuration conf, JythonJob job) {
-    if (job.getPool() == null) {
-      if (job.getWorkers() < 50) {
-        job.setPool("graph.test");
-      } else {
-        job.setPool("graph.production");
-      }
-    }
-    conf.set("mapred.fairscheduler.pool", job.getPool());
-  }
-
-  /**
-   * Check that the job is valid
-   *
-   * @param jythonJob JythonJob
-   */
-  private static void checkJob(JythonJob jythonJob) {
-    checkNotNull(jythonJob.getComputation_name(),
-        "computation_name cannot be null");
-    checkTypeNotNull(jythonJob.getVertex_id(), GraphType.VERTEX_ID);
-    checkTypeNotNull(jythonJob.getVertex_value(), GraphType.VERTEX_VALUE);
-    checkTypeNotNull(jythonJob.getEdge_value(), GraphType.EDGE_VALUE);
-    checkMessageTypes(jythonJob);
-  }
-
-  /**
-   * Check if job has edge inputs
-   *
-   * @param jythonJob JythonJob
-   * @return true if job has edge inputs, false otherwise
-   */
-  private static boolean hasEdgeInputs(JythonJob jythonJob) {
-    return !jythonJob.getEdge_inputs().isEmpty();
-  }
-
-  /**
-   * Check if job has vertex inputs
-   *
-   * @param jythonJob JythonJob
-   * @return true if job has vertex inputs, false otherwise
-   */
-  private static boolean hasVertexInputs(JythonJob jythonJob) {
-    return !jythonJob.getVertex_inputs().isEmpty();
-  }
-
-  /**
-   * Check that type is present
-   *
-   * @param typeHolder TypeHolder
-   * @param graphType GraphType
-   */
-  private static void checkTypeNotNull(JythonJob.TypeHolder typeHolder,
-      GraphType graphType) {
-    checkNotNull(typeHolder.getType(), graphType + ".type not present");
-  }
-
-  /**
-   * Initialize the job types
-   *
-   * @param conf Configuration
-   * @param jythonJob the job info
-   * @param interpreter PythonInterpreter to use
-   */
-  private static void initGraphTypes(Configuration conf,
-      JythonJob jythonJob, PythonInterpreter interpreter) {
-    GiraphTypes types = new GiraphTypes();
-    types.setVertexIdClass(initValueType(conf, GraphType.VERTEX_ID,
-        jythonJob.getVertex_id().getType(), new JythonVertexIdFactory(),
-        interpreter));
-    types.setVertexValueClass(initValueType(conf, GraphType.VERTEX_VALUE,
-        jythonJob.getVertex_value().getType(), new JythonVertexValueFactory(),
-        interpreter));
-    types.setEdgeValueClass(initValueType(conf, GraphType.EDGE_VALUE,
-        jythonJob.getEdge_value().getType(), new JythonEdgeValueFactory(),
-        interpreter));
-    types.setOutgoingMessageValueClass(
-        initValueType(conf, GraphType.OUTGOING_MESSAGE_VALUE,
-            jythonJob.getOutgoing_message_value().getType(),
-            new JythonOutgoingMessageValueFactory(), interpreter));
-    types.writeTo(conf);
-  }
-
-  /**
-   * Initialize a graph type (IVEMM)
-   *
-   * @param conf Configuration
-   * @param graphType GraphType
-   * @param jythonOrJavaClass jython or java class given by user
-   * @param jythonFactory Jactory for making Jython types
-   * @param interpreter PythonInterpreter
-   * @return Class for Configuration
-   */
-  private static Class initValueType(Configuration conf, GraphType graphType,
-      Object jythonOrJavaClass, JythonFactoryBase jythonFactory,
-      PythonInterpreter interpreter) {
-    Class<? extends Writable> writableClass = graphType.interfaceClass();
-    LanguageAndType langType = processUserType(jythonOrJavaClass, interpreter);
-
-    switch (langType.getLanguage()) {
-    case JAVA:
-      GRAPH_TYPE_LANGUAGES.set(conf, graphType, Language.JAVA);
-      checkImplements(langType, writableClass, interpreter);
-      return langType.getJavaClass();
-    case JYTHON:
-      GRAPH_TYPE_LANGUAGES.set(conf, graphType, Language.JYTHON);
-      String jythonClassName = langType.getJythonClassName();
-      PyObject jythonClass = interpreter.get(jythonClassName);
-      if (jythonClass == null) {
-        throw new IllegalArgumentException("Could not find Jython class " +
-            jythonClassName + " for parameter " + graphType);
-      }
-      PyObject valuePyObj = jythonClass.__call__();
-
-      // Check if the Jython type implements Writable. If so, just use it
-      // directly. Otherwise, wrap it in a class that does using pickle.
-      Object pyWritable = valuePyObj.__tojava__(writableClass);
-      if (pyWritable.equals(Py.NoConversion)) {
-        GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS.set(conf, graphType, true);
-        jythonFactory.useThisFactory(conf, jythonClassName);
-        return JythonWritableWrapper.class;
-      } else {
-        GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS.set(conf, graphType, false);
-        jythonFactory.useThisFactory(conf, jythonClassName);
-        return writableClass;
-      }
-    default:
-      throw new IllegalArgumentException("Don't know how to handle " +
-          LanguageAndType.class.getSimpleName() + " with language " +
-          langType.getLanguage());
-    }
-  }
-
-  /**
-   * Check that the incoming / outgoing message value types are present.
-   *
-   * @param jythonJob JythonJob
-   */
-  private static void checkMessageTypes(JythonJob jythonJob) {
-    checkMessageType(jythonJob.getOutgoing_message_value(),
-        GraphType.OUTGOING_MESSAGE_VALUE, jythonJob);
-  }
-
-  /**
-   * Check that given message value type is present.
-   *
-   * @param msgTypeHolder Incoming or outgoing message type holder
-   * @param graphType The graph type
-   * @param jythonJob JythonJob
-   */
-  private static void checkMessageType(JythonJob.TypeHolder msgTypeHolder,
-      GraphType graphType, JythonJob jythonJob) {
-    if (msgTypeHolder.getType() == null) {
-      Object msgValueType = jythonJob.getMessage_value().getType();
-      checkNotNull(msgValueType, graphType + ".type and " +
-          "message_value.type cannot both be empty");
-      msgTypeHolder.setType(msgValueType);
-    }
-  }
-
-  /**
-   * Check that the vertex ID, vertex value, and edge value Hive info is valid.
-   *
-   * @param conf Configuration
-   * @param jythonJob JythonJob
-   * @param interpreter PythonInterpreter
-   */
-  private static void initHiveReadersWriters(Configuration conf,
-      JythonJob jythonJob, PythonInterpreter interpreter) {
-    if (!userTypeIsJavaPrimitiveWritable(jythonJob.getVertex_id())) {
-      checkTypeWithHive(jythonJob.getVertex_id(), GraphType.VERTEX_ID);
-
-      LanguageAndType idReader = processUserType(
-          jythonJob.getVertex_id().getHive_reader(), interpreter);
-      checkImplements(idReader, JythonHiveReader.class, interpreter);
-      checkArgument(idReader.getLanguage() == Language.JYTHON);
-      GiraphHiveConstants.VERTEX_ID_READER_JYTHON_NAME.set(conf,
-          idReader.getJythonClassName());
-
-      LanguageAndType idWriter = processUserType(
-          jythonJob.getVertex_id().getHive_writer(), interpreter);
-      checkImplements(idWriter, JythonHiveWriter.class, interpreter);
-      checkArgument(idWriter.getLanguage() == Language.JYTHON);
-      GiraphHiveConstants.VERTEX_ID_WRITER_JYTHON_NAME.set(conf,
-          idWriter.getJythonClassName());
-    }
-
-    if (hasVertexInputs(jythonJob) &&
-        !userTypeIsJavaPrimitiveWritable(jythonJob.getVertex_value())) {
-      checkTypeWithHive(jythonJob.getVertex_value(), GraphType.VERTEX_VALUE);
-
-      LanguageAndType valueReader = processUserType(
-          jythonJob.getVertex_value().getHive_reader(), interpreter);
-      checkImplements(valueReader, JythonHiveReader.class, interpreter);
-      checkArgument(valueReader.getLanguage() == Language.JYTHON);
-      VERTEX_VALUE_READER_JYTHON_NAME.set(conf,
-          valueReader.getJythonClassName());
-
-      LanguageAndType valueWriter = processUserType(
-          jythonJob.getVertex_value().getHive_writer(), interpreter);
-      checkImplements(valueWriter, JythonHiveWriter.class, interpreter);
-      checkArgument(valueWriter.getLanguage() == Language.JYTHON);
-      VERTEX_VALUE_WRITER_JYTHON_NAME.set(conf,
-          valueWriter.getJythonClassName());
-    }
-
-    if (hasEdgeInputs(jythonJob) &&
-        !userTypeIsJavaPrimitiveWritable(jythonJob.getEdge_value())) {
-      checkNotNull(jythonJob.getEdge_value().getHive_reader(),
-          "edge_value.hive_reader cannot be null");
-
-      LanguageAndType edgeReader = processUserType(
-          jythonJob.getEdge_value().getHive_reader(), interpreter);
-      checkImplements(edgeReader, JythonHiveReader.class, interpreter);
-      checkArgument(edgeReader.getLanguage() == Language.JYTHON);
-      GiraphHiveConstants.EDGE_VALUE_READER_JYTHON_NAME.set(conf,
-          edgeReader.getJythonClassName());
-    }
-  }
-
-  /**
-   * Verify Jython class is present and implements the Java type
-   *
-   * @param interpreter PythonInterpreter
-   * @param valueFromUser Jython class or name of class
-   * @return name of Jython class
-   */
-  private static LanguageAndType processUserType(Object valueFromUser,
-      PythonInterpreter interpreter) {
-    // user gave a Class object, should be either Java or Jython class name
-    if (valueFromUser instanceof Class) {
-      Class valueClass = (Class) valueFromUser;
-      String jythonClassName = extractJythonClass(valueClass);
-      if (jythonClassName != null) {
-        // Jython class
-        return processJythonType(jythonClassName, interpreter);
-      } else {
-        // Java class
-        return LanguageAndType.java(valueClass);
-      }
-
-      // user gave a string, should be either Java or Jython class name
-    } else if (valueFromUser instanceof String) {
-      String valueStr = (String) valueFromUser;
-      Class valueClass;
-      try {
-        // Try to find Java class with name
-        valueClass = Class.forName(valueStr);
-        return LanguageAndType.java(valueClass);
-      } catch (ClassNotFoundException e) {
-        // Java class not found, try to find a Jython one
-        return processJythonType(valueStr, interpreter);
-      }
-
-      // user gave a PyClass, process as a Jython class
-    } else if (valueFromUser instanceof PyClass) {
-      PyClass userPyClass = (PyClass) valueFromUser;
-      return processJythonType(userPyClass.__name__, interpreter);
-
-      // user gave a PyType, process as Jython class
-    } else if (valueFromUser instanceof PyType) {
-      PyType userPyType = (PyType) valueFromUser;
-      return processJythonType(userPyType.getName(), interpreter);
-
-      // Otherwise, don't know how to handle this, so error
-    } else {
-      throw new IllegalArgumentException("Don't know how to handle " +
-          valueFromUser + " of class " + valueFromUser.getClass() +
-          ", needs to be Class or String");
-    }
-  }
-
-  /**
-   * Check that a type implements a Java interface
-   *
-   * @param langType type with langauge
-   * @param interfaceClass java interface class
-   * @param interpreter PythonInterpreter
-   */
-  private static void checkImplements(LanguageAndType langType,
-      Class interfaceClass, PythonInterpreter interpreter) {
-    switch (langType.getLanguage()) {
-    case JAVA:
-      checkArgument(interfaceClass.isAssignableFrom(langType.getJavaClass()),
-          langType.getJavaClass().getSimpleName() + " needs to implement " +
-          interfaceClass.getSimpleName());
-      break;
-    case JYTHON:
-      PyObject pyClass = interpreter.get(langType.getJythonClassName());
-      PyObject pyObj = pyClass.__call__();
-      Object converted = pyObj.__tojava__(interfaceClass);
-      checkArgument(!Py.NoConversion.equals(converted),
-         "Jython class " + langType.getJythonClassName() +
-         " does not implement " + interfaceClass.getSimpleName() +
-         " interface");
-      break;
-    default:
-      throw new IllegalArgumentException("Don't know how to handle " +
-          "language " + langType.getLanguage());
-    }
-  }
-
-  /**
-   * Verify Jython class is present and implements specified type
-   *
-   * @param jythonName Jython class name
-   * @param interpreter PythonInterpreter
-   * @return language and type specification
-   */
-  private static LanguageAndType processJythonType(String jythonName,
-      PythonInterpreter interpreter) {
-    PyObject pyClass = interpreter.get(jythonName);
-    checkNotNull(pyClass, "Jython class " + jythonName + " not found");
-    return LanguageAndType.jython(jythonName);
-  }
-
-  /**
-   * Check that the given value type is valid
-   *
-   * @param typeWithHive value type
-   * @param graphType GraphType
-   */
-  private static void checkTypeWithHive(JythonJob.TypeWithHive typeWithHive,
-      GraphType graphType) {
-    if (typeWithHive.getHive_reader() == null) {
-      checkNotNull(typeWithHive.getHive_io(), graphType + ".hive_reader and " +
-          graphType + ".hive_io cannot both be empty");
-      typeWithHive.setHive_reader(typeWithHive.getHive_io());
-    }
-    if (typeWithHive.getHive_writer() == null) {
-      checkNotNull(typeWithHive.getHive_io(), graphType + ".hive_writer and " +
-          graphType + ".hive_io cannot both be empty");
-      typeWithHive.setHive_writer(typeWithHive.getHive_io());
-    }
-  }
-
-  /**
-   * Create a graph value (IVEMM) reader
-   *
-   * @param <T> graph value type
-   * @param schema {@link com.facebook.hiveio.schema.HiveTableSchema}
-   * @param columnOption option for column name
-   * @param conf {@link ImmutableClassesGiraphConfiguration}
-   * @param graphType GraphType creating a reader for
-   * @param jythonClassNameOption option for jython class option
-   * @return {@link org.apache.giraph.hive.values.HiveValueReader}
-   */
-  public static <T extends Writable> HiveValueReader<T> newValueReader(
-      HiveTableSchema schema, StrConfOption columnOption,
-      ImmutableClassesGiraphConfiguration conf, GraphType graphType,
-      StrConfOption jythonClassNameOption) {
-    HiveValueReader<T> reader;
-    if (HiveJythonUtils.isPrimitiveWritable(graphType.get(conf))) {
-      reader = PrimitiveValueReader.create(conf, graphType, columnOption,
-          schema);
-    } else if (jythonClassNameOption.contains(conf)) {
-      reader = JythonColumnReader.create(conf, jythonClassNameOption,
-          columnOption, schema);
-    } else {
-      throw new IllegalArgumentException("Don't know how to read " + graphType 
+
-          " of class " + graphType.get(conf) + " which is not primitive and" +
-          " no " + JythonHiveReader.class.getSimpleName() + " is set");
-    }
-    return reader;
-  }
-
-  /**
-   * Create a graph value (IVEMM) writer
-   *
-   * @param <T> writable type
-   * @param schema {@link HiveTableSchema}
-   * @param columnOption option for column
-   * @param conf {@link ImmutableClassesGiraphConfiguration}
-   * @param graphType {@link GraphType}
-   * @param jythonClassNameOption option for name of jython class
-   * @return {@link HiveValueWriter}
-   */
-  public static <T extends Writable> HiveValueWriter<T>
-  newValueWriter(HiveTableSchema schema, StrConfOption columnOption,
-      ImmutableClassesGiraphConfiguration conf, GraphType graphType,
-      StrConfOption jythonClassNameOption) {
-    HiveValueWriter<T> writer;
-    if (HiveJythonUtils.isPrimitiveWritable(graphType.get(conf))) {
-      writer = PrimitiveValueWriter.create(conf, columnOption,
-          schema, graphType);
-    } else if (jythonClassNameOption.contains(conf)) {
-      writer = JythonColumnWriter.create(conf, jythonClassNameOption,
-          columnOption, schema);
-    } else {
-      throw new IllegalArgumentException("Don't know how to write " +
-          graphType +  " of class " + graphType.get(conf) +
-          " which is not primitive and no " +
-          JythonHiveWriter.class.getSimpleName() + " is set");
-    }
-    return writer;
-  }
-
-  /**
-   * Extract Jython class name from a user set proxy Jython class.
-   *
-   * For example:
-   *    job.vertex_value_type = FakeLPVertexValue
-   * Yields:
-   *    org.python.proxies.__main__$FakeLPVertexValue$0
-   * This method extracts:
-   *    FakeLPVertexValue
-   *
-   * @param klass Jython proxy class
-   * @return Jython class name
-   */
-  private static String extractJythonClass(Class klass) {
-    if (!isJythonClass(klass)) {
-      return null;
-    }
-    Iterable<String> parts = Splitter.on('$').split(klass.getSimpleName());
-    if (Iterables.size(parts) != 3) {
-      return null;
-    }
-    Iterator<String> partsIter = parts.iterator();
-    partsIter.next();
-    return partsIter.next();
-  }
-
-  /**
-   * Check if passed in class is a Jython class
-   *
-   * @param klass to check
-   * @return true if Jython class, false otherwise
-   */
-  private static boolean isJythonClass(Class klass) {
-    return klass.getCanonicalName().startsWith("org.python.proxies");
-  }
-
-  /**
-   * Initialize edge input
-   *
-   * @param conf Configuration
-   * @param jythonJob data to initialize
-   */
-  private static void initEdgeInputs(Configuration conf, JythonJob jythonJob) {
-    List<JythonJob.EdgeInput> edgeInputs = jythonJob.getEdge_inputs();
-    if (!edgeInputs.isEmpty()) {
-      if (edgeInputs.size() == 1) {
-        EDGE_INPUT_FORMAT_CLASS.set(conf, HiveEdgeInputFormat.class);
-        JythonJob.EdgeInput edgeInput = edgeInputs.get(0);
-        checkEdgeInput(edgeInput);
-        LOG.info("Setting edge input using: " + edgeInput);
-
-        HIVE_EDGE_INPUT.getDatabaseOpt().set(conf,
-            jythonJob.getHive_database());
-        HIVE_EDGE_INPUT.getTableOpt().set(conf, edgeInput.getTable());
-        if (edgeInput.getPartition_filter() != null) {
-          HIVE_EDGE_INPUT.getPartitionOpt().set(conf,
-              edgeInput.getPartition_filter());
-        }
-        HIVE_EDGE_INPUT.getClassOpt().set(conf, JythonHiveToEdge.class);
-
-        EDGE_SOURCE_ID_COLUMN.set(conf, edgeInput.getSource_id_column());
-        EDGE_TARGET_ID_COLUMN.set(conf, edgeInput.getTarget_id_column());
-        if (edgeInput.getValue_column() != null) {
-          EDGE_VALUE_COLUMN.set(conf, edgeInput.getValue_column());
-        }
-      } else {
-        EDGE_INPUT_FORMAT_CLASS.set(conf, MultiEdgeInputFormat.class);
-        throw new IllegalArgumentException(
-            "Multiple edge inputs not supported yet: " + edgeInputs);
-      }
-    }
-  }
-
-  /**
-   * Check that the edge input is valid
-   *
-   * @param edgeInput data to check
-   */
-  private static void checkEdgeInput(JythonJob.EdgeInput edgeInput) {
-    checkNotNull(edgeInput.getTable(), "EdgeInput table name needs to be set");
-    checkNotNull(edgeInput.getSource_id_column(),
-        "EdgeInput source ID column needs to be set");
-    checkNotNull(edgeInput.getTarget_id_column(),
-        "EdgeInput target ID column needs to be set");
-  }
-
-  /**
-   * Initialize vertex output info
-   *
-   * @param conf Configuration
-   * @param jythonJob the job info
-   */
-  private static void initVertexInputs(Configuration conf,
-      JythonJob jythonJob) {
-    List<JythonJob.VertexInput> vertexInputs = jythonJob.getVertex_inputs();
-    if (!vertexInputs.isEmpty()) {
-      if (vertexInputs.size() == 1) {
-        VERTEX_INPUT_FORMAT_CLASS.set(conf, HiveVertexInputFormat.class);
-        JythonJob.VertexInput vertexInput = vertexInputs.get(0);
-        checkVertexInput(vertexInput);
-        LOG.info("Setting vertex input using: " + vertexInput);
-
-        HIVE_VERTEX_INPUT.getDatabaseOpt().set(conf,
-            jythonJob.getHive_database());
-        HIVE_VERTEX_INPUT.getTableOpt().set(conf, vertexInput.getTable());
-        if (vertexInput.getPartition_filter() != null) {
-          HIVE_VERTEX_INPUT.getPartitionOpt().set(conf,
-              vertexInput.getPartition_filter());
-        }
-        HIVE_VERTEX_INPUT.getClassOpt().set(conf, JythonHiveToVertex.class);
-
-        JythonHiveToVertex.VERTEX_ID_COLUMN.set(conf,
-            vertexInput.getId_column());
-        if (vertexInput.getValue_column() != null) {
-          JythonHiveToVertex.VERTEX_VALUE_COLUMN.set(conf,
-              vertexInput.getValue_column());
-        }
-      } else {
-        VERTEX_INPUT_FORMAT_CLASS.set(conf, MultiVertexInputFormat.class);
-        throw new IllegalArgumentException(
-            "Multiple vertex inputs not supported yet: " + vertexInputs);
-      }
-    }
-  }
-
-  /**
-   * Check that the vertex input info is valid
-   *
-   * @param vertexInput data to check
-   */
-  private static void checkVertexInput(JythonJob.VertexInput vertexInput) {
-    checkNotNull(vertexInput.getTable(),
-        "VertexInput table name needs to be set");
-    checkNotNull(vertexInput.getId_column(),
-        "VertexInput ID column needs to be set");
-  }
-
-  /**
-   * Check if the writable is holding a primitive type
-   *
-   * @param klass Writable class
-   * @return true if writable is holding primitive
-   */
-  public static boolean isPrimitiveWritable(Class klass) {
-    return NullWritable.class.equals(klass) ||
-        BooleanWritable.class.equals(klass) ||
-        ByteWritable.class.equals(klass) ||
-        IntWritable.class.equals(klass) ||
-        LongWritable.class.equals(klass) ||
-        FloatWritable.class.equals(klass) ||
-        DoubleWritable.class.equals(klass);
-  }
-
-  /**
-   * Tell whether the user type given is a primitive writable
-   *
-   * @param typeHolder TypeHolder
-   * @return true if type is a Java primitive writable
-   */
-  public static boolean userTypeIsJavaPrimitiveWritable(
-      JythonJob.TypeHolder typeHolder) {
-    Object type = typeHolder.getType();
-    if (type instanceof Class) {
-      return isPrimitiveWritable((Class) type);
-    } else if (type instanceof String) {
-      try {
-        Class klass = Class.forName((String) type);
-        return isPrimitiveWritable(klass);
-      } catch (ClassNotFoundException e) {
-        return false;
-      }
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Initialize output info
-   *
-   * @param conf Configuration
-   * @param jythonJob the job info
-   */
-  private static void initOutput(Configuration conf, JythonJob jythonJob) {
-    JythonJob.VertexOutput vertexOutput = jythonJob.getVertex_output();
-    if (vertexOutput.getTable() != null) {
-      LOG.info("Setting vertex output using: " + vertexOutput);
-      VERTEX_OUTPUT_FORMAT_CLASS.set(conf, HiveVertexOutputFormat.class);
-      VERTEX_TO_HIVE_CLASS.set(conf, JythonVertexToHive.class);
-      JythonVertexToHive.VERTEX_ID_COLUMN.set(conf,
-          vertexOutput.getId_column());
-      VERTEX_VALUE_COLUMN.set(conf, vertexOutput.getValue_column());
-      HIVE_VERTEX_OUTPUT_DATABASE.set(conf, jythonJob.getHive_database());
-      HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile");
-      HIVE_VERTEX_OUTPUT_TABLE.set(conf, vertexOutput.getTable());
-      if (vertexOutput.getPartition() != null) {
-        HIVE_VERTEX_OUTPUT_PARTITION.set(conf,
-            makePartitionString(vertexOutput.getPartition()));
-      }
-    }
-  }
-
-  /**
-   * Create partition string
-   *
-   * @param parts partition pieces
-   * @return partition string
-   */
-  private static String makePartitionString(Map<String, String> parts) {
-    return Joiner.on(",").withKeyValueSeparator("=").join(parts);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnReader.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnReader.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnReader.java
deleted file mode 100644
index 18661dc..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnReader.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.hive.common.HiveUtils;
-import org.apache.giraph.hive.values.HiveValueReader;
-import org.apache.giraph.jython.JythonUtils;
-import org.apache.hadoop.io.Writable;
-import org.python.core.PyObject;
-
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-/**
- * A Vertex ID reader from a single Hive column.
- *
- * @param <T> Vertex ID
- */
-public class JythonColumnReader<T extends Writable>
-    implements HiveValueReader<T> {
-  /** The column to read from Hive */
-  private JythonReadableColumn column = new JythonReadableColumn();
-  /** User's jython column reader */
-  private final JythonHiveReader jythonHiveReader;
-
-  /**
-   * Constructor
-   *
-   * @param columnIndex index for column
-   * @param jythonHiveReader jython column reader
-   */
-  public JythonColumnReader(int columnIndex,
-      JythonHiveReader jythonHiveReader) {
-    column.setIndex(columnIndex);
-    this.jythonHiveReader = jythonHiveReader;
-  }
-
-  /**
-   * Create a new value reader
-   *
-   * @param <T> value type
-   * @param conf {@link ImmutableClassesGiraphConfiguration}
-   * @param jythonClassOption option for jython class name
-   * @param columnOption {@link StrConfOption}
-   * @param schema {@link HiveTableSchema}
-   * @return new {@link JythonColumnReader}
-   */
-  public static <T extends Writable> JythonColumnReader<T>
-  create(ImmutableClassesGiraphConfiguration conf,
-      StrConfOption jythonClassOption, StrConfOption columnOption,
-      HiveTableSchema schema) {
-    PyObject pyClass =
-        JythonUtils.getInterpreter().get(jythonClassOption.get(conf));
-    JythonHiveReader jythonHiveReader = (JythonHiveReader)
-        pyClass.__call__().__tojava__(JythonHiveReader.class);
-    int columnIndex = HiveUtils.columnIndexOrThrow(schema, conf, columnOption);
-    return new JythonColumnReader<T>(columnIndex, jythonHiveReader);
-  }
-
-  @Override
-  public void readFields(T value, HiveReadableRecord record) {
-    column.setRecord(record);
-    jythonHiveReader.readFromHive(value, column);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnWriter.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnWriter.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnWriter.java
deleted file mode 100644
index c4da691..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonColumnWriter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.hive.column.HiveWritableColumn;
-import org.apache.giraph.hive.common.HiveUtils;
-import org.apache.giraph.hive.values.HiveValueWriter;
-import org.apache.giraph.jython.JythonUtils;
-import org.apache.hadoop.io.Writable;
-import org.python.core.PyObject;
-
-import com.facebook.hiveio.record.HiveWritableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-/**
- * A Hive type writer that uses {@link JythonHiveWriter}
- *
- * @param <W> graph value type
- */
-public class JythonColumnWriter<W extends Writable>
-    implements HiveValueWriter<W> {
-  /** The column to read from Hive */
-  private HiveWritableColumn column = new HiveWritableColumn();
-  /** The user's Jython column writer */
-  private final JythonHiveWriter jythonHiveWriter;
-
-  /**
-   * Constructor
-   *
-   * @param columnIndex column index
-   * @param jythonHiveWriter user's Jython column writer
-   */
-  public JythonColumnWriter(int columnIndex,
-      JythonHiveWriter jythonHiveWriter) {
-    column.setIndex(columnIndex);
-    this.jythonHiveWriter = jythonHiveWriter;
-  }
-
-  /**
-   * Create a new vertex ID reader
-   *
-   * @param conf {@link ImmutableClassesGiraphConfiguration}
-   * @param jythonClassOption Option for name of Jython class
-   * @param columnOption {@link StrConfOption}
-   * @param schema {@link HiveTableSchema}
-   * @param <W> Graph value type
-   * @return new {@link JythonColumnWriter}
-   */
-  public static <W extends Writable> JythonColumnWriter<W>
-  create(ImmutableClassesGiraphConfiguration conf,
-      StrConfOption jythonClassOption, StrConfOption columnOption,
-      HiveTableSchema schema) {
-    String className = jythonClassOption.get(conf);
-    PyObject pyClass = JythonUtils.getInterpreter().get(className);
-    JythonHiveWriter jythonColumnWritable = (JythonHiveWriter)
-        pyClass.__call__().__tojava__(JythonHiveWriter.class);
-    int columnIndex = HiveUtils.columnIndexOrThrow(schema, conf, columnOption);
-    return new JythonColumnWriter<W>(columnIndex, jythonColumnWritable);
-  }
-
-  @Override
-  public void write(W value, HiveWritableRecord record) {
-    column.setRecord(record);
-    jythonHiveWriter.writeToHive(value, column);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveIO.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveIO.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveIO.java
deleted file mode 100644
index c260fa0..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveIO.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-/**
- * A type that can read and write itself using a Hive column.
- */
-public interface JythonHiveIO extends JythonHiveReader, JythonHiveWriter {
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveReader.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveReader.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveReader.java
deleted file mode 100644
index 79416da..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import org.apache.hadoop.io.Writable;
-
/**
 * Interface for reading a user graph type (IVEMM) from Hive columns.
 *
 * Implemented in Jython by user classes; instances are coerced to this Java
 * interface via {@code __tojava__}.
 *
 * @param <W> user graph type
 */
public interface JythonHiveReader<W extends Writable> {
  /**
   * Read data from a Hive column into the given reusable value.
   *
   * @param value user value to fill with data from the column
   * @param column Hive column to read from
   */
  void readFromHive(W value, JythonReadableColumn column);
}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java
deleted file mode 100644
index 1fd5fd0..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToEdge.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.graph.GraphType;
-import org.apache.giraph.hive.common.GiraphHiveConstants;
-import org.apache.giraph.hive.input.edge.SimpleHiveToEdge;
-import org.apache.giraph.hive.values.HiveValueReader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.input.HiveInputDescription;
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-import java.util.Iterator;
-
-/**
- * A {@link org.apache.giraph.hive.input.edge.HiveToEdge} that reads each part
- * (vertex ID, edge value) using separate readers.
- *
- * @param <I> Vertex ID
- * @param <E> Edge Value
- */
-public class JythonHiveToEdge<I extends WritableComparable, E extends Writable>
-    extends SimpleHiveToEdge<I, E> {
-  /** Source ID column name in Hive */
-  public static final StrConfOption EDGE_SOURCE_ID_COLUMN =
-      new StrConfOption("hive.input.edge.source.id.column", null,
-          "Source Vertex ID column");
-  /** Target ID column name in Hive */
-  public static final StrConfOption EDGE_TARGET_ID_COLUMN =
-      new StrConfOption("hive.input.edge.target.id.column", null,
-          "Target Vertex ID column");
-  /** Edge Value column name in Hive */
-  public static final StrConfOption EDGE_VALUE_COLUMN =
-      new StrConfOption("hive.input.edge.value.column", null,
-          "Edge Value column");
-
-  /** Source ID reader */
-  private HiveValueReader<I> sourceIdReader;
-  /** Target ID reader */
-  private HiveValueReader<I> targetIdReader;
-  /** Edge Value reader */
-  private HiveValueReader<E> edgeValueReader;
-
-  @Override
-  public void checkInput(HiveInputDescription inputDesc,
-      HiveTableSchema schema) { }
-
-  @Override
-  public void initializeRecords(Iterator<HiveReadableRecord> records) {
-    super.initializeRecords(records);
-
-    HiveTableSchema schema = getTableSchema();
-    ImmutableClassesGiraphConfiguration conf = getConf();
-
-    sourceIdReader = HiveJythonUtils.<I>newValueReader(schema,
-        EDGE_SOURCE_ID_COLUMN, conf, GraphType.VERTEX_ID,
-        GiraphHiveConstants.VERTEX_ID_READER_JYTHON_NAME);
-    targetIdReader = HiveJythonUtils.<I>newValueReader(schema,
-        EDGE_TARGET_ID_COLUMN, conf, GraphType.VERTEX_ID,
-        GiraphHiveConstants.VERTEX_ID_READER_JYTHON_NAME);
-    edgeValueReader = HiveJythonUtils.<E>newValueReader(schema,
-        EDGE_VALUE_COLUMN, conf, GraphType.EDGE_VALUE,
-        GiraphHiveConstants.EDGE_VALUE_READER_JYTHON_NAME);
-  }
-
-  @Override
-  public I getSourceVertexId(HiveReadableRecord record) {
-    I sourceId = getReusableSourceVertexId();
-    sourceIdReader.readFields(sourceId, record);
-    return sourceId;
-  }
-
-  @Override
-  public I getTargetVertexId(HiveReadableRecord record) {
-    I targetId = getReusableTargetVertexId();
-    targetIdReader.readFields(targetId, record);
-    return targetId;
-  }
-
-  @Override
-  public E getEdgeValue(HiveReadableRecord record) {
-    E edgeValue = getReusableEdgeValue();
-    edgeValueReader.readFields(edgeValue, record);
-    return edgeValue;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java
deleted file mode 100644
index b24283e..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveToVertex.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.GraphType;
-import org.apache.giraph.hive.common.GiraphHiveConstants;
-import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
-import org.apache.giraph.hive.values.HiveValueReader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.input.HiveInputDescription;
-import com.facebook.hiveio.record.HiveReadableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-import com.google.common.collect.ImmutableList;
-
-import java.util.Iterator;
-
/**
 * A {@link org.apache.giraph.hive.input.vertex.HiveToVertex} that reads each
 * part (vertex ID, vertex value) using separate Jython-backed readers.
 * No edges are read by this input; {@link #getEdges} always returns an
 * empty list.
 *
 * @param <I> Vertex ID
 * @param <V> Vertex Value
 * @param <E> Edge Value
 */
public class JythonHiveToVertex<I extends WritableComparable,
    V extends Writable, E extends Writable>
    extends SimpleHiveToVertex<I, V, E> {
  /** Vertex ID column name in Hive */
  public static final StrConfOption VERTEX_ID_COLUMN =
      new StrConfOption("hive.input.vertex.id.column", null,
          "Vertex ID column");
  /** Vertex Value column name in Hive */
  public static final StrConfOption VERTEX_VALUE_COLUMN =
      new StrConfOption("hive.input.vertex.value.column", null,
          "Vertex Value column");

  /** Vertex ID reader */
  private HiveValueReader<I> vertexIdReader;
  /** Vertex Value reader */
  private HiveValueReader<V> vertexValueReader;

  @Override
  public void checkInput(HiveInputDescription inputDesc,
      HiveTableSchema schema) { }

  @Override
  public void initializeRecords(Iterator<HiveReadableRecord> records) {
    super.initializeRecords(records);

    HiveTableSchema schema = getTableSchema();
    ImmutableClassesGiraphConfiguration<I, V, E> conf = getConf();

    // Create a Jython-backed reader for the ID and value columns
    vertexIdReader = HiveJythonUtils.newValueReader(schema, VERTEX_ID_COLUMN,
        conf, GraphType.VERTEX_ID,
        GiraphHiveConstants.VERTEX_ID_READER_JYTHON_NAME);
    vertexValueReader = HiveJythonUtils.newValueReader(schema,
        VERTEX_VALUE_COLUMN, conf, GraphType.VERTEX_VALUE,
        GiraphHiveConstants.VERTEX_VALUE_READER_JYTHON_NAME);
  }

  @Override
  public Iterable<Edge<I, E>> getEdges(HiveReadableRecord record) {
    // This input carries no edge data: vertices start with no out-edges
    return ImmutableList.of();
  }

  @Override
  public I getVertexId(HiveReadableRecord record) {
    I vertexId = getReusableVertexId();
    vertexIdReader.readFields(vertexId, record);
    return vertexId;
  }

  @Override
  public V getVertexValue(HiveReadableRecord record) {
    V vertexValue = getReusableVertexValue();
    vertexValueReader.readFields(vertexValue, record);
    return vertexValue;
  }
}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveWriter.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveWriter.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveWriter.java
deleted file mode 100644
index 4efe434..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonHiveWriter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import org.apache.giraph.hive.column.HiveWritableColumn;
-import org.apache.hadoop.io.Writable;
-
/**
 * Interface for writing user graph types (IVEMM) to Hive columns.
 *
 * Implemented in Jython by user classes; instances are coerced to this Java
 * interface via {@code __tojava__}.
 *
 * @param <W> writable value type
 */
public interface JythonHiveWriter<W extends Writable> {
  /**
   * Write the object's data to a Hive column.
   *
   * @param value object to write
   * @param column Hive column to write to
   */
  void writeToHive(W value, HiveWritableColumn column);
}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonReadableColumn.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonReadableColumn.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonReadableColumn.java
deleted file mode 100644
index 704a459..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonReadableColumn.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import org.apache.giraph.hive.column.HiveReadableColumn;
-import org.python.core.Py;
-import org.python.core.PyBoolean;
-import org.python.core.PyDictionary;
-import org.python.core.PyFloat;
-import org.python.core.PyInteger;
-import org.python.core.PyList;
-import org.python.core.PyLong;
-import org.python.core.PyObject;
-import org.python.core.PyString;
-
-import com.facebook.hiveio.common.HiveType;
-import com.facebook.hiveio.record.HiveReadableRecord;
-
-/**
- * A single column from a Hive record which reads Jython types
- */
-public class JythonReadableColumn {
-  /** Hive column */
-  private final HiveReadableColumn column = new HiveReadableColumn();
-
-  /**
-   * Set Hive record
-   *
-   * @param record Hive record
-   */
-  public void setRecord(HiveReadableRecord record) {
-    column.setRecord(record);
-  }
-
-  /**
-   * Set column index
-   *
-   * @param index column index
-   */
-  public void setIndex(int index) {
-    column.setIndex(index);
-  }
-
-  /**
-   * Get PyBoolean from a boolean column
-   *
-   * @return PyBoolean
-   */
-  public PyBoolean getBoolean() {
-    return new PyBoolean(column.getBoolean());
-  }
-
-  /**
-   * Get PyInteger from a byte, short, or integer column
-   *
-   * @return PyInteger
-   */
-  public PyInteger getByte() {
-    return getInt();
-  }
-
-  /**
-   * Get PyInteger from a byte, short, or integer column
-   *
-   * @return PyInteger
-   */
-  public PyInteger getShort() {
-    return getInt();
-  }
-
-  /**
-   * Get PyInteger from a byte, short, or integer column
-   *
-   * @return PyInteger
-   */
-  public PyInteger getInt() {
-    int value;
-    if (column.hiveType() == HiveType.BYTE) {
-      value = column.getByte();
-    } else if (column.hiveType() == HiveType.SHORT) {
-      value = column.getShort();
-    } else if (column.hiveType() == HiveType.LONG) {
-      value = column.getInt();
-    } else {
-      throw new IllegalArgumentException(
-          "Column is not a byte/short/int, is " + column.hiveType());
-    }
-    return new PyInteger(value);
-  }
-
-  /**
-   * Get PyLong as long
-   *
-   * @return PyLong
-   */
-  public PyLong getLong() {
-    return new PyLong(column.getLong());
-  }
-
-  /**
-   * Get PyFloat from a double or float column
-   *
-   * @return PyFloat
-   */
-  public PyFloat getDouble() {
-    return getFloat();
-  }
-
-  /**
-   * Get double or float column as PyFloat
-   *
-   * @return PyFloat
-   */
-  public PyFloat getFloat() {
-    double value;
-    if (column.hiveType() == HiveType.FLOAT) {
-      value = column.getFloat();
-    } else if (column.hiveType() == HiveType.DOUBLE) {
-      value = column.getDouble();
-    } else {
-      throw new IllegalArgumentException("Column is not a float/double, is " +
-          column.hiveType());
-    }
-    return new PyFloat(value);
-  }
-
-  /**
-   * Get PyString from a string column
-   *
-   * @return PyString
-   */
-  public PyString getString() {
-    return new PyString(column.getString());
-  }
-
-  /**
-   * Get PyList from a list column
-   *
-   * @return PyList
-   */
-  public PyList getList() {
-    return new PyList(column.getList());
-  }
-
-  /**
-   * Get PyMap from a map column
-   *
-   * @return PyMap
-   */
-  public PyDictionary getMap() {
-    PyDictionary dict = new PyDictionary();
-    dict.putAll(column.getMap());
-    return dict;
-  }
-
-  /**
-   * Get arbitrary PyObject
-   *
-   * @return PyObject
-   */
-  public PyObject get() {
-    return Py.java2py(column.get());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonVertexToHive.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonVertexToHive.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonVertexToHive.java
deleted file mode 100644
index 6d9d030..0000000
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/JythonVertexToHive.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.hive.jython;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.graph.GraphType;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.hive.common.GiraphHiveConstants;
-import org.apache.giraph.hive.output.SimpleVertexToHive;
-import org.apache.giraph.hive.values.HiveValueWriter;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.facebook.hiveio.output.HiveOutputDescription;
-import com.facebook.hiveio.record.HiveWritableRecord;
-import com.facebook.hiveio.schema.HiveTableSchema;
-
-/**
- * A {@link org.apache.giraph.hive.output.VertexToHive} that writes each part
- * (vertex ID, vertex value, edge value) using separate writers.
- *
- * @param <I> Vertex ID
- * @param <V> Vertex Value
- * @param <E> Edge Value
- */
-public class JythonVertexToHive<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends SimpleVertexToHive<I, V, E> {
-  /** Source ID column name in Hive */
-  public static final StrConfOption VERTEX_ID_COLUMN =
-      new StrConfOption("hive.output.vertex.id.column", null,
-          "Source Vertex ID column");
-  /** Target ID column name in Hive */
-  public static final StrConfOption VERTEX_VALUE_COLUMN =
-      new StrConfOption("hive.output.vertex.value.column", null,
-          "Target Vertex ID column");
-
-  /** Vertex ID writer */
-  private HiveValueWriter<I> vertexIdWriter;
-  /** Vertex Value writer */
-  private HiveValueWriter<V> vertexValueWriter;
-
-  @Override
-  public void checkOutput(HiveOutputDescription outputDesc,
-      HiveTableSchema schema, HiveWritableRecord emptyRecord) { }
-
-  @Override
-  public void initialize() {
-    HiveTableSchema schema = getTableSchema();
-    ImmutableClassesGiraphConfiguration conf = getConf();
-
-    vertexIdWriter = HiveJythonUtils.newValueWriter(schema,
-        VERTEX_ID_COLUMN, conf, GraphType.VERTEX_ID,
-        GiraphHiveConstants.VERTEX_ID_WRITER_JYTHON_NAME);
-    vertexValueWriter = HiveJythonUtils.newValueWriter(schema,
-        VERTEX_VALUE_COLUMN, conf, GraphType.VERTEX_VALUE,
-        GiraphHiveConstants.VERTEX_VALUE_WRITER_JYTHON_NAME);
-  }
-
-  @Override
-  public void fillRecord(Vertex<I, V, E> vertex, HiveWritableRecord record) {
-    vertexIdWriter.write(vertex.getId(), record);
-    vertexValueWriter.write(vertex.getValue(), record);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/jython/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/package-info.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/package-info.java
deleted file mode 100644
index 558d828..0000000
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
/**
 * Jython integration for reading and writing Giraph graphs with Hive:
 * user graph types are read from / written to Hive columns by user-supplied
 * Jython classes.
 */
package org.apache.giraph.hive.jython;

Reply via email to