http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java ---------------------------------------------------------------------- diff --git a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java index e445d51..699d051 100644 --- a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java +++ b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java @@ -20,6 +20,12 @@ package org.apache.gora.dynamodb.query; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; import org.apache.gora.filter.Filter; import org.apache.gora.persistency.Persistent; @@ -29,13 +35,12 @@ import org.apache.gora.store.DataStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.services.dynamodb.datamodeling.DynamoDBQueryExpression; -import com.amazonaws.services.dynamodb.datamodeling.DynamoDBScanExpression; -import com.amazonaws.services.dynamodb.model.AttributeValue; -import com.amazonaws.services.dynamodb.model.ComparisonOperator; -import com.amazonaws.services.dynamodb.model.Condition; -import com.amazonaws.services.dynamodb.model.KeySchema; -import com.amazonaws.services.dynamodb.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBScanExpression; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.ComparisonOperator; +import com.amazonaws.services.dynamodbv2.model.Condition; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { @@ -69,6 +74,8 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { */ public static final String SCAN_QUERY = "scan"; + public static final ComparisonOperator DEFAULT_SCAN_OP = ComparisonOperator.GE; + /** * Query type property */ @@ -88,26 +95,29 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { /** * Key schema used for the query */ - private KeySchema keySchema; + private ArrayList<KeySchemaElement> keySchema; /** * Hash key used for the query */ private K hashKey; + private Map<String, String> keyItems; + /** * Default Constructor */ public DynamoDBQuery(){ - super(null); + super(null); } - + /** * Constructor + * * @param dataStore */ public DynamoDBQuery(DataStore<K, T> dataStore) { - super(dataStore); + super(dataStore); } /** @@ -126,153 +136,213 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { return this.hashKey; } + private void defineQueryParams() { + if ((query.getStartKey() != null || query.getKey() != null) + && query.getEndKey() != null) { + DynamoDBQuery.setType(RANGE_QUERY); + } else if (query.getKey() != null || query.getStartKey() != null) { + DynamoDBQuery.setType(SCAN_QUERY); + } + } + /** * Builds query expression depending on query type (range or scan) */ - public void buildExpression(){ - AttributeValue hashAttrValue = buildKeyHashAttribute(); - if (hashAttrValue == null) - throw new IllegalStateException("There is not a key schema defined."); - if (DynamoDBQuery.getType().equals(RANGE_QUERY)){ - Condition newCondition = buildRangeCondition(); - buildQueryExpression(newCondition, hashAttrValue); + public void buildExpression() { + defineQueryParams(); + if (DynamoDBQuery.getType().equals(RANGE_QUERY)) { + buildRangeExpression(); + } else if (DynamoDBQuery.getType().equals(SCAN_QUERY)) { + buildScanExpression(); + } else { + throw new IllegalArgumentException("Query type not supported"); } - if (DynamoDBQuery.getType().equals(SCAN_QUERY)) - buildScanExpression(hashAttrValue); } /** - * Builds scan query expression using a hash attribute value where to start - * @param pHashAttrValueHash attribute value where to start scanning + * Builds hash key attribute from generic query received. + * + * @param qKey + * + * @returnAttributeValue build from query */ - public void buildScanExpression(AttributeValue pHashAttrValue){ - DynamoDBScanExpression newScanExpression = new DynamoDBScanExpression(); - // TODO right now we only support scanning using the key, but we should support other types of scans - newScanExpression.addFilterCondition(getKeySchema().getHashKeyElement().getAttributeName(), buildKeyScanCondition()); - dynamoDBExpression = newScanExpression; + private Map<String, AttributeValue> buildHashKey(K qKey) { + Map<String, AttributeValue> hashKey = new HashMap<>(); + for (KeySchemaElement key : getKeySchema()) { + AttributeValue attr = new AttributeValue(); + if (key.getKeyType().equals(KeyType.HASH.toString())) { + if (keyItems.get(key.getAttributeName()).equals("N")) { + attr.withN(getHashKey(qKey).toString()); + } else if (keyItems.get(key.getAttributeName()).equals("S")) { + attr.withS(getHashKey(qKey).toString()); + } else if (keyItems.get(key.getAttributeName()).equals("B")) { + attr.withB(ByteBuffer.wrap(getHashKey(qKey).toString().getBytes(Charset.defaultCharset()))); + } else { + throw new IllegalArgumentException("Data type not supported for " + + key.getAttributeName()); + } + hashKey.put(key.getAttributeName(), attr); + } + } + if (hashKey.isEmpty()) { + throw new IllegalStateException("No key value has been defined."); + } + return hashKey; } /** - * Builds range query expression - * @param pNewConditionCondition for querying - * @param pHashAttrValueHash attribute value where to start - */ - public void buildQueryExpression(Condition pNewCondition, AttributeValue pHashAttrValue) { - DynamoDBQueryExpression newQueryExpression = new DynamoDBQueryExpression(pHashAttrValue); - newQueryExpression.setConsistentRead(getConsistencyReadLevel()); - newQueryExpression.setRangeKeyCondition(pNewCondition); - dynamoDBExpression = newQueryExpression; + * Builds range key attribute from generic query received. + * + * @param qKey + * + * @return + */ + private Map<String, AttributeValue> buildRangeKey(K qKey) { + Map<String, AttributeValue> kAttrs = new HashMap<>(); + for (KeySchemaElement key : getKeySchema()) { + AttributeValue attr = new AttributeValue(); + if (key.getKeyType().equals(KeyType.RANGE.toString())) { + if (keyItems.get(key.getAttributeName()).equals("N")) { + attr.withN(getRangeKey(qKey).toString()); + } else if (keyItems.get(key.getAttributeName()).equals("S")) { + attr.withS(getRangeKey(qKey).toString()); + } else if (keyItems.get(key.getAttributeName()).equals("B")) { + attr.withB(ByteBuffer.wrap(getRangeKey(qKey).toString().getBytes(Charset.defaultCharset()))); + } else { + throw new IllegalArgumentException("Data type not supported for " + + key.getAttributeName()); + } + kAttrs.put(key.getAttributeName(), attr); + } + } + return kAttrs; } /** - * Builds hash key attribute from generic query received - * @returnAttributeValue build from query - */ - private AttributeValue buildKeyHashAttribute(){ - String pAttrType = getKeySchema().getHashKeyElement().getAttributeType(); - if(pAttrType.equals("S")) - return new AttributeValue().withS(getHashKey(query.getKey()).toString()); - else if(pAttrType.equals("N")) - return new AttributeValue().withN(getHashKey(query.getKey()).toString()); - return null; + * Builds scan query expression using a hash attribute value where to start + * + * @param pHashAttrValueHash + * attribute value where to start scanning + */ + public void buildScanExpression() { + K qKey = getKey(); + if (qKey == null) { + LOG.warn("No key defined. Trying with startKey."); + qKey = query.getStartKey(); + if (qKey == null) { + throw new IllegalStateException("No key has been defined please check"); + } + } + ComparisonOperator compOp = getScanCompOp() != null ? getScanCompOp() + : DEFAULT_SCAN_OP; + + DynamoDBScanExpression newScanExpression = new DynamoDBScanExpression(); + // hash key condition + Map<String, AttributeValue> hashAttrVals = buildHashKey(qKey); + for (Entry<String, AttributeValue> en : hashAttrVals.entrySet()) { + Condition scanFilterHashCondition = new Condition().withComparisonOperator( + compOp.toString()).withAttributeValueList(en.getValue()); + newScanExpression.addFilterCondition(en.getKey(), scanFilterHashCondition); + } + // range key condition + Map<String, AttributeValue> rangeAttrVals = buildRangeKey(qKey); + for (Entry<String, AttributeValue> en : rangeAttrVals.entrySet()) { + Condition scanFilterRangeCondition = new Condition().withComparisonOperator( + compOp.toString()).withAttributeValueList(en.getValue()); + newScanExpression.addFilterCondition(en.getKey(), scanFilterRangeCondition); + } + dynamoDBExpression = newScanExpression; + } + + /** + * Builds range query expression + * + */ + public void buildRangeExpression() { + DynamoDBScanExpression queryExpression = new DynamoDBScanExpression(); + ComparisonOperator compOp = ComparisonOperator.BETWEEN; + // hash key range + Map<String, AttributeValue> hashAttrVals = buildHashKey(query.getStartKey()); + Map<String, AttributeValue> endHashAttrVals = buildHashKey(query.getEndKey()); + for (Entry<String, AttributeValue> en : hashAttrVals.entrySet()) { + Condition scanFilterHashCondition = new Condition().withComparisonOperator( + compOp.toString()).withAttributeValueList(en.getValue(), endHashAttrVals.get(en.getKey())); + queryExpression.addFilterCondition(en.getKey(), scanFilterHashCondition); + } + // range key range + Map<String, AttributeValue> rangeAttrVals = buildRangeKey(query.getStartKey()); + Map<String, AttributeValue> endRangeAttrVals = buildRangeKey(query.getEndKey()); + for (Entry<String, AttributeValue> en : rangeAttrVals.entrySet()) { + Condition scanFilterRangeCondition = new Condition().withComparisonOperator( + compOp.toString()).withAttributeValueList(en.getValue(), endRangeAttrVals.get(en.getKey())); + queryExpression.addFilterCondition(en.getKey(), scanFilterRangeCondition); + } + dynamoDBExpression = queryExpression; } /** * Gets hash key for querying + * * @param key * @return */ private Object getHashKey(K key){ Object hashKey = null; try { - // Our key may be have hash and range keys - for (Method met :key.getClass().getDeclaredMethods()){ - if(met.getName().equals("getHashKey")){ - Object [] params = null; - hashKey = met.invoke(key, params); - break; + // Our key may be have hash and range keys + for (Method met : key.getClass().getDeclaredMethods()) { + if (met.getName().equals("getHashKey")) { + Object[] params = null; + hashKey = met.invoke(key, params); + break; + } } + } catch (IllegalArgumentException e) { + LOG.info("DynamoDBStore: Error while trying to fetch range key.", e.getMessage()); + throw new IllegalArgumentException(e); + } catch (IllegalAccessException e) { + LOG.info("DynamoDBStore: Error while trying to fetch range key.", e.getMessage()); + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + LOG.info("DynamoDBStore: Error while trying to fetch range key.", e.getMessage()); + throw new RuntimeException(e); } - } catch (IllegalArgumentException e) { - LOG.info("DynamoDBStore: Error while trying to fetch range key."); - e.printStackTrace(); - } catch (IllegalAccessException e) { - LOG.info("DynamoDBStore: Error while trying to fetch range key."); - e.printStackTrace(); - } catch (InvocationTargetException e) { - LOG.info("DynamoDBStore: Error while trying to fetch range key."); - e.printStackTrace(); - } - return hashKey; + return hashKey; } /** * Gets range key for querying from generic query object received + * * @param key * @return */ private Object getRangeKey(K key){ Object rangeKey = null; try { - // Our key may be have hash and range keys + // Our key may be have hash and range keys for (Method met :key.getClass().getDeclaredMethods()){ - if(met.getName().equals("getRangeKey")){ - Object [] params = null; - rangeKey = met.invoke(key, params); - break; + if(met.getName().equals("getRangeKey")){ + Object [] params = null; + rangeKey = met.invoke(key, params); + break; } } } catch (IllegalArgumentException e) { - LOG.info("DynamoDBStore: Error while trying to fetch range key."); - e.printStackTrace(); + LOG.info("DynamoDBStore: Error while trying to fetch range key.", e.getMessage()); + throw new IllegalArgumentException(e); } catch (IllegalAccessException e) { - LOG.info("DynamoDBStore: Error while trying to fetch range key."); - e.printStackTrace(); + LOG.info("DynamoDBStore: Error while trying to fetch range key.", e.getMessage()); + throw new RuntimeException(e); } catch (InvocationTargetException e) { - LOG.info("DynamoDBStore: Error while trying to fetch range key."); - e.printStackTrace(); + LOG.info("DynamoDBStore: Error while trying to fetch range key.", e.getMessage()); + throw new RuntimeException(e); } return rangeKey; } /** - * Builds key scan condition using scan comparator, and hash key attribute - * @return - */ - private Condition buildKeyScanCondition(){ - Condition scanKeyCondition = new Condition(); - scanKeyCondition.setComparisonOperator(getScanCompOp()); - scanKeyCondition.withAttributeValueList(buildKeyHashAttribute()); - return scanKeyCondition; - } - - /** - * Builds range condition based on elements set - * @return - */ - private Condition buildRangeCondition(){ - KeySchemaElement kRangeSchema = getKeySchema().getRangeKeyElement(); - Condition rangeKeyCondition = null; - if(kRangeSchema != null){ - rangeKeyCondition = new Condition(); - rangeKeyCondition.setComparisonOperator(ComparisonOperator.BETWEEN.toString()); - AttributeValue startVal = null, endVal = null; - //startVal = buildKeyHashAttribute(); - if(kRangeSchema.getAttributeType().equals("S")){ - startVal = new AttributeValue().withS(getRangeKey(query.getStartKey()).toString()); - endVal = new AttributeValue().withS(getRangeKey(query.getEndKey()).toString()); - } - else if (kRangeSchema.getAttributeType().equals("N")){ - startVal = new AttributeValue().withN(getRangeKey(query.getStartKey()).toString()); - endVal = new AttributeValue().withN(getRangeKey(query.getEndKey()).toString()); - } - rangeKeyCondition.withAttributeValueList(startVal, endVal); - } - return rangeKeyCondition; - } - - /** * Gets read consistency level + * * @return */ public boolean getConsistencyReadLevel(){ @@ -281,6 +351,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { /** * Sets read consistency level + * * @param pConsistencyReadLevel */ public void setConsistencyReadLevel(boolean pConsistencyReadLevel){ @@ -289,14 +360,16 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { /** * Gets key schema + * * @return */ - public KeySchema getKeySchema(){ + public ArrayList<KeySchemaElement> getKeySchema(){ return keySchema; } /** * Gets query expression for query + * * @return */ public Object getQueryExpression(){ @@ -305,22 +378,26 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { /** * Sets query key schema used for queying - * @param pKeySchema + * + * @param arrayList */ - public void setKeySchema(KeySchema pKeySchema){ - this.keySchema = pKeySchema; + public void setKeySchema(ArrayList<KeySchemaElement> arrayList) { + this.keySchema = arrayList; } /** * Sets query to be performed + * * @param pQuery */ public void setQuery(Query<K, T> pQuery){ - this.query = pQuery; + this.setStartKey(query.getStartKey()); + this.setEndKey(query.getEndKey()); } /** * Gets query performed + * * @return */ public Query<K, T> getQuery(){ @@ -329,6 +406,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { /** * Gets query type + * * @return */ public static String getType() { @@ -337,6 +415,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { /** * Sets query type + * * @param pType */ public static void setType(String pType) { @@ -345,16 +424,16 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { /** * Gets scan comparator operator + * * @return */ public static ComparisonOperator getScanCompOp() { - if (scanCompOp == null) - scanCompOp = ComparisonOperator.GE; return scanCompOp; } /** * Sets scan query comparator operator + * * @param scanCompOp */ public static void setScanCompOp(ComparisonOperator scanCompOp) { @@ -363,6 +442,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { /** * Gets range query comparator operator + * * @return */ public static ComparisonOperator getRangeCompOp(){ @@ -379,10 +459,19 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { rangeCompOp = pRangeCompOp; } + /** + * Sets the keyItems that could be used. + * + * @param items + */ + public void setKeyItems(Map<String, String> items) { + keyItems = items; + } + @Override public void setFilter(Filter<K, T> filter) { // TODO Auto-generated method stub - + } @Override @@ -394,7 +483,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> { @Override public void setLocalFilterEnabled(boolean enable) { // TODO Auto-generated method stub - + } @Override
http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBAvroStore.java ---------------------------------------------------------------------- diff --git a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBAvroStore.java b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBAvroStore.java new file mode 100644 index 0000000..98f64ae --- /dev/null +++ b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBAvroStore.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.dynamodb.store; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.impl.DataStoreBase; + +public class DynamoDBAvroStore<K, T extends PersistentBase> extends +DataStoreBase<K, T> implements IDynamoDB<K, T> { + + /** + * The values are Avro fields pending to be stored. + * + * We want to iterate over the keys in insertion order. We don't want to lock + * the entire collection before iterating over the keys, since in the meantime + * other threads are adding entries to the map. + */ + private Map<K, T> buffer = Collections + .synchronizedMap(new LinkedHashMap<K, T>()); + + private DynamoDBStore<K, ? extends Persistent> dynamoDBStoreHandler; + + /** + * Sets the handler to the main DynamoDB + * + * @param DynamoDBStore + * handler to main DynamoDB + */ + @Override + public void setDynamoDBStoreHandler(DynamoDBStore<K, T> dynamoHandler) { + this.dynamoDBStoreHandler = dynamoHandler; + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + + @Override + public void createSchema() { + // TODO Auto-generated method stub + + } + + @Override + public boolean delete(K arg0) { + // TODO Auto-generated method stub + return false; + } + + @Override + public long deleteByQuery(Query<K, T> arg0) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void deleteSchema() { + // TODO Auto-generated method stub + + } + + @Override + public Result<K, T> execute(Query<K, T> arg0) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void flush() { + // TODO Auto-generated method stub + + } + + @Override + public T get(K arg0, String[] arg1) { + // TODO Auto-generated method stub + return null; + } + + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> arg0) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getSchemaName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Query<K, T> newQuery() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void put(K key, T value) { + buffer.put(key, value); + } + + @Override + public boolean schemaExists() { + // TODO Auto-generated method stub + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBFactory.java ---------------------------------------------------------------------- diff --git a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBFactory.java b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBFactory.java new file mode 100644 index 0000000..589367e --- /dev/null +++ b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.dynamodb.store; + +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.PersistentBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamoDBFactory { + + /** Helper to write useful information into the logs. */ + public static final Logger LOG = LoggerFactory + .getLogger(DynamoDBFactory.class); + + @SuppressWarnings("unchecked") + public static <K, T extends Persistent> IDynamoDB<K, T> buildDynamoDBStore( + DynamoDBUtils.DynamoDBType serType) { + final IDynamoDB<K, T> ds; + switch (serType) { + case DYNAMO: + ds = new DynamoDBNativeStore<K, T>(); + LOG.debug("Using DynamoDB based serialization mode."); + break; + case AVRO: + ds = (IDynamoDB<K, T>) new DynamoDBAvroStore<K, PersistentBase>(); + LOG.debug("Using Avro based serialization mode."); + break; + default: + throw new IllegalStateException("Serialization mode not supported."); + } + return ds; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java ---------------------------------------------------------------------- diff --git a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java index f6b23a9..2db5fd8 100644 --- a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java +++ b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java @@ -18,90 +18,102 @@ package org.apache.gora.dynamodb.store; +import static org.apache.gora.dynamodb.store.DynamoDBUtils.DYNAMO_KEY_HASHRANGE; + import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.services.dynamodb.model.KeySchema; -import com.amazonaws.services.dynamodb.model.KeySchemaElement; -import com.amazonaws.services.dynamodb.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; public class DynamoDBMapping { - + /** * Helper to write useful information into the logs */ public static final Logger LOG = LoggerFactory.getLogger(DynamoDBMapping.class); - + /** * a map from field name to attribute value */ - private final Map<String, List<Map<String, String>>> tablesToItems; - + private final Map<String, Map<String, String>> tablesToItems; + /** * Maps tables to their own key schemas */ - private final Map<String, KeySchema> tablesToKeySchemas; - + private final Map<String, ArrayList<KeySchemaElement>> tablesToKeySchemas; + /** * Maps tables to their provisioned throughput */ private final Map<String, ProvisionedThroughput> tablesToPrTh; - + /** - * Constructor for DynamoDBMapping - * @param tables Tables mapped. - * @param tablesToKeySchemas KeySchemas used within tables mapped. - * @param provisionedThroughput Provisioned throughput used within tables mapped. + * Constructor for DynamoDBMapping + * + * @param tablesToItems2 + * Tables mapped. + * @param tablesToKeySchemas + * KeySchemas used within tables mapped. + * @param provisionedThroughput + * Provisioned throughput used within tables mapped. */ - public DynamoDBMapping(Map<String, List<Map<String, String>>> tables, - Map<String, KeySchema> tablesToKeySchemas, - Map<String, ProvisionedThroughput> provisionedThroughput) { - - this.tablesToItems = tables; + public DynamoDBMapping(Map<String, Map<String, String>> tablesToItems2, + Map<String, ArrayList<KeySchemaElement>> tablesToKeySchemas, + Map<String, ProvisionedThroughput> provisionedThroughput) { + + this.tablesToItems = tablesToItems2; this.tablesToKeySchemas = tablesToKeySchemas; this.tablesToPrTh = provisionedThroughput; } /** * Gets the tables with their own items - * @return tablesToItem HashMap + * + * @return tablesToItem + * HashMap */ - public Map<String,List<Map<String, String>>> getTables(){ + public Map<String, Map<String, String>> getTables() { return tablesToItems; } - + /** * Gets items or attributes from a specific table - * @param tableName Table name to determine which attributes to get + * + * @param tableName + * table name to determine which attributes to get * @return */ - public List<Map<String, String>> getItems(String tableName){ + public Map<String, String> getItems(String tableName) { return tablesToItems.get(tableName); } /** * Gets the key schema from a specific table - * @param tableName Table name to determine which key schema to get + * @param tableName + * Table name to determine which key schema to get * @return */ - public KeySchema getKeySchema(String tableName) { + public ArrayList<KeySchemaElement> getKeySchema(String tableName) { return tablesToKeySchemas.get(tableName); } - + /** * Gets the provisioned throughput from a specific table - * @param tableName Table name to determine which provisioned throughput to get + * + * @param tableName + * Table name to determine which provisioned throughput to get * @return */ public ProvisionedThroughput getProvisionedThroughput(String tableName){ return tablesToPrTh.get(tableName); } - + /** * A builder for creating the mapper. This will allow building a thread safe * {@link DynamoDBMapping} using simple immutabilty. @@ -110,199 +122,206 @@ public class DynamoDBMapping { public static class DynamoDBMappingBuilder { /** - * Table name to be used to build the DynamoDBMapping object - */ - private String tableName; - - /** * This data structure can hold several tables, with their own items. * Map<TableName, List<Map<AttributeName,AttributeType>> */ - private Map<String, List<Map<String, String>>> tablesToItems = - new HashMap<String, List<Map<String, String>>>(); + private Map<String, Map<String, String>> tablesToItems = + new HashMap<String, Map<String, String>>(); /** * Maps tables to key schemas */ - private Map<String, KeySchema> tablesToKeySchemas = new HashMap<String, KeySchema>(); + private Map<String, ArrayList<KeySchemaElement>> tablesToKeySchemas = + new HashMap<String, ArrayList<KeySchemaElement>>(); /** * Maps tables to provisioned throughput */ - private Map<String, ProvisionedThroughput> tablesToPrTh = new HashMap<String, ProvisionedThroughput>(); - - /** - * Sets table name - * @param tabName - */ - public void setTableName(String tabName){ - tableName = tabName; - } - + private Map<String, ProvisionedThroughput> tablesToPrTh = + new HashMap<String, ProvisionedThroughput>(); + /** * Gets the table name for which the table is being mapped + * * @param tableName * @return */ public String getTableName(String tableName){ return tableName; } - + /** * Sets the provisioned throughput for the specified table + * * @param tableName * @param readCapUnits * @param writeCapUnits */ - public void setProvisionedThroughput(String tableName, long readCapUnits, long writeCapUnits){ - ProvisionedThroughput ptDesc = - new ProvisionedThroughput().withReadCapacityUnits(readCapUnits).withWriteCapacityUnits(writeCapUnits); - tablesToPrTh.put(tableName, ptDesc); + public void setProvisionedThroughput(String tableName, long readCapUnits, + long writeCapUnits) { + ProvisionedThroughput ptDesc = new ProvisionedThroughput() + .withReadCapacityUnits(readCapUnits).withWriteCapacityUnits( + writeCapUnits); } - + /** * Sets the hash range key schema for the specified table + * * @param tableName * @param rangeKeyName * @param rangeKeyType */ - public void setHashRangeKeySchema(String tableName, String rangeKeyName, String rangeKeyType){ - KeySchema kSchema = tablesToKeySchemas.get(tableName); - if ( kSchema == null) - kSchema = new KeySchema(); - - KeySchemaElement rangeKeyElement = new KeySchemaElement().withAttributeName(rangeKeyName).withAttributeType(rangeKeyType); - kSchema.setRangeKeyElement(rangeKeyElement); - tablesToKeySchemas.put(tableName, kSchema); - } - + // public void setHashRangeKeySchema(String tableName, String rangeKeyName, + // String rangeKeyType){ + // KeySchemaElement kSchema = tablesToKeySchemas.get(tableName); + // if ( kSchema == null) + // kSchema = new KeySchemaElement(); + + // KeySchemaElement rangeKeyElement = new + // KeySchemaElement().withAttributeName(rangeKeyName).withKeyType(KeyType.RANGE).withKeyType(rangeKeyType); + // kSchema. + // kSchema.setRangeKeyElement(rangeKeyElement); + // tablesToKeySchemas.put(tableName, kSchema); + // } + /** * Sets the hash key schema for the specified table * @param tableName * @param keyName * @param keyType + * @param keyType2 */ - public void setHashKeySchema(String tableName, String keyName, String keyType){ - KeySchema kSchema = tablesToKeySchemas.get(tableName); - if ( kSchema == null) - kSchema = new KeySchema(); - KeySchemaElement hashKey = new KeySchemaElement().withAttributeName(keyName).withAttributeType(keyType); - kSchema.setHashKeyElement(hashKey); + public void setKeySchema(String tableName, String keyName, String keyType) { + ArrayList<KeySchemaElement> kSchema = tablesToKeySchemas.get(tableName); + if (kSchema == null) { + kSchema = new ArrayList<KeySchemaElement>(); tablesToKeySchemas.put(tableName, kSchema); + } + KeyType type = keyType.equals(DYNAMO_KEY_HASHRANGE) ? KeyType.RANGE : KeyType.HASH; + kSchema.add(new KeySchemaElement().withAttributeName(keyName) + .withKeyType(type)); } - + /** - * Checks if a table exists, and if doesn't exist it creates the new table. + * Checks if a table exists, and if doesn't exist it creates the new table. + * * @param tableName * @return The table identified by the parameter */ - private List<Map<String, String>> getOrCreateTable(String tableName) { - - List<Map<String, String>> items = tablesToItems.get(tableName); + private Map<String, String> getOrCreateTable(String tableName) { + Map<String, String> items = tablesToItems.get(tableName); if (items == null) { - items = new ArrayList<Map<String, String>>(); + items = new HashMap<String, String>(); tablesToItems.put(tableName, items); } return items; } - + /** - * Gets the attribute for a specific item. The idea is to be able to get different items with different attributes. - * TODO This method is incomplete because the itemNumber might not be present and this would be a problem + * Gets the attribute for a specific item. The idea is to be able to get + * different items with different attributes. + * TODO This method is incomplete because the itemNumber might not + * be present and this would be a problem + * * @param items * @param itemNumber * @return */ - private HashMap<String, String> getOrCreateItemAttribs(List<Map<String, String>> items, int itemNumber){ + /*private HashMap<String, String> getOrCreateItemAttribs( + Map<String, String> items) { HashMap<String, String> itemAttribs; - + if (items.isEmpty()) items.add(new HashMap<String, String>()); - + itemAttribs = (HashMap<String, String>) items.get(itemNumber); - if (itemAttribs == null) - items.add(new HashMap<String, String>()); - return (HashMap<String, String>) items.get(itemNumber); - } - + if (itemAttribs == null) { + itemAttribs = new HashMap<String, String>(); + } + + items.add(itemAttribs); + return null; + }*/ + /** * Adds an attribute to an specific item + * * @param tableName * @param attributeName * @param attrType * @param itemNumber */ - public void addAttribute(String tableName, String attributeName, String attrType, int itemNumber) { - // selecting table - List<Map<String, String>> items = getOrCreateTable(tableName); - // add attribute to item - HashMap<String, String> itemAttribs = getOrCreateItemAttribs(items, itemNumber); - itemAttribs.put(attributeName, attrType); - //items.add(itemAttribs); - // add item to table - //tablesToItems.put(tableName, items); - } - + public void addAttribute(String tableName, String attributeName, + String attrType) { + // selecting table + Map<String, String> items = getOrCreateTable(tableName); + // add attribute to item + //HashMap<String, String> itemAttribs = getOrCreateItemAttribs(items); + //itemAttribs.put(attributeName, attrType); + // add item to table + items.put(attributeName, attrType); + // tablesToItems.put(tableName, items); + } + /** * Method to verify whether or not the schemas have been initialized + * * @return */ - private String verifyAllKeySchemas(){ - - String wrongTable = ""; - // if there are not tables defined - if (tablesToItems.isEmpty()) return ""; - for(String tableName : tablesToItems.keySet()){ - // if there are not schemas defined - if (tablesToKeySchemas.isEmpty()) return ""; - if (!verifyKeySchema(tableName)) return ""; + private boolean verifyAllKeySchemas() { + boolean rsl = true; + if (tablesToItems.isEmpty() || tablesToKeySchemas.isEmpty()) + rsl = false; + for (String tableName : tablesToItems.keySet()) { + // if there are not schemas defined + if (tablesToKeySchemas.get(tableName) == null) { + LOG.error("No schema defined for DynamoDB table '" + tableName + '\''); + rsl = false; } - return wrongTable; + rsl = verifyKeySchema(tableName); + } + return rsl; } - + /** * Verifies is a table has a key schema defined + * * @param tableName Table name to determine which key schema to obtain * @return */ - private boolean verifyKeySchema(String tableName){ - KeySchema kSchema = tablesToKeySchemas.get(tableName); - - if (kSchema == null) - return false; - - KeySchemaElement rangeKey = kSchema.getRangeKeyElement(); - KeySchemaElement hashKey = kSchema.getHashKeyElement(); - // A range key must have a hash key as well - - if (rangeKey != null){ - if (hashKey != null) - return true; - else - return false; + private boolean verifyKeySchema(String tableName) { + ArrayList<KeySchemaElement> kSchema = tablesToKeySchemas.get(tableName); + boolean hashPk = false; + if (kSchema == null) { + LOG.error("No keys defined for '{}'. Please check your schema!", tableName); + return hashPk; } - // A hash key may exist by itself - if (hashKey != null) - return true; - return false; + for (KeySchemaElement ks : kSchema) { + if (ks.getKeyType().equals(KeyType.HASH.toString())) { + hashPk = true; + } + } + return hashPk; } - + /** * Constructs the DynamoDBMapping object + * * @return A newly constructed mapping. */ public DynamoDBMapping build() { - if (tableName == null) throw new IllegalStateException("tableName is not specified"); - // verifying items for at least a table - if (tablesToItems.isEmpty()) throw new IllegalStateException("No tables"); - - // verifying if key schemas have been properly defined - String wrongTableName = verifyAllKeySchemas(); - if (!wrongTableName.equals("")) throw new IllegalStateException("no key schemas defined for table " + wrongTableName); - - // Return the tableDescription and all the attributes needed - return new DynamoDBMapping(tablesToItems,tablesToKeySchemas, tablesToPrTh); + // verifying items for at least a table + if (tablesToItems.isEmpty()) + throw new IllegalStateException("No tables were defined."); + + // verifying if key schemas have been properly defined + if (!verifyAllKeySchemas()) + throw new IllegalStateException("no key schemas defined for table "); + + // Return the tableDescription and all the attributes needed + return new DynamoDBMapping(tablesToItems, tablesToKeySchemas, + tablesToPrTh); } } } http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBNativeStore.java ---------------------------------------------------------------------- diff --git a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBNativeStore.java b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBNativeStore.java new file mode 100644 index 0000000..6fe1742 --- /dev/null +++ b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBNativeStore.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.dynamodb.store; + +import static org.apache.gora.dynamodb.store.DynamoDBUtils.WS_PROVIDER; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang.NullArgumentException; +import org.apache.gora.dynamodb.query.DynamoDBKey; +import org.apache.gora.dynamodb.query.DynamoDBQuery; +import org.apache.gora.dynamodb.query.DynamoDBResult; +import org.apache.gora.persistency.BeanFactory; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.ws.impl.WSDataStoreBase; +import org.apache.gora.util.GoraException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBScanExpression; + +public class DynamoDBNativeStore<K, T extends Persistent> extends + WSDataStoreBase<K, T> implements IDynamoDB<K, T> { + + /** Method's names for getting range and hash keys. */ + private static final String GET_RANGE_KEY_METHOD = "getRangeKey"; + private static final String GET_HASH_KEY_METHOD = "getHashKey"; + + /** Logger for {@link DynamoDBNativeStore}. */ + public static final Logger LOG = LoggerFactory + .getLogger(DynamoDBNativeStore.class); + + /** Handler to {@link DynamoDBStore} so common methods can be accessed. */ + private DynamoDBStore<K, T> dynamoDBStoreHandler; + + /** + * Deletes items using a specific query + * + * @throws IOException + */ + @Override + @SuppressWarnings("unchecked") + public long deleteByQuery(Query<K, T> query) { + // TODO verify whether or not we are deleting a whole row + // String[] fields = getFieldsToQuery(query.getFields()); + // find whether all fields are queried, which means that complete + // rows will be deleted + // boolean isAllFields = Arrays.equals(fields + // , getBeanFactory().getCachedPersistent().getFields()); + Result<K, T> result = execute(query); + ArrayList<T> deletes = new ArrayList<T>(); + try { + while (result.next()) { + T resultObj = result.get(); + deletes.add(resultObj); + + @SuppressWarnings("rawtypes") + DynamoDBKey dKey = new DynamoDBKey(); + + dKey.setHashKey(getHashFromObj(resultObj)); + + dKey.setRangeKey(getRangeKeyFromObj(resultObj)); + delete((K) dKey); + } + } catch (IllegalArgumentException e) { + LOG.error("Illegal argument detected", e.getMessage()); + throw new IllegalArgumentException(e); + } catch (IllegalAccessException e) { + LOG.error("Illegal access detected", e.getMessage()); + throw new IllegalAccessError(e.getMessage()); + } catch (InvocationTargetException e) { + LOG.error(e.getMessage()); + throw new RuntimeException(e); + } catch (Exception e) { + LOG.error(e.getMessage()); + throw new RuntimeException(e); + } + return deletes.size(); + } + + /** + * Executes a query after building a DynamoDB specific query based on the + * received one + */ + @Override + public Result<K, T> execute(Query<K, T> query) { + DynamoDBQuery<K, T> dynamoDBQuery = buildDynamoDBQuery(query); + DynamoDBMapper mapper = new DynamoDBMapper( + dynamoDBStoreHandler.getDynamoDbClient()); + List<T> objList = null; + if (DynamoDBQuery.getType().equals(DynamoDBQuery.RANGE_QUERY)) + objList = mapper.scan(persistentClass, + (DynamoDBScanExpression) dynamoDBQuery.getQueryExpression()); + if (DynamoDBQuery.getType().equals(DynamoDBQuery.SCAN_QUERY)) + objList = mapper.scan(persistentClass, + (DynamoDBScanExpression) dynamoDBQuery.getQueryExpression()); + return new DynamoDBResult<K, T>(this, query, objList); + } + + @Override + public T get(K key, String[] fields) { + /* + * DynamoDBQuery<K,T> query = new DynamoDBQuery<K,T>(); + * query.setDataStore(this); //query.setKeyRange(key, key); + * //query.setFields(fields); //query.setLimit(1); Result<K,T> result = + * execute(query); boolean hasResult = result.next(); return hasResult ? + * result.get() : null; + */ + return null; + } + + @Override + /** + * Gets the object with the specific key + * @throws IOException + */ + public T get(K key) { + T object = null; + try { + Object rangeKey; + rangeKey = getRangeKeyFromKey(key); + Object hashKey = getHashFromKey(key); + if (hashKey != null) { + DynamoDBMapper mapper = new DynamoDBMapper( + dynamoDBStoreHandler.getDynamoDbClient()); + if (rangeKey != null) + object = mapper.load(persistentClass, hashKey, rangeKey); + else + object = mapper.load(persistentClass, hashKey); + } else + throw new GoraException("Error while retrieving keys from object: " + + key.toString()); + } catch (IllegalArgumentException e) { + LOG.error("Illegal argument detected", e.getMessage()); + throw new IllegalArgumentException(e); + } catch (IllegalAccessException e) { + LOG.error("Illegal access detected", e.getMessage()); + throw new IllegalAccessError(e.getMessage()); + } catch (InvocationTargetException e) { + LOG.error(e.getMessage()); + throw new RuntimeException(e); + } catch (GoraException ge) { + LOG.error(ge.getMessage()); + LOG.error(ge.getStackTrace().toString()); + } + return object; + } + + /** + * Creates a new DynamoDBQuery + */ + public Query<K, T> newQuery() { + Query<K, T> query = new DynamoDBQuery<K, T>(this); + // query.setFields(getFieldsToQuery(null)); + return query; + } + + /** + * Returns a new instance of the key object. + * + * @throws IOException + */ + @Override + public K newKey() { + // TODO Auto-generated method stub + return null; + } + + /** + * Returns a new persistent object + * + * @throws IOException + */ + @Override + public T newPersistent() { + T obj = null; + try { + obj = persistentClass.newInstance(); + } catch (InstantiationException e) { + LOG.error("Error instantiating " + persistentClass.getCanonicalName()); + throw new InstantiationError(e.getMessage()); + } catch (IllegalAccessException e) { + LOG.error("Error instantiating " + persistentClass.getCanonicalName()); + throw new IllegalAccessError(e.getMessage()); + } + return obj; + } + + /** + * Puts an object identified by a key + * + * @throws IOException + */ + @Override + public void put(K key, T obj) { + try { + Object hashKey = getHashKey(key, obj); + Object rangeKey = getRangeKey(key, obj); + if (hashKey != null) { + DynamoDBMapper mapper = new DynamoDBMapper( + dynamoDBStoreHandler.getDynamoDbClient()); + if (rangeKey != null) { + mapper.load(persistentClass, hashKey, rangeKey); + } else { + mapper.load(persistentClass, hashKey); + } + mapper.save(obj); + } else + throw new GoraException("No HashKey found in Key nor in Object."); + } catch (NullPointerException npe) { + LOG.error("Error while putting an item. " + npe.toString()); + throw new NullArgumentException(npe.getMessage()); + } catch (Exception e) { + LOG.error("Error while putting an item. " + obj.toString()); + throw new RuntimeException(e); + } + } + + /** + * Deletes the object using key + * + * @return true for a successful process + * @throws IOException + */ + @Override + public boolean delete(K key) { + try { + T object = null; + Object rangeKey = null, hashKey = null; + DynamoDBMapper mapper = new DynamoDBMapper( + dynamoDBStoreHandler.getDynamoDbClient()); + for (Method met : key.getClass().getDeclaredMethods()) { + if (met.getName().equals(GET_RANGE_KEY_METHOD)) { + Object[] params = null; + rangeKey = met.invoke(key, params); + break; + } + } + for (Method met : key.getClass().getDeclaredMethods()) { + if (met.getName().equals(GET_HASH_KEY_METHOD)) { + Object[] params = null; + hashKey = met.invoke(key, params); + break; + } + } + if (hashKey == null) + object = (T) mapper.load(persistentClass, key); + if (rangeKey == null) + object = (T) mapper.load(persistentClass, hashKey); + else + object = (T) mapper.load(persistentClass, hashKey, rangeKey); + + if (object == null) + return false; + + // setting key for dynamodbMapper + mapper.delete(object); + return true; + } catch (Exception e) { + LOG.error("Error while deleting value with key " + key.toString()); + LOG.error(e.getMessage()); + return false; + } + } + + /** + * Initialize the data store by reading the credentials, setting the cloud + * provider, setting the client's properties up, setting the end point and + * reading the mapping file + */ + public void initialize(Class<K> keyClass, Class<T> pPersistentClass, + Properties properties) { + super.initialize(keyClass, pPersistentClass, properties); + setWsProvider(WS_PROVIDER); + if (autoCreateSchema) { + createSchema(); + } + } + + /** + * Builds a DynamoDB query from a generic Query object + * + * @param query + * Generic query object + * @return DynamoDBQuery + */ + private DynamoDBQuery<K, T> buildDynamoDBQuery(Query<K, T> query) { + if (getSchemaName() == null) + throw new IllegalStateException("There is not a preferred schema set."); + + DynamoDBQuery<K, T> dynamoDBQuery = new DynamoDBQuery<K, T>(); + dynamoDBQuery.setKeySchema(dynamoDBStoreHandler.getDynamoDbMapping() + .getKeySchema(getSchemaName())); + dynamoDBQuery.setKeyItems(dynamoDBStoreHandler.getDynamoDbMapping().getItems(getSchemaName())); + dynamoDBQuery.setQuery(query); + dynamoDBQuery.setConsistencyReadLevel(dynamoDBStoreHandler + .getConsistencyReads()); + dynamoDBQuery.buildExpression(); + + return dynamoDBQuery; + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + + @Override + public void flush() { + LOG.warn("DynamoDBNativeStore puts and gets directly into the datastore"); + } + + @Override + public BeanFactory<K, T> getBeanFactory() { + // TODO Auto-generated method stub + return null; + } + + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> arg0) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setBeanFactory(BeanFactory<K, T> arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void createSchema() { + LOG.info("Creating Native DynamoDB Schemas."); + if (dynamoDBStoreHandler.getDynamoDbMapping().getTables().isEmpty()) { + throw new IllegalStateException("There are not tables defined."); + } + if (dynamoDBStoreHandler.getPreferredSchema() == null) { + LOG.debug("Creating schemas."); + // read the mapping object + for (String tableName : dynamoDBStoreHandler.getDynamoDbMapping() + .getTables().keySet()) + DynamoDBUtils.executeCreateTableRequest( + dynamoDBStoreHandler.getDynamoDbClient(), tableName, + dynamoDBStoreHandler.getTableKeySchema(tableName), + dynamoDBStoreHandler.getTableAttributes(tableName), + dynamoDBStoreHandler.getTableProvisionedThroughput(tableName)); + LOG.debug("tables created successfully."); + } else { + String tableName = dynamoDBStoreHandler.getPreferredSchema(); + LOG.debug("Creating schema " + tableName); + DynamoDBUtils.executeCreateTableRequest( + dynamoDBStoreHandler.getDynamoDbClient(), tableName, + dynamoDBStoreHandler.getTableKeySchema(tableName), + dynamoDBStoreHandler.getTableAttributes(tableName), + dynamoDBStoreHandler.getTableProvisionedThroughput(tableName)); + } + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.gora.dynamodb.store.IDynamoDB#setDynamoDBStoreHandler(org.apache + * .gora.dynamodb.store.DynamoDBStore) + */ + @Override + public void setDynamoDBStoreHandler(DynamoDBStore<K, T> dynamoHandler) { + this.dynamoDBStoreHandler = dynamoHandler; + } + + @Override + public void deleteSchema() { + // TODO Auto-generated method stub + + } + + @Override + public String getSchemaName() { + return this.dynamoDBStoreHandler.getSchemaName(); + } + + @Override + public boolean schemaExists() { + return this.dynamoDBStoreHandler.schemaExists(); + } + + private Object getHashKey(K key, T obj) throws IllegalArgumentException, + IllegalAccessException, InvocationTargetException { + // try to get the hashKey from 'key' + Object hashKey = getHashFromKey(key); + // if the key does not have these attributes then try to get them from the + // object + if (hashKey == null) + hashKey = getHashFromObj(obj); + // if no key has been found, then we try with the key + if (hashKey == null) + hashKey = key; + return hashKey; + } + + /** + * Gets a hash key from a key of type K + * + * @param obj + * Object from which we will get a hash key + * @return + * @throws IllegalArgumentException + * @throws IllegalAccessException + * @throws InvocationTargetException + */ + private Object getHashFromKey(K obj) throws IllegalArgumentException, + IllegalAccessException, InvocationTargetException { + Object hashKey = null; + // check if it is a DynamoDBKey + if (obj instanceof DynamoDBKey) { + hashKey = ((DynamoDBKey<?, ?>) obj).getHashKey(); + } else { + // maybe the class has the method defined + for (Method met : obj.getClass().getDeclaredMethods()) { + if (met.getName().equals(GET_HASH_KEY_METHOD)) { + Object[] params = null; + hashKey = met.invoke(obj, params); + break; + } + } + } + return hashKey; + } + + /** + * Gets a hash key from an object of type T + * + * @param obj + * Object from which we will get a hash key + * @return + * @throws IllegalArgumentException + * @throws IllegalAccessException + * @throws InvocationTargetException + */ + private Object getHashFromObj(T obj) throws IllegalArgumentException, + IllegalAccessException, InvocationTargetException { + Object hashKey = null; + // check if it is a DynamoDBKey + if (obj instanceof DynamoDBKey) { + hashKey = ((DynamoDBKey<?, ?>) obj).getHashKey(); + } else { + // maybe the class has the method defined + for (Method met : obj.getClass().getDeclaredMethods()) { + if (met.getName().equals(GET_HASH_KEY_METHOD)) { + Object[] params = null; + hashKey = met.invoke(obj, params); + break; + } + } + } + return hashKey; + } + + private Object getRangeKey(K key, T obj) throws IllegalArgumentException, + IllegalAccessException, InvocationTargetException { + Object rangeKey = getRangeKeyFromKey(key); + if (rangeKey == null) + rangeKey = getRangeKeyFromObj(obj); + return rangeKey; + } + + /** + * Gets a range key from a key obj. This verifies if it is using a + * {@link DynamoDBKey} + * + * @param obj + * Object from which a range key will be extracted + * @return + * @throws IllegalArgumentException + * @throws IllegalAccessException + * @throws InvocationTargetException + */ + private Object getRangeKeyFromKey(K obj) throws IllegalArgumentException, + IllegalAccessException, InvocationTargetException { + Object rangeKey = null; + // check if it is a DynamoDBKey + if (obj instanceof DynamoDBKey) { + rangeKey = ((DynamoDBKey<?, ?>) obj).getRangeKey(); + } else { + // maybe the class has the method defined + for (Method met : obj.getClass().getDeclaredMethods()) { + if (met.getName().equals(GET_RANGE_KEY_METHOD)) { + Object[] params = null; + rangeKey = met.invoke(obj, params); + break; + } + } + } + return rangeKey; + } + + /** + * Gets a range key from an object T + * + * @param obj + * Object from which a range key will be extracted + * @return + * @throws IllegalArgumentException + * @throws IllegalAccessException + * @throws InvocationTargetException + */ + private Object getRangeKeyFromObj(T obj) throws IllegalArgumentException, + IllegalAccessException, InvocationTargetException { + Object rangeKey = null; + // check if it is a DynamoDBKey + if (obj instanceof DynamoDBKey) { + rangeKey = ((DynamoDBKey<?, ?>) obj).getRangeKey(); + } else { + // maybe the class has the method defined + for (Method met : obj.getClass().getDeclaredMethods()) { + if (met.getName().equals(GET_RANGE_KEY_METHOD)) { + Object[] params = null; + rangeKey = met.invoke(obj, params); + break; + } + } + } + return rangeKey; + } + +} \ No newline at end of file