Updated Branches: refs/heads/trunk 9ded6c372 -> e987492ee
GIRAPH-759 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e987492e Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e987492e Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e987492e Branch: refs/heads/trunk Commit: e987492ee3c84e5b7d69af2ecbec2b07f6c4b6ae Parents: 9ded6c3 Author: Claudio Martella <[email protected]> Authored: Mon Nov 18 15:56:42 2013 +0100 Committer: Claudio Martella <[email protected]> Committed: Mon Nov 18 15:56:42 2013 +0100 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/io/gora/GoraEdgeInputFormat.java | 409 +++++++++++++++++++ .../io/gora/GoraGEdgeEdgeInputFormat.java | 93 +++++ .../apache/giraph/io/gora/generated/GEdge.java | 314 ++++++++++++++ .../giraph/io/gora/GoraTestEdgeInputFormat.java | 132 ++++++ .../giraph/io/gora/TestGoraEdgeInputFormat.java | 122 ++++++ 6 files changed, 1072 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 5471120..c639ba1 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-759: Create EdgeInputFormat from Apache Gora (renato2099 via claudio) + GIRAPH-758: Create VertexOutputFormat to Apache Gora (renato2099 via claudio) GIRAPH-757: Create VertexInputFormat from Apache Gora (renato2099 via claudio) http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java new file mode 100644 index 
0000000..d0dcc32 --- /dev/null +++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeInputFormat.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io.gora; + +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY; + +import java.io.IOException; +import java.util.List; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat; +import org.apache.giraph.io.gora.utils.GoraUtils; +import org.apache.giraph.io.gora.utils.KeyFactory; +import org.apache.gora.persistency.Persistent; +import 
org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Logger; + +/** + * Class which wraps the GoraInputFormat. It's designed + * as an extension point to EdgeInputFormat subclasses who wish + * to read from Gora data sources. + * + * Works with + * {@link GoraVertexOutputFormat} + * + * @param <I> vertex id type + * @param <E> edge value type + */ +public abstract class GoraEdgeInputFormat + <I extends WritableComparable, E extends Writable> + extends EdgeInputFormat<I, E> { + + /** Start key for querying Gora data store. */ + private static Object START_KEY; + + /** End key for querying Gora data store. */ + private static Object END_KEY; + + /** Logger for Gora's edge input format. */ + private static final Logger LOG = + Logger.getLogger(GoraEdgeInputFormat.class); + + /** KeyClass used for getting data. */ + private static Class<?> KEY_CLASS; + + /** Persistent class used as the value type inside Gora. */ + private static Class<? extends Persistent> PERSISTENT_CLASS; + + /** Data store class to be used as backend. */ + private static Class<? extends DataStore> DATASTORE_CLASS; + + /** Class used to transform strings into Keys */ + private static Class<?> KEY_FACTORY_CLASS; + + /** Data store used for querying data.
*/ + private static DataStore DATA_STORE; + + /** Counter for input records. */ + private static int RECORD_COUNTER = 0; + + /** Delegate Gora input format */ + private static ExtraGoraInputFormat GORA_INPUT_FORMAT = + new ExtraGoraInputFormat(); + + /** + * @param conf configuration parameters (unused; read from getConf()) + */ + public void checkInputSpecs(Configuration conf) { + String sDataStoreType = + GIRAPH_GORA_DATASTORE_CLASS.get(getConf()); + String sKeyType = + GIRAPH_GORA_KEY_CLASS.get(getConf()); + String sPersistentType = + GIRAPH_GORA_PERSISTENT_CLASS.get(getConf()); + String sKeyFactoryClass = + GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf()); + try { + Class<?> keyClass = Class.forName(sKeyType); + Class<?> persistentClass = Class.forName(sPersistentType); + Class<?> dataStoreClass = Class.forName(sDataStoreType); + Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass); + setKeyClass(keyClass); + setPersistentClass((Class<? extends Persistent>) persistentClass); + setDatastoreClass((Class<? extends DataStore>) dataStoreClass); + setKeyFactoryClass(keyFactoryClass); + setDataStore(createDataStore()); + GORA_INPUT_FORMAT.setDataStore(getDataStore()); + } catch (ClassNotFoundException e) { + LOG.error("Error while reading Gora Input parameters"); + e.printStackTrace(); + } + } + + /** + * Gets the splits for a data store. + * @param context JobContext + * @param minSplitCountHint Hint for a minimum split count + * @return List<InputSplit> A list of splits + */ + @Override + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) + throws IOException, InterruptedException { + KeyFactory kFact = null; + try { + kFact = (KeyFactory) getKeyFactoryClass().newInstance(); + } catch (InstantiationException e) { + LOG.error("Key factory was not instantiated. Please verify."); + throw new IOException("Key factory was not instantiated.", e); + } catch (IllegalAccessException e) { + LOG.error("Key factory was not instantiated.
Please verify."); + throw new IOException("Key factory was not instantiated.", e); + } + String sKey = GIRAPH_GORA_START_KEY.get(getConf()); + String eKey = GIRAPH_GORA_END_KEY.get(getConf()); + if (sKey == null || sKey.isEmpty()) { + LOG.warn("No start key has been defined."); + LOG.warn("Querying all the data store."); + sKey = null; + eKey = null; + } + kFact.setDataStore(getDataStore()); + setStartKey(kFact.buildKey(sKey)); + setEndKey(kFact.buildKey(eKey)); + Query tmpQuery = GoraUtils.getQuery( + getDataStore(), getStartKey(), getEndKey()); + GORA_INPUT_FORMAT.setQuery(tmpQuery); + List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context); + return splits; + } + + @Override + public abstract GoraEdgeReader createEdgeReader(InputSplit split, + TaskAttemptContext context) throws IOException; + + /** + * Abstract class to be implemented by the user based on their specific + * edge input. Subclasses implement transformEdge to convert each Gora + * object into an edge. + */ + protected abstract class GoraEdgeReader extends EdgeReader<I, E> { + /** Current edge obtained from Gora. */ + private Edge<I, E> edge; + /** Results gotten from Gora data store. */ + private Result readResults; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + getResults(); + RECORD_COUNTER = 0; + } + + /** + * Gets the next edge from Gora data store. + * @return true/false depending on the existence of edges. + * @throws IOException exceptions passed along. + * @throws InterruptedException exceptions passed along.
+ */ + @Override + // CHECKSTYLE: stop IllegalCatch + public boolean nextEdge() throws IOException, InterruptedException { + boolean flg = false; + try { + flg = this.getReadResults().next(); + if (flg) { // only transform when a record is available + this.edge = transformEdge(this.getReadResults().get()); + RECORD_COUNTER++; + } + } catch (Exception e) { + LOG.error("Error transforming vertices.", e); + flg = false; + } + LOG.debug(RECORD_COUNTER + " were transformed."); + return flg; + } + // CHECKSTYLE: resume IllegalCatch + + /** + * Gets the progress of reading results from Gora. + * @return the progress of reading results from Gora. + */ + @Override + public float getProgress() throws IOException, InterruptedException { + float progress = 0.0f; + if (getReadResults() != null) { + progress = getReadResults().getProgress(); + } + return progress; + } + + /** + * Gets current edge. + * + * @return The edge object represented by a Gora object + */ + @Override + public Edge<I, E> getCurrentEdge() + throws IOException, InterruptedException { + return this.edge; + } + + /** + * Parser for a single Gora object + * + * @param goraObject edge represented as a Gora object + * @return The edge object represented by a Gora object + */ + protected abstract Edge<I, E> transformEdge(Object goraObject); + + /** + * Performs a range query to a Gora data store. + */ + protected void getResults() { + setReadResults(GoraUtils.getRequest(getDataStore(), + getStartKey(), getEndKey())); + } + + /** + * Finishes the reading process. + * @throws IOException. + */ + @Override + public void close() throws IOException { + } + + /** + * Gets the results read. + * @return results read. + */ + Result getReadResults() { + return readResults; + } + + /** + * Sets the results read. + * @param readResults results read. + */ + void setReadResults(Result readResults) { + this.readResults = readResults; + } + } + + /** + * Gets the data store object initialized.
+ * @return DataStore created + */ + public DataStore createDataStore() { + DataStore dsCreated = null; + try { + dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(), + getKeyClass(), getPersistentClass()); + } catch (GoraException e) { + LOG.error("Error creating data store."); + e.printStackTrace(); + } + return dsCreated; + } + + /** + * Gets the persistent Class + * @return persistentClass used + */ + static Class<? extends Persistent> getPersistentClass() { + return PERSISTENT_CLASS; + } + + /** + * Sets the persistent Class + * @param persistentClassUsed to be set + */ + static void setPersistentClass + (Class<? extends Persistent> persistentClassUsed) { + PERSISTENT_CLASS = persistentClassUsed; + } + + /** + * Gets the key class used. + * @return the key class used. + */ + static Class<?> getKeyClass() { + return KEY_CLASS; + } + + /** + * Sets the key class used. + * @param keyClassUsed key class used. + */ + static void setKeyClass(Class<?> keyClassUsed) { + KEY_CLASS = keyClassUsed; + } + + /** + * @return Class the DATASTORE_CLASS + */ + public static Class<? extends DataStore> getDatastoreClass() { + return DATASTORE_CLASS; + } + + /** + * @param dataStoreClass the dataStore class to set + */ + public static void setDatastoreClass( + Class<? extends DataStore> dataStoreClass) { + DATASTORE_CLASS = dataStoreClass; + } + + /** + * Gets the start key for querying. + * @return the start key. + */ + public Object getStartKey() { + return START_KEY; + } + + /** + * Sets the start key for querying. + * @param startKey start key. + */ + public static void setStartKey(Object startKey) { + START_KEY = startKey; + } + + /** + * Gets the end key for querying. + * @return the end key. + */ + static Object getEndKey() { + return END_KEY; + } + + /** + * Sets the end key for querying. + * @param pEndKey end key. + */ + static void setEndKey(Object pEndKey) { + END_KEY = pEndKey; + } + + /** + * Gets the key factory class.
+ * @return the key factory class. + */ + static Class<?> getKeyFactoryClass() { + return KEY_FACTORY_CLASS; + } + + /** + * Sets the key factory class. + * @param keyFactoryClass the keyFactoryClass to set. + */ + static void setKeyFactoryClass(Class<?> keyFactoryClass) { + KEY_FACTORY_CLASS = keyFactoryClass; + } + + /** + * Gets the data store. + * @return DataStore + */ + public static DataStore getDataStore() { + return DATA_STORE; + } + + /** + * Sets the data store + * @param dStore the data store to set + */ + public static void setDataStore(DataStore dStore) { + DATA_STORE = dStore; + } + + /** + * Returns a logger. + * @return the log for the input format. + */ + public static Logger getLogger() { + return LOG; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java new file mode 100644 index 0000000..e738f36 --- /dev/null +++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeInputFormat.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io.gora; + +import java.io.IOException; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.gora.generated.GEdge; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Example implementation of a specific reader for a generated data bean. + */ +public class GoraGEdgeEdgeInputFormat + extends GoraEdgeInputFormat<LongWritable, FloatWritable> { + + /** + * Default constructor + */ + public GoraGEdgeEdgeInputFormat() { + } + + /** + * Creates specific edge reader to be used inside Hadoop. + * @param split split to be read. + * @param context TaskAttemptContext to be used. + * @return GoraEdgeReader Edge reader to be used by Hadoop. + */ + @Override + public GoraEdgeReader createEdgeReader( + InputSplit split, TaskAttemptContext context) throws IOException { + return new GoraGEdgeEdgeReader(); + } + + /** + * Gora edge reader + */ + protected class GoraGEdgeEdgeReader extends GoraEdgeReader { + + /** source vertex of the edge */ + private LongWritable sourceId; + + /** + * Transforms a GEdge into an Edge from vertexInId to vertexOutId. + * @param goraObject Object from Gora to be translated. + * @return Edge Result from transforming the gora object.
+ */ + @Override + protected Edge<LongWritable, FloatWritable> transformEdge + (Object goraObject) { + Edge<LongWritable, FloatWritable> edge = null; + GEdge goraEdge = (GEdge) goraObject; + Long dest; + float value; + dest = Long.valueOf(goraEdge.getVertexOutId().toString()); + this.sourceId = new LongWritable(); + this.sourceId.set(Long.valueOf(goraEdge.getVertexInId().toString())); + value = goraEdge.getEdgeWeight(); // keep fractional weights + edge = EdgeFactory.create(new LongWritable(dest), + new FloatWritable(value)); + return edge; + } + + /** + * Gets the currentSourceId for the edge. + * @return LongWritable currentSourceId for the edge. + */ + @Override + public LongWritable getCurrentSourceId() throws IOException, + InterruptedException { + return this.sourceId; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java ---------------------------------------------------------------------- diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java new file mode 100644 index 0000000..f6ac3f7 --- /dev/null +++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdge.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.gora.generated; + +import org.apache.avro.Schema; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.util.Utf8; +import org.apache.gora.persistency.StateManager; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.persistency.impl.StateManagerImpl; + +/** + * Example class for defining a Giraph-Edge. + */ +@SuppressWarnings("all") +public class GEdge extends PersistentBase { + /** + * Avro schema. NOTE(review): namespace differs from the Java package. + */ + public static final Schema OBJ_SCHEMA = Schema.parse("{\"type\":\"record\"," + + "\"name\":\"GEdge\",\"namespace\":\"org.apache.giraph.gora.generated\"," + + "\"fields\":[{\"name\":\"edgeId\",\"type\":\"string\"}," + + "{\"name\":\"edgeWeight\",\"type\":\"float\"}," + + "{\"name\":\"vertexInId\",\"type\":\"string\"}," + + "{\"name\":\"vertexOutId\",\"type\":\"string\"}," + + "{\"name\":\"label\",\"type\":\"string\"}]}"); + + /** + * Field enum: index and name of each schema field. + */ + public static enum Field { + /** + * Edge id. + */ + EDGE_ID(0, "edgeId"), + + /** + * Edge weight. + */ + EDGE_WEIGHT(1, "edgeWeight"), + + /** + * Edge vertex source id. + */ + VERTEX_IN_ID(2, "vertexInId"), + + /** + * Edge vertex destination id. + */ + VERTEX_OUT_ID(3, "vertexOutId"), + + /** + * Edge label. + */ + LABEL(4, "label"); + + /** + * Field index + */ + private int index; + + /** + * Field name + */ + private String name; + + /** + * Field constructor + * @param index of attribute + * @param name of attribute + */ + Field(int index, String name) { + this.index = index; + this.name = name; + } + + /** + * Gets index + * @return int of attribute. + */ + public int getIndex() { + return index; + } + + /** + * Gets name + * @return String of name. + */ + public String getName() { + return name; + } + + /** + * Returns the field name as the string form of the constant. + * @return String of name.
+ */ + public String toString() { + return name; + } + }; + + /** + * Array containing all field names, in index order. + */ + private static final String[] ALL_FIELDS = { + "edgeId", "edgeWeight", "vertexInId", "vertexOutId", "label" + }; + + static { + PersistentBase.registerFields(GEdge.class, ALL_FIELDS); + } + + /** + * edgeId + */ + private Utf8 edgeId; + + /** + * edgeWeight + */ + private float edgeWeight; + + /** + * vertexInId + */ + private Utf8 vertexInId; + + /** + * vertexOutId + */ + private Utf8 vertexOutId; + + /** + * label + */ + private Utf8 label; + + /** + * Default constructor. + */ + public GEdge() { + this(new StateManagerImpl()); + } + + /** + * Constructor + * @param stateManager from which the object will be created. + */ + public GEdge(StateManager stateManager) { + super(stateManager); + } + + /** + * Creates a new instance + * @param stateManager from which the object will be created. + * @return GEdge created + */ + public GEdge newInstance(StateManager stateManager) { + return new GEdge(stateManager); + } + + /** + * Gets the object schema + * @return Schema of the object. + */ + public Schema getSchema() { + return OBJ_SCHEMA; + } + + /** + * Gets a field value by index (0-4, see Field). + * @param fieldIndex index field. + * @return Object from an index. + */ + public Object get(int fieldIndex) { + switch (fieldIndex) { + case 0: + return edgeId; + case 1: + return edgeWeight; + case 2: + return vertexInId; + case 3: + return vertexOutId; + case 4: + return label; + default: + throw new AvroRuntimeException("Bad index"); + } + } + + /** + * Puts a value into a field; no-op if isFieldEqual, else marks dirty. + * @param fieldIndex index of field used. + * @param fieldValue value of field used.
+ */ + @SuppressWarnings(value = "unchecked") + public void put(int fieldIndex, Object fieldValue) { + if (isFieldEqual(fieldIndex, fieldValue)) { + return; + } + getStateManager().setDirty(this, fieldIndex); + switch (fieldIndex) { + case 0: + edgeId = (Utf8) fieldValue; break; + case 1: + edgeWeight = (Float) fieldValue; break; + case 2: + vertexInId = (Utf8) fieldValue; break; + case 3: + vertexOutId = (Utf8) fieldValue; break; + case 4: + label = (Utf8) fieldValue; break; + default: + throw new AvroRuntimeException("Bad index"); + } + } + + /** + * Gets edgeId + * @return Utf8 edgeId + */ + public Utf8 getEdgeId() { + return (Utf8) get(0); + } + + /** + * Sets edgeId + * @param value edgeId + */ + public void setEdgeId(Utf8 value) { + put(0, value); + } + + /** + * Gets edgeWeight + * @return float edgeWeight + */ + public float getEdgeWeight() { + return (Float) get(1); + } + + /** + * Sets edgeWeight + * @param value edgeWeight + */ + public void setEdgeWeight(float value) { + put(1, value); + } + + /** + * Gets edgeVertexInId + * @return Utf8 edgeVertexInId + */ + public Utf8 getVertexInId() { + return (Utf8) get(2); + } + + /** + * Sets edgeVertexInId + * @param value edgeVertexInId + */ + public void setVertexInId(Utf8 value) { + put(2, value); + } + + /** + * Gets edgeVertexOutId + * @return Utf8 edgeVertexOutId + */ + public Utf8 getVertexOutId() { + return (Utf8) get(3); + } + + /** + * Sets edgeVertexOutId + * @param value edgeVertexOutId + */ + public void setVertexOutId(Utf8 value) { + put(3, value); + } + + /** + * Gets edgeLabel + * @return Utf8 edgeLabel + */ + public Utf8 getLabel() { + return (Utf8) get(4); + } + + /** + * Sets edgeLabel + * @param value edgeLabel + */ + public void setLabel(Utf8 value) { + put(4, value); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java ---------------------------------------------------------------------- diff
--git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java new file mode 100644 index 0000000..ba71ce4 --- /dev/null +++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeInputFormat.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io.gora; + +import java.io.IOException; + +import org.apache.avro.util.Utf8; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.gora.GoraEdgeInputFormat; +import org.apache.giraph.io.gora.generated.GEdge; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Implementation of a specific reader for a generated data bean. + */ +public class GoraTestEdgeInputFormat + extends GoraEdgeInputFormat<LongWritable, FloatWritable> { + + /** + * Default constructor + */ + public GoraTestEdgeInputFormat() { + } + + /** + * Creates specific edge reader to be used inside Hadoop. + * @param split split to be read.
+ * @param context TaskAttemptContext to be used. + * @return GoraEdgeReader Edge reader to be used by Hadoop. + */ + @Override + public GoraEdgeReader createEdgeReader( + InputSplit split, TaskAttemptContext context) throws IOException { + putArtificialData(); + return new GoraGEdgeEdgeReader(); + } + + /** + * Writes data into the data store in order to test it out. + */ + @SuppressWarnings("unchecked") + private static void putArtificialData() { + getDataStore().put("11-22", + createEdge("11-22", "11", "22", "11-22", (float)(11+22))); + getDataStore().put("22-11", + createEdge("22-11", "22", "11", "22-11", (float)(22+11))); + getDataStore().put("11-33", + createEdge("11-33", "11", "33", "11-33", (float)(11+33))); + getDataStore().put("33-11", + createEdge("33-11", "33", "11", "33-11", (float)(33+11))); + getDataStore().flush(); + } + + /** + * Creates a GEdge from its id, endpoints, label and weight. + * @param id Edge id. + * @param vertexInId Vertex source Id. + * @param vertexOutId Vertex destination Id. + * @param edgeLabel Edge label. + * @param edgeWeight Edge weight. + * @return GEdge created. + */ + private static GEdge createEdge(String id, String vertexInId, + String vertexOutId, String edgeLabel, float edgeWeight) { + GEdge newEdge = new GEdge(); + newEdge.setEdgeId(new Utf8(id)); + newEdge.setVertexInId(new Utf8(vertexInId)); + newEdge.setVertexOutId(new Utf8(vertexOutId)); + newEdge.setLabel(new Utf8(edgeLabel)); + newEdge.setEdgeWeight(edgeWeight); + return newEdge; + } + + /** + * Gora edge reader + */ + protected class GoraGEdgeEdgeReader extends GoraEdgeReader { + + /** source vertex of the edge */ + private LongWritable sourceId; + + /** + * Transforms a GEdge into an Edge from vertexInId to vertexOutId. + * @param goraObject Object from Gora to be translated. + * @return Edge Result from transforming the gora object.
+ */ + @Override + protected Edge<LongWritable, FloatWritable> transformEdge + (Object goraObject) { + Edge<LongWritable, FloatWritable> edge = null; + GEdge goraEdge = (GEdge) goraObject; + Long dest; + float value; + dest = Long.valueOf(goraEdge.getVertexOutId().toString()); + this.sourceId = new LongWritable(); + this.sourceId.set(Long.valueOf(goraEdge.getVertexInId().toString())); + value = goraEdge.getEdgeWeight(); // keep fractional weights + edge = EdgeFactory.create(new LongWritable(dest), + new FloatWritable(value)); + return edge; + } + + /** + * Gets the currentSourceId for the edge. + * @return LongWritable currentSourceId for the edge. + */ + @Override + public LongWritable getCurrentSourceId() throws IOException, + InterruptedException { + return this.sourceId; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/e987492e/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java new file mode 100644 index 0000000..a01fbd3 --- /dev/null +++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeInputFormat.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io.gora; + +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; +import org.junit.Assert; + +/** + * Test class for the Gora edge input format. + */ +public class TestGoraEdgeInputFormat { + + @Test + public void getEmptyDb() throws Exception { + Iterable<String> results; + Iterator<String> result; + GiraphConfiguration conf = new GiraphConfiguration(); + GIRAPH_GORA_DATASTORE_CLASS.
+ set(conf, "org.apache.gora.memory.store.MemStore"); + GIRAPH_GORA_KEYS_FACTORY_CLASS. + set(conf,"org.apache.giraph.io.gora.utils.DefaultKeyFactory"); + GIRAPH_GORA_KEY_CLASS.set(conf,"java.lang.String"); + GIRAPH_GORA_PERSISTENT_CLASS. + set(conf,"org.apache.giraph.io.gora.generated.GEdge"); + GIRAPH_GORA_START_KEY.set(conf,"1"); + GIRAPH_GORA_END_KEY.set(conf,"3"); + conf.set("io.serializations", + "org.apache.hadoop.io.serializer.WritableSerialization," + + "org.apache.hadoop.io.serializer.JavaSerialization"); + conf.setComputationClass(EmptyComputation.class); + conf.setEdgeInputFormatClass(GoraGEdgeEdgeInputFormat.class); + results = InternalVertexRunner.run(conf, new String[0], new String[0]); + Assert.assertNotNull(results); + result = results.iterator(); + Assert.assertFalse(result.hasNext()); + } + + @Test + public void getTestDb() throws Exception { + Iterable<String> results; + GiraphConfiguration conf = new GiraphConfiguration(); + GIRAPH_GORA_DATASTORE_CLASS. + set(conf, "org.apache.gora.memory.store.MemStore"); + GIRAPH_GORA_KEYS_FACTORY_CLASS. + set(conf,"org.apache.giraph.io.gora.utils.DefaultKeyFactory"); + GIRAPH_GORA_KEY_CLASS.set(conf,"java.lang.String"); + GIRAPH_GORA_PERSISTENT_CLASS.
+ set(conf,"org.apache.giraph.io.gora.generated.GEdge"); + GIRAPH_GORA_START_KEY.set(conf,"1"); + GIRAPH_GORA_END_KEY.set(conf,"4"); + conf.set("io.serializations", + "org.apache.hadoop.io.serializer.WritableSerialization," + + "org.apache.hadoop.io.serializer.JavaSerialization"); + conf.setComputationClass(EmptyComputation.class); + conf.setEdgeInputFormatClass(GoraTestEdgeInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + results = InternalVertexRunner.run(conf, new String[0], new String[0]); + Assert.assertNotNull(results); + Assert.assertEquals(3, ((ArrayList<?>)results).size()); + // Do not rely on output order; only check each vertex is present. + Collection<?> lines = + (Collection<?>) results; + Assert.assertTrue("missing vertex 11", + lines.contains("11\t0.0")); + Assert.assertTrue("missing vertex 22", + lines.contains("22\t0.0")); + Assert.assertTrue("missing vertex 33", + lines.contains("33\t0.0")); + } + + /* + Test compute method that only checks the vertex is non-null and + then votes to halt; it sends no messages. + */ + public static class EmptyComputation + extends BasicComputation<LongWritable, DoubleWritable, + FloatWritable, LongWritable> { + + @Override + public void compute( + Vertex<LongWritable, DoubleWritable, FloatWritable> vertex, + Iterable<LongWritable> messages) throws IOException { + Assert.assertNotNull(vertex); + vertex.voteToHalt(); + } + } +}
