Author: aching
Date: Thu Sep 27 08:03:01 2012
New Revision: 1390878

URL: http://svn.apache.org/viewvc?rev=1390878&view=rev
Log:
GIRAPH-343: Use published hcatalog jars. (nitayj via aching)


Added:
    
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
    
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
    
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
    
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph-formats-contrib/pom.xml

Modified: giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1390878&r1=1390877&r2=1390878&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Sep 27 08:03:01 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  
+  GIRAPH-343: Use published hcatalog jars. (nitayj via aching)
+
   GIRAPH-338: More Rat Ignores (Nitay Joffe via ereisman)
 
   GIRAPH-347: GiraphConfiguration broke hcatalog build

Modified: giraph/trunk/giraph-formats-contrib/pom.xml
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/pom.xml?rev=1390878&r1=1390877&r2=1390878&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/pom.xml (original)
+++ giraph/trunk/giraph-formats-contrib/pom.xml Thu Sep 27 08:03:01 2012
@@ -28,7 +28,7 @@ under the License.
         <compileSource>1.6</compileSource>
         <hadoop.version>0.20.203.0</hadoop.version>
         <hbase.version>0.90.5</hbase.version>
-        <hcatalog.version>0.4.0-dev</hcatalog.version>
+        <hcatalog.version>0.5.0-SNAPSHOT</hcatalog.version>
         <hive.version>0.9.0</hive.version>
         <accumulo.version>1.4.0</accumulo.version>
         <maven-compiler-plugin.version>2.3.2</maven-compiler-plugin.version>
@@ -74,43 +74,48 @@ under the License.
             </plugin>
         </plugins>
     </build>
-    <profiles>
-        <profile>
-            <id>hcatalog</id>
-            <dependencies>
-              <dependency>
-                <groupId>org.apache.hcatalog</groupId>
-                <artifactId>hcatalog</artifactId>
-                <version>${hcatalog.version}</version>
-              </dependency>
-            </dependencies>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.codehaus.mojo</groupId>
-                        <artifactId>build-helper-maven-plugin</artifactId>
-                        <version>1.7</version>
-                        <executions>
-                            <execution>
-                                <id>add-sources</id>
-                                <phase>generate-sources</phase>
-                                <goals>
-                                    <goal>add-source</goal>
-                                </goals>
-                                <configuration>
-                                    <sources>
-                                        
<source>${basedir}/src/hcatalog/java/</source>
-                                    </sources>
-                                </configuration>
-                            </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
+    <repositories>
+      <!-- This is the main maven repository. Normally we wouldn't need to put
+           it here when it's the only one being used, but since we need to add
+           special repositories to get hcatalog we need to mention this one
+           specifically otherwise it won't be included. -->
+      <repository>
+        <id>central</id>
+        <name>Maven Repository</name>
+        <url>http://repo1.maven.org/maven2</url>
+        <releases>
+          <enabled>true</enabled>
+        </releases>
+      </repository>
+      <!-- This is necessary for hcatalog. -->
+      <repository>
+        <id>apache</id>
+        <name>Apache Repository</name>
+        <url>https://repository.apache.org/content/repositories/snapshots</url>
+        <snapshots>
+          <enabled>true</enabled>
+        </snapshots>
+      </repository>
+      <!-- This is necessary for hive-metastore dependencies for hcatalog. -->
+      <repository>
+        <id>datanucleus</id>
+        <name>datanucleus maven repository</name>
+        <url>http://www.datanucleus.org/downloads/maven2</url>
+        <layout>default</layout>
+        <releases>
+          <enabled>true</enabled>
+          <checksumPolicy>warn</checksumPolicy>
+        </releases>
+      </repository>
+    </repositories>
     <dependencies>
         <dependency>
+          <groupId>org.apache.hcatalog</groupId>
+          <artifactId>hcatalog</artifactId>
+          <!-- TODO: Use 0.5.0 stable release -->
+          <version>${hcatalog.version}</version>
+        </dependency>
+        <dependency>
             <artifactId>giraph</artifactId>
             <groupId>org.apache.giraph</groupId>
             <version>0.2-SNAPSHOT</version>

Added: 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java?rev=1390878&view=auto
==============================================================================
--- 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
 (added)
+++ 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexInputFormat.java
 Thu Sep 27 08:03:01 2012
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hcatalog;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Abstract class that users should subclass to load data from a Hive or Pig
+ * table. You can easily implement a {@link HCatalogVertexReader} by extending
+ * either {@link SingleRowHCatalogVertexReader} or
+ * {@link MultiRowHCatalogVertexReader} depending on how data for each vertex 
is
+ * stored in the input table.
+ * <p>
+ * The desired database and table name to load from can be specified via
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, 
org.apache.hcatalog.mapreduce.InputJobInfo)}
+ * as you setup your vertex input format with
+ * {@link GiraphJob#setVertexInputFormatClass(Class)}.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ * @param <M>
+ *            Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexInputFormat<I extends WritableComparable, 
V extends Writable, E extends Writable, M extends Writable>
+               extends VertexInputFormat<I, V, E, M> {
+
+       protected HCatInputFormat hCatInputFormat = new HCatInputFormat();
+
+       @Override
+       public final List<InputSplit> getSplits(JobContext context, int 
numWorkers)
+                       throws IOException, InterruptedException {
+               return hCatInputFormat.getSplits(context);
+       }
+
+       /**
+        * Abstract class that users should subclass based on their specific 
vertex
+        * input. HCatRecord can be parsed to get the required data for 
implementing
+        * getCurrentVertex(). If the vertex spans more than one HCatRecord,
+        * nextVertex() should be overridden to handle that logic as well.
+        * 
+        * @param <I>
+        *            Vertex index value
+        * @param <V>
+        *            Vertex value
+        * @param <E>
+        *            Edge value
+        * @param <M>
+        *            Message value
+        */
+       protected abstract class HCatalogVertexReader implements
+                       VertexReader<I, V, E, M> {
+
+               /** Internal HCatRecordReader */
+               private RecordReader<WritableComparable, HCatRecord> 
hCatRecordReader;
+
+               /** Context passed to initialize */
+               private TaskAttemptContext context;
+
+               /**
+                * Initialize with the HCatRecordReader.
+                * 
+                * @param hCatRecordReader
+                *            Internal reader
+                */
+               private void initialize(
+                               RecordReader<WritableComparable, HCatRecord> 
hCatRecordReader) {
+                       this.hCatRecordReader = hCatRecordReader;
+               }
+
+               @Override
+               public void initialize(InputSplit inputSplit, 
TaskAttemptContext context)
+                               throws IOException, InterruptedException {
+                       hCatRecordReader.initialize(inputSplit, context);
+                       this.context = context;
+               }
+
+               @Override
+               public boolean nextVertex() throws IOException, 
InterruptedException {
+                       // Users can override this if desired, and a vertex is 
bigger than
+                       // a single row.
+                       return hCatRecordReader.nextKeyValue();
+               }
+
+               @Override
+               public void close() throws IOException {
+                       hCatRecordReader.close();
+               }
+
+               @Override
+               public float getProgress() throws IOException, 
InterruptedException {
+                       return hCatRecordReader.getProgress();
+               }
+
+               /**
+                * Get the record reader.
+                * 
+                * @return Record reader to be used for reading.
+                */
+               protected RecordReader<WritableComparable, HCatRecord> 
getRecordReader() {
+                       return hCatRecordReader;
+               }
+
+               /**
+                * Get the context.
+                * 
+                * @return Context passed to initialize.
+                */
+               protected TaskAttemptContext getContext() {
+                       return context;
+               }
+       }
+
+       protected abstract HCatalogVertexReader createVertexReader();
+
+       @Override
+       public final VertexReader<I, V, E, M> createVertexReader(InputSplit 
split,
+                       TaskAttemptContext context) throws IOException {
+               try {
+                       HCatalogVertexReader reader = createVertexReader();
+                       reader.initialize(hCatInputFormat
+                                       .createRecordReader(split, context));
+                       return reader;
+               } catch (InterruptedException e) {
+                       throw new IllegalStateException(
+                                       "createVertexReader: Interrupted 
creating reader.", e);
+               }
+       }
+
+       /**
+        * HCatalogVertexReader for tables holding complete vertex info within 
each
+        * row.
+        */
+       protected abstract class SingleRowHCatalogVertexReader extends
+                       HCatalogVertexReader {
+
+               protected abstract I getVertexId(HCatRecord record);
+
+               protected abstract V getVertexValue(HCatRecord record);
+
+               protected abstract Map<I, E> getEdges(HCatRecord record);
+
+               private int recordCount = 0;
+
+               @Override
+               public final Vertex<I, V, E, M> getCurrentVertex()
+                               throws IOException, InterruptedException {
+                       HCatRecord record = getRecordReader().getCurrentValue();
+                       Vertex<I, V, E, M> vertex = 
BspUtils.createVertex(getContext()
+                                       .getConfiguration());
+                       vertex.initialize(getVertexId(record), 
getVertexValue(record),
+                                       getEdges(record), null);
+                       ++recordCount;
+                       if ((recordCount % 1000) == 0) {
+                               System.out.println("read " + recordCount + " 
records");
+                               // memory usage
+                               Runtime runtime = Runtime.getRuntime();
+                               double gb = 1024 * 1024 * 1024;
+                               System.out.println("Memory: " + 
(runtime.totalMemory() / gb)
+                                               + "GB total = "
+                                               + ((runtime.totalMemory() - 
runtime.freeMemory()) / gb)
+                                               + "GB used + " + 
(runtime.freeMemory() / gb)
+                                               + "GB free, " + 
(runtime.maxMemory() / gb) + "GB max");
+                       }
+                       return vertex;
+               }
+
+       }
+
+       /**
+        * HCatalogVertexReader for tables holding vertex info across multiple 
rows
+        * sorted by vertex id column, so that they appear consecutively to the
+        * RecordReader.
+        */
+       protected abstract class MultiRowHCatalogVertexReader extends
+                       HCatalogVertexReader {
+
+               protected abstract I getVertexId(HCatRecord record);
+
+               protected abstract V getVertexValue(Iterable<HCatRecord> 
records);
+
+               protected abstract I getTargetVertexId(HCatRecord record);
+
+               protected abstract E getEdgeValue(HCatRecord record);
+
+               private Vertex<I, V, E, M> vertex = null;
+
+               @Override
+               public Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+                               InterruptedException {
+                       return vertex;
+               }
+
+               private I currentVertexId = null;
+               private Map<I, E> destEdgeMap = Maps.newHashMap();
+               private List<HCatRecord> recordsForVertex = 
Lists.newArrayList();
+               private int recordCount = 0;
+
+               @Override
+               public final boolean nextVertex() throws IOException,
+                               InterruptedException {
+                       while (getRecordReader().nextKeyValue()) {
+                               HCatRecord record = 
getRecordReader().getCurrentValue();
+                               if (currentVertexId == null) {
+                                       currentVertexId = getVertexId(record);
+                               }
+                               if 
(currentVertexId.equals(getVertexId(record))) {
+                                       
destEdgeMap.put(getTargetVertexId(record),
+                                                       getEdgeValue(record));
+                                       recordsForVertex.add(record);
+                               } else {
+                                       createCurrentVertex();
+                                       if ((recordCount % 1000) == 0) {
+                                               System.out.println("read " + 
recordCount);
+                                       }
+                                       currentVertexId = getVertexId(record);
+                                       recordsForVertex.add(record);
+                                       return true;
+                               }
+                       }
+
+                       if (destEdgeMap.isEmpty()) {
+                               return false;
+                       } else {
+                               createCurrentVertex();
+                               return true;
+                       }
+
+               }
+
+               private void createCurrentVertex() {
+                       vertex = 
BspUtils.createVertex(getContext().getConfiguration());
+                       vertex.initialize(currentVertexId,
+                                       getVertexValue(recordsForVertex), 
destEdgeMap, null);
+                       destEdgeMap.clear();
+                       recordsForVertex.clear();
+                       ++recordCount;
+               }
+
+       }
+
+}

Added: 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java?rev=1390878&view=auto
==============================================================================
--- 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
 (added)
+++ 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HCatalogVertexOutputFormat.java
 Thu Sep 27 08:03:01 2012
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hcatalog;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+
+/**
+ * Abstract class that users should subclass to store data to a Hive or Pig 
table.
+ * You can easily implement a {@link HCatalogVertexWriter} by extending
+ * {@link SingleRowHCatalogVertexWriter} or {@link 
MultiRowHCatalogVertexWriter}
+ * depending on how you want to fit your vertices into the output table.
+ * <p>
+ * The desired database and table name to store to can be specified via
+ * {@link HCatOutputFormat#setOutput(org.apache.hadoop.mapreduce.Job, 
org.apache.hcatalog.mapreduce.OutputJobInfo)}
+ * as you setup your vertex output format with
+ * {@link GiraphJob#setVertexOutputFormatClass(Class)}. You must create the
+ * output table.
+ * 
+ * @param <I>
+ *            Vertex index value
+ * @param <V>
+ *            Vertex value
+ * @param <E>
+ *            Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class HCatalogVertexOutputFormat<I extends WritableComparable, 
V extends Writable, E extends Writable>
+               extends VertexOutputFormat<I, V, E> {
+
+       protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
+
+       @Override
+       public final void checkOutputSpecs(JobContext context) throws 
IOException,
+                       InterruptedException {
+               hCatOutputFormat.checkOutputSpecs(context);
+       }
+
+       @Override
+       public final OutputCommitter getOutputCommitter(TaskAttemptContext 
context)
+                       throws IOException, InterruptedException {
+               return hCatOutputFormat.getOutputCommitter(context);
+       }
+
+       /**
+        * Abstract class that users should subclass based on their specific 
vertex
+        * output. Users should implement writeVertex to create a HCatRecord 
that is
+        * valid for writing by HCatalogRecordWriter.
+        * 
+        * @param <I>
+        *            Vertex index value
+        * @param <V>
+        *            Vertex value
+        * @param <E>
+        *            Edge value
+        */
+       protected abstract class HCatalogVertexWriter implements
+                       VertexWriter<I, V, E> {
+
+               /** Internal HCatRecordWriter */
+               private RecordWriter<WritableComparable<?>, HCatRecord> 
hCatRecordWriter;
+               /** Context passed to initialize */
+               private TaskAttemptContext context;
+
+               /**
+                * Initialize with the HCatRecordWriter
+                * 
+                * @param hCatRecordWriter
+                *            Internal writer
+                */
+               private void initialize(
+                               RecordWriter<WritableComparable<?>, HCatRecord> 
hCatRecordWriter) {
+                       this.hCatRecordWriter = hCatRecordWriter;
+               }
+
+               /**
+                * Get the record writer.
+                * 
+                * @return Record writer to be used for writing.
+                */
+               protected RecordWriter<WritableComparable<?>, HCatRecord> 
getRecordWriter() {
+                       return hCatRecordWriter;
+               }
+
+               /**
+                * Get the context.
+                * 
+                * @return Context passed to initialize.
+                */
+               protected TaskAttemptContext getContext() {
+                       return context;
+               }
+
+               @Override
+               public void initialize(TaskAttemptContext context) throws 
IOException {
+                       this.context = context;
+               }
+
+               @Override
+               public void close(TaskAttemptContext context) throws 
IOException,
+                               InterruptedException {
+                       hCatRecordWriter.close(context);
+               }
+
+       }
+
+       protected abstract HCatalogVertexWriter createVertexWriter();
+
+       @Override
+       public final VertexWriter<I, V, E> createVertexWriter(
+                       TaskAttemptContext context) throws IOException,
+                       InterruptedException {
+               HCatalogVertexWriter writer = createVertexWriter();
+               writer.initialize(hCatOutputFormat.getRecordWriter(context));
+               return writer;
+       }
+
+       /**
+        * HCatalogVertexWriter to write each vertex as a single row.
+        */
+       protected abstract class SingleRowHCatalogVertexWriter extends
+                       HCatalogVertexWriter {
+
+               protected abstract int getNumColumns();
+
+               protected abstract void fillRecord(HCatRecord record,
+                               Vertex<I, V, E, ?> vertex);
+
+               protected HCatRecord createRecord(Vertex<I, V, E, ?> vertex) {
+                       HCatRecord record = new 
DefaultHCatRecord(getNumColumns());
+                       fillRecord(record, vertex);
+                       return record;
+               }
+
+               @Override
+               // XXX It is important not to put generic type signature 
<I,V,E,?> after
+               // Vertex. Otherwise, any class that extends this will not 
compile
+               // because of not implementing the VertexWriter#writeVertex. 
Mystery of
+               // Java Generics :(
+               @SuppressWarnings("unchecked")
+               public final void writeVertex(Vertex vertex) throws IOException,
+                               InterruptedException {
+                       getRecordWriter().write(null, createRecord(vertex));
+               }
+
+       }
+
+       /**
+        * HCatalogVertexWriter to write each vertex in multiple rows.
+        */
+       public abstract class MultiRowHCatalogVertexWriter extends
+                       HCatalogVertexWriter {
+
+               protected abstract Iterable<HCatRecord> createRecords(
+                               Vertex<I, V, E, ?> vertex);
+
+               @Override
+               // XXX Same thing here. No Generics for Vertex here.
+               @SuppressWarnings("unchecked")
+               public final void writeVertex(Vertex vertex) throws IOException,
+                               InterruptedException {
+                       Iterable<HCatRecord> records = createRecords(vertex);
+                       for (HCatRecord record : records) {
+                               getRecordWriter().write(null, record);
+                       }
+               }
+
+       }
+
+}

Added: 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java?rev=1390878&view=auto
==============================================================================
--- 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
 (added)
+++ 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/HiveGiraphRunner.java
 Thu Sep 27 08:03:01 2012
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.format.hcatalog;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class HiveGiraphRunner implements Tool {
+
+       @SuppressWarnings("rawtypes")
+       private Class<? extends Vertex> vertexClass;
+       @SuppressWarnings("rawtypes")
+       private Class<? extends HCatalogVertexInputFormat> 
vertexInputFormatClass;
+       @SuppressWarnings("rawtypes")
+       private Class<? extends HCatalogVertexOutputFormat> 
vertexOutputFormatClass;
+
+       protected HiveGiraphRunner(
+                       @SuppressWarnings("rawtypes") Class<? extends Vertex> 
vertexClass,
+                       @SuppressWarnings("rawtypes") Class<? extends 
HCatalogVertexInputFormat> vertexInputFormatClass,
+                       @SuppressWarnings("rawtypes") Class<? extends 
HCatalogVertexOutputFormat> vertexOutputFormatClass) {
+               this.vertexClass = vertexClass;
+               this.vertexInputFormatClass = vertexInputFormatClass;
+               this.vertexOutputFormatClass = vertexOutputFormatClass;
+               this.conf = new HiveConf(getClass());
+       }
+
+       protected String dbName;
+       protected String inputTableName;
+       protected String inputTableFilterExpr;
+       protected String outputTableName;
+       protected Map<String, String> outputTablePartitionValues;
+
+       protected int workers;
+       protected boolean isVerbose;
+
+       public static void main(String[] args) throws Exception {
+               System.exit(ToolRunner
+                               .run(new HiveGiraphRunner(null, null, null), 
args));
+       }
+
+       @Override
+       public final int run(String[] args) throws Exception {
+               // process args
+               try {
+                       processArguments(args);
+               } catch (InterruptedException e) {
+                       return 0;
+               } catch (IllegalArgumentException e) {
+                       System.err.println(e.getMessage());
+                       return -1;
+               }
+
+               // additional configuration for Hive
+               adjustConfigurationForHive(getConf());
+
+               // setup GiraphJob
+               GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+               GiraphConfiguration conf = job.getConfiguration();
+               conf.setVertexClass(vertexClass);
+
+               // setup input from Hive
+               InputJobInfo inputJobInfo = InputJobInfo.create(dbName, 
inputTableName,
+                               inputTableFilterExpr);
+               HCatInputFormat.setInput(job.getInternalJob(), inputJobInfo);
+               conf.setVertexInputFormatClass(vertexInputFormatClass);
+
+               // setup output to Hive
+               HCatOutputFormat.setOutput(job.getInternalJob(), 
OutputJobInfo.create(
+                               dbName, outputTableName, 
outputTablePartitionValues));
+               HCatOutputFormat.setSchema(job.getInternalJob(),
+                               
HCatOutputFormat.getTableSchema(job.getInternalJob()));
+               conf.setVertexOutputFormatClass(vertexOutputFormatClass);
+
+               conf.setWorkerConfiguration(workers, workers, 100.0f);
+               initGiraphJob(job);
+
+               return job.run(isVerbose) ? 0 : -1;
+       }
+
+       private static void adjustConfigurationForHive(Configuration conf) {
+               // when output partitions are used, workers register them to the
+               // metastore at cleanup stage, and on HiveConf's 
initialization, it
+               // looks for hive-site.xml, so add it to tmpfiles.
+               addToStringCollection(conf, "tmpfiles", conf.getClassLoader()
+                               .getResource("hive-site.xml").toString());
+
+               // Also, you need hive.aux.jars as well
+               // addToStringCollection(conf, "tmpjars",
+               // conf.getStringCollection("hive.aux.jars.path"));
+
+               // Or, more effectively, we can provide all the jars client 
needed to
+               // the workers as well
+               String[] hadoopJars = System.getenv("HADOOP_CLASSPATH").split(
+                               File.pathSeparator);
+               List<String> hadoopJarURLs = Lists.newArrayList();
+               for (String jarPath : hadoopJars) {
+                       File file = new File(jarPath);
+                       if (file.exists() && file.isFile()) {
+                               String jarURL = file.toURI().toString();
+                               hadoopJarURLs.add(jarURL);
+                       }
+               }
+               addToStringCollection(conf, "tmpjars", hadoopJarURLs);
+       }
+
+       /**
+        * Parses the command line, validates the mandatory settings and stores
+        * the results in this runner's fields.
+        *
+        * The vertex, input-format and output-format class options are only
+        * registered when the corresponding field has not been set already.
+        * Every <code>-D property=value</code> argument is copied into the
+        * configuration returned by {@link #getConf()}; <code>tmpjars</code> and
+        * <code>tmpfiles</code> are appended to instead of overwritten.
+        *
+        * @param args raw command-line arguments
+        * @return the parsed command line
+        * @throws ParseException if the arguments cannot be parsed
+        * @throws InterruptedException after printing the usage text, when no
+        *         arguments were given or help was requested
+        * @throws IllegalArgumentException if a mandatory setting is missing
+        */
+       private CommandLine processArguments(String[] args)
+                       throws ParseException, InterruptedException {
+               Options options = new Options();
+               options.addOption("h", "help", false, "Help");
+               options.addOption("v", "verbose", false, "Verbose");
+               options.addOption("D", "hiveconf", true,
+                               "property=value for Hive/Hadoop configuration");
+               options.addOption("w", "workers", true, "Number of workers");
+               if (vertexClass == null) {
+                       options.addOption(null, "vertexClass", true,
+                                       "Giraph Vertex class to use");
+               }
+               if (vertexInputFormatClass == null) {
+                       options.addOption(null, "vertexInputFormatClass", true,
+                                       "Giraph HCatalogVertexInputFormat class to use");
+               }
+               if (vertexOutputFormatClass == null) {
+                       options.addOption(null, "vertexOutputFormatClass", true,
+                                       "Giraph HCatalogVertexOutputFormat class to use");
+               }
+               options.addOption("db", "database", true, "Hive database name");
+               options.addOption("i", "inputTable", true, "Input table name");
+               options.addOption("I", "inputFilter", true,
+                               "Input table filter expression "
+                                               + "(e.g., \"a<2 AND b='two'\")");
+               options.addOption("o", "outputTable", true, "Output table name");
+               options.addOption("O", "outputPartition", true,
+                               "Output table partition values "
+                                               + "(e.g., \"a=1,b=two\")");
+               addMoreOptions(options);
+
+               CommandLineParser parser = new GnuParser();
+               final CommandLine cmdln = parser.parse(options, args);
+               if (args.length == 0 || cmdln.hasOption("help")) {
+                       new HelpFormatter().printHelp(getClass().getName(), options,
+                                       true);
+                       throw new InterruptedException();
+               }
+
+               // Giraph classes
+               if (cmdln.hasOption("vertexClass")) {
+                       vertexClass = findClass(cmdln.getOptionValue("vertexClass"),
+                                       Vertex.class);
+               }
+               if (cmdln.hasOption("vertexInputFormatClass")) {
+                       vertexInputFormatClass = findClass(
+                                       cmdln.getOptionValue("vertexInputFormatClass"),
+                                       HCatalogVertexInputFormat.class);
+               }
+               if (cmdln.hasOption("vertexOutputFormatClass")) {
+                       vertexOutputFormatClass = findClass(
+                                       cmdln.getOptionValue("vertexOutputFormatClass"),
+                                       HCatalogVertexOutputFormat.class);
+               }
+
+               if (vertexClass == null) {
+                       throw new IllegalArgumentException(
+                                       "Need the Giraph Vertex class name "
+                                                       + "(-vertexClass) to use");
+               }
+               if (vertexInputFormatClass == null) {
+                       throw new IllegalArgumentException(
+                                       "Need the Giraph VertexInputFormat class name "
+                                                       + "(-vertexInputFormatClass) to use");
+               }
+               if (vertexOutputFormatClass == null) {
+                       throw new IllegalArgumentException(
+                                       "Need the Giraph VertexOutputFormat class name "
+                                                       + "(-vertexOutputFormatClass) to use");
+               }
+
+               if (!cmdln.hasOption("workers")) {
+                       throw new IllegalArgumentException(
+                                       "Need to choose the number of workers (-w)");
+               }
+               if (!cmdln.hasOption("inputTable")) {
+                       throw new IllegalArgumentException(
+                                       "Need to set the input table name (-i). "
+                                                       + " One example is 'dim_friendlist'");
+               }
+               if (!cmdln.hasOption("outputTable")) {
+                       throw new IllegalArgumentException(
+                                       "Need to set the output table name (-o).");
+               }
+
+               // The option is registered as "db"/"database"; looking it up under
+               // any other name would always yield the default.
+               dbName = cmdln.getOptionValue("database", "default");
+               inputTableName = cmdln.getOptionValue("inputTable");
+               inputTableFilterExpr = cmdln.getOptionValue("inputFilter");
+               outputTableName = cmdln.getOptionValue("outputTable");
+               outputTablePartitionValues = parsePartitionValues(cmdln
+                               .getOptionValue("outputPartition"));
+               workers = Integer.parseInt(cmdln.getOptionValue("workers"));
+               isVerbose = cmdln.hasOption("verbose");
+
+               // pick up -hiveconf arguments; getOptionValues() yields null when
+               // the option was not given at all
+               Configuration conf = getConf();
+               String[] hiveconfs = cmdln.getOptionValues("hiveconf");
+               if (hiveconfs != null) {
+                       for (String hiveconf : hiveconfs) {
+                               String[] keyval = hiveconf.split("=", 2);
+                               if (keyval.length == 2) {
+                                       String name = keyval[0];
+                                       String value = keyval[1];
+                                       if (name.equals("tmpjars")
+                                                       || name.equals("tmpfiles")) {
+                                               addToStringCollection(conf, name, value);
+                                       } else {
+                                               conf.set(name, value);
+                                       }
+                               }
+                       }
+               }
+
+               processMoreArguments(cmdln);
+
+               return cmdln;
+       }
+
+       /**
+        * Varargs convenience form: wraps the given values in a list and hands
+        * them to the collection-based overload.
+        *
+        * @param conf configuration to update
+        * @param name name of the property holding the string collection
+        * @param values values to append
+        */
+       private static void addToStringCollection(Configuration conf, String name,
+                       String... values) {
+               List<String> valueList = Arrays.asList(values);
+               addToStringCollection(conf, name, valueList);
+       }
+
+       /**
+        * Appends the given values to the string collection stored under
+        * <code>name</code> and writes the combined collection back.
+        *
+        * @param conf configuration to update
+        * @param name name of the property holding the string collection
+        * @param values values to append to whatever is already stored
+        */
+       private static void addToStringCollection(Configuration conf, String name,
+                       Collection<? extends String> values) {
+               Collection<String> merged = conf.getStringCollection(name);
+               merged.addAll(values);
+               String[] mergedArray = new String[merged.size()];
+               merged.toArray(mergedArray);
+               conf.setStrings(name, mergedArray);
+       }
+
+       /**
+        * Loads the named class and returns it typed as a subclass of
+        * <code>base</code>.
+        *
+        * @param <T> required base type
+        * @param className fully qualified name of the class to load
+        * @param base type the loaded class must be assignable to
+        * @return the loaded class, never null
+        * @throws IllegalArgumentException if the class cannot be found or is
+        *         not assignable to <code>base</code>
+        */
+       private <T> Class<? extends T> findClass(String className, Class<T> base) {
+               Class<?> cls;
+               try {
+                       cls = Class.forName(className);
+               } catch (ClassNotFoundException e) {
+                       // Keep the lookup failure as the cause so it is not lost.
+                       throw new IllegalArgumentException(className
+                                       + ": Invalid class name", e);
+               }
+               // Report a wrong type here; returning null would make the caller
+               // complain that no class name was given at all.
+               if (!base.isAssignableFrom(cls)) {
+                       throw new IllegalArgumentException(className
+                                       + ": Not a subclass of " + base.getName());
+               }
+               return cls.asSubclass(base);
+       }
+
+       // TODO use Hive util class if this is already provided by it
+       public static Map<String, String> parsePartitionValues(
+                       String outputTablePartitionString) {
+               if (outputTablePartitionString != null) {
+                       Map<String, String> partitionValues = Maps.newHashMap();
+                       for (String partkeyval : 
outputTablePartitionString.split(" *, *")) {
+                               String[] keyval = partkeyval.split(" *= *", 2);
+                               if (keyval.length < 2)
+                                       throw new IllegalArgumentException(
+                                                       "Unrecognized partition 
value format: "
+                                                                       + 
outputTablePartitionString);
+                               partitionValues.put(keyval[0], keyval[1]);
+                       }
+                       return partitionValues;
+               } else
+                       return null;
+       }
+
+       /**
+        * Renders partition values as <code>key=value</code> pairs joined by
+        * commas, in the iteration order of the map.
+        *
+        * @param outputTablePartitionValues partition key/value pairs
+        * @return the joined text; empty if the map is empty
+        */
+       private static String serializePartitionValues(
+                       Map<String, String> outputTablePartitionValues) {
+               StringBuilder joined = new StringBuilder();
+               // Empty before the first pair, a comma before every later one.
+               String separator = "";
+               for (Entry<String, String> pair : outputTablePartitionValues
+                               .entrySet()) {
+                       joined.append(separator);
+                       joined.append(pair.getKey());
+                       joined.append("=");
+                       joined.append(pair.getValue());
+                       separator = ",";
+               }
+               return joined.toString();
+       }
+
+       /**
+        * Configuration assigned through {@link #setConf(Configuration)} and
+        * returned by {@link #getConf()}.
+        */
+       private Configuration conf;
+
+       /**
+        * Returns the configuration last passed to
+        * {@link #setConf(Configuration)}.
+        *
+        * @return the current configuration, or null if none has been set
+        */
+       @Override
+       public final Configuration getConf() {
+               return this.conf;
+       }
+
+       /**
+        * Stores the configuration that {@link #getConf()} will return.
+        *
+        * @param conf configuration to keep
+        */
+       @Override
+       public final void setConf(Configuration conf) {
+               this.conf = conf;
+       }
+
+       /**
+        * Override this method to add more command-line options. You can process
+        * them by also overriding {@link #processMoreArguments(CommandLine)}.
+        * It is called after the standard options have been registered and
+        * before the command line is parsed. The default implementation adds
+        * nothing.
+        *
+        * @param options option set to which additional options may be added
+        */
+       protected void addMoreOptions(Options options) {
+       }
+
+       /**
+        * Override this method to process additional command-line arguments. You
+        * may want to declare additional options by also overriding
+        * {@link #addMoreOptions(Options)}. It is called after the standard
+        * options have been evaluated. The default implementation does nothing.
+        *
+        * @param cmd the parsed command line
+        */
+       protected void processMoreArguments(CommandLine cmd) {
+       }
+
+       /**
+        * Override this method to do additional setup with the GiraphJob that 
will
+        * run.
+        *
+        * @param job
+        *            GiraphJob that is going to run
+        */
+       protected void initGiraphJob(GiraphJob job) {
+               System.out.println(getClass().getSimpleName() + " with");
+               String prefix = "\t";
+               System.out.println(prefix + "-vertexClass="
+                               + vertexClass.getCanonicalName());
+               System.out.println(prefix + "-vertexInputFormatClass="
+                               + vertexInputFormatClass.getCanonicalName());
+               System.out.println(prefix + "-vertexOutputFormatClass="
+                               + vertexOutputFormatClass.getCanonicalName());
+               System.out.println(prefix + "-inputTable=" + inputTableName);
+               if (inputTableFilterExpr != null)
+                       System.out.println(prefix + "-inputFilter=\""
+                                       + inputTableFilterExpr + "\"");
+               System.out.println(prefix + "-outputTable=" + outputTableName);
+               if (outputTablePartitionValues != null)
+                       System.out.println(prefix + "-outputPartition=\""
+                                       + 
serializePartitionValues(outputTablePartitionValues)
+                                       + "\"");
+               System.out.println(prefix + "-workers=" + workers);
+       }
+
+}

Added: 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java?rev=1390878&view=auto
==============================================================================
--- 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
 (added)
+++ 
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/format/hcatalog/package-info.java
 Thu Sep 27 08:03:01 2012
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of input and output format classes for loading and storing
+ * Hive/Pig data using HCatalog.
+ */
+package org.apache.giraph.format.hcatalog;
+


Reply via email to