Repository: gora Updated Branches: refs/heads/master a2a484054 -> 170b62aea
Update the policies as per the 4.0.6 aerospike java client Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/170b62ae Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/170b62ae Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/170b62ae Branch: refs/heads/master Commit: 170b62aeae48d2cff474edf0b275b0f2bb3a5f88 Parents: a2a4840 Author: nishadi <ndime...@gmail.com> Authored: Mon Aug 7 19:23:32 2017 +0530 Committer: nishadi <ndime...@gmail.com> Committed: Mon Aug 7 20:07:15 2017 +0530 ---------------------------------------------------------------------- .../store/AerospikeMappingBuilder.java | 270 +++++++++++++------ .../aerospike/store/AerospikePolicyConst.java | 54 ++++ gora-tutorial/conf/gora-aerospike-mapping.xml | 4 +- 3 files changed, 246 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/170b62ae/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java index 5973be8..795959d 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java @@ -19,6 +19,10 @@ package org.apache.gora.aerospike.store; import com.aerospike.client.policy.GenerationPolicy; import com.aerospike.client.policy.Policy; import com.aerospike.client.policy.RecordExistsAction; +import com.aerospike.client.policy.CommitLevel; +import com.aerospike.client.policy.Priority; +import com.aerospike.client.policy.ConsistencyLevel; +import com.aerospike.client.policy.Replica; import 
com.aerospike.client.policy.WritePolicy; import org.jdom.Document; import org.jdom.Element; @@ -31,7 +35,6 @@ import javax.naming.ConfigurationException; import java.io.IOException; import java.io.InputStream; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.HashMap; @@ -82,32 +85,67 @@ public class AerospikeMappingBuilder { String policy = policyElement.getAttributeValue("name"); if (policy != null) { - if (policy.equals("write")) { + + // Write Policies + if (policy.equals(AerospikePolicyConst.WRITE_POLICY_NAME)) { WritePolicy writePolicy = new WritePolicy(); - if (policyElement.getAttributeValue("gen") != null) { + if (policyElement.getAttributeValue(AerospikePolicyConst.GENERATION_POLICY_NAME) + != null) { writePolicy.generationPolicy = getGenerationPolicyMapping( - policyElement.getAttributeValue("gen").toUpperCase(Locale.getDefault())); + policyElement.getAttributeValue(AerospikePolicyConst.GENERATION_POLICY_NAME)); + } + if (policyElement.getAttributeValue(AerospikePolicyConst.RECORD_EXISTS_ACTION_NAME) + != null) { + writePolicy.recordExistsAction = getRecordExistsAction(policyElement + .getAttributeValue(AerospikePolicyConst.RECORD_EXISTS_ACTION_NAME)); } - if (policyElement.getAttributeValue("exists") != null) { - writePolicy.recordExistsAction = getRecordExistsAction( - policyElement.getAttributeValue("exists").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue(AerospikePolicyConst.COMMIT_LEVEL_NAME) != null) { + writePolicy.commitLevel = getCommitLevel( + policyElement.getAttributeValue(AerospikePolicyConst.COMMIT_LEVEL_NAME)); } - if (policyElement.getAttributeValue("key") != null) { - writePolicy.sendKey = getKeyUsagePolicy( - policyElement.getAttributeValue("key").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue(AerospikePolicyConst.DURABLE_DELETE_NAME) != null) { + writePolicy.durableDelete = isDurableDelete( + 
policyElement.getAttributeValue(AerospikePolicyConst.DURABLE_DELETE_NAME)); } - if (policyElement.getAttributeValue("timeout") != null) { - writePolicy.timeoutDelay = getTimeoutValue(policyElement.getAttributeValue("timeout")); + if (policyElement.getAttributeValue(AerospikePolicyConst.EXPIRATION_NAME) != null) { + writePolicy.expiration = getTimeDuration( + policyElement.getAttributeValue(AerospikePolicyConst.EXPIRATION_NAME)); } aerospikeMapping.setWritePolicy(writePolicy); - } else if (policy.equals("read")) { + } + + // Read Policies + else if (policy.equals(AerospikePolicyConst.READ_POLICY_NAME)) { + Policy readPolicy = new Policy(); - if (policyElement.getAttributeValue("key") != null) { - readPolicy.sendKey = getKeyUsagePolicy( - policyElement.getAttributeValue("key").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue(AerospikePolicyConst.PRIORITY_NAME) != null) { + readPolicy.priority = getPriority( + policyElement.getAttributeValue(AerospikePolicyConst.PRIORITY_NAME)); + } + if (policyElement.getAttributeValue(AerospikePolicyConst.CONSISTENCY_LEVEL_NAME) + != null) { + readPolicy.consistencyLevel = getConsistencyLevel( + policyElement.getAttributeValue(AerospikePolicyConst.CONSISTENCY_LEVEL_NAME)); + } + if (policyElement.getAttributeValue(AerospikePolicyConst.REPLICA_POLICY_NAME) != null) { + readPolicy.replica = getReplicaPolicy( + policyElement.getAttributeValue(AerospikePolicyConst.REPLICA_POLICY_NAME)); } - if (policyElement.getAttributeValue("timeout") != null) { - readPolicy.timeoutDelay = getTimeoutValue(policyElement.getAttributeValue("timeout")); + if (policyElement.getAttributeValue(AerospikePolicyConst.SOCKET_TIMEOUT_NAME) != null) { + readPolicy.socketTimeout = getTimeDuration( + policyElement.getAttributeValue(AerospikePolicyConst.SOCKET_TIMEOUT_NAME)); + } + if (policyElement.getAttributeValue(AerospikePolicyConst.TOTAL_TIMEOUT_NAME) != null) { + readPolicy.totalTimeout = getTimeDuration( + 
policyElement.getAttributeValue(AerospikePolicyConst.TOTAL_TIMEOUT_NAME)); + } + if (policyElement.getAttributeValue(AerospikePolicyConst.TIMEOUT_DELAY_NAME) != null) { + readPolicy.timeoutDelay = getTimeDuration( + policyElement.getAttributeValue(AerospikePolicyConst.TIMEOUT_DELAY_NAME)); + } + if (policyElement.getAttributeValue(AerospikePolicyConst.MAX_RETRIES_NAME) != null) { + readPolicy.maxRetries = getMaxRetriesValue( + policyElement.getAttributeValue(AerospikePolicyConst.MAX_RETRIES_NAME)); } aerospikeMapping.setReadPolicy(readPolicy); } @@ -179,27 +217,18 @@ public class AerospikeMappingBuilder { if (genPolicy == null) return GenerationPolicy.NONE; - GenerationPolicy generationPolicy; - switch (genPolicy) { - case "IGNORE": - generationPolicy = GenerationPolicy.NONE; - break; - case "EQ": - generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; - break; - case "GT": - generationPolicy = GenerationPolicy.EXPECT_GEN_GT; - break; - default: { - LOG.warn("Invalid generation policy provided, using the default generation policy"); - generationPolicy = GenerationPolicy.NONE; + for (GenerationPolicy generationPolicyEnum : GenerationPolicy.values()) { + if (genPolicy.equalsIgnoreCase(generationPolicyEnum.toString())) { + return generationPolicyEnum; } } - return generationPolicy; + LOG.warn("Invalid generation policy provided, using the default generation policy."); + return GenerationPolicy.NONE; } /** - * Returns the corresponding record exist action from the user specified exists policy name + * Returns the corresponding record exist action from the user specified exists policy name. 
+ * The default value is UPDATE * * @param existsPolicy exists policy name * @return corresponding record exist action @@ -208,75 +237,156 @@ public class AerospikeMappingBuilder { if (existsPolicy == null) return RecordExistsAction.UPDATE; - RecordExistsAction recordExistsAction; - switch (existsPolicy) { - case "UPDATE": - recordExistsAction = RecordExistsAction.UPDATE; - break; - case "UPDATE_ONLY": - recordExistsAction = RecordExistsAction.UPDATE_ONLY; - break; - case "REPLACE": - recordExistsAction = RecordExistsAction.REPLACE; - break; - case "REPLACE_ONLY": - recordExistsAction = RecordExistsAction.REPLACE_ONLY; - break; - case "CREATE_ONLY": - recordExistsAction = RecordExistsAction.CREATE_ONLY; - break; - default: { - LOG.warn("Invalid record exists action provided, using the default record exists action"); - recordExistsAction = RecordExistsAction.UPDATE; + for (RecordExistsAction recordExistsActionEnum : RecordExistsAction.values()) { + if (existsPolicy.equalsIgnoreCase(recordExistsActionEnum.toString())) { + return recordExistsActionEnum; + } + } + LOG.warn("Invalid record exists action provided, using the default record exists action."); + return RecordExistsAction.UPDATE; + } + + /** + * Returns the corresponding commit level from the user specified commit level name. 
+ * The default value is COMMIT_ALL + * + * @param commitLevel user specified commit level name + * @return corresponding commit level + */ + private CommitLevel getCommitLevel(String commitLevel) { + if (commitLevel == null) + return CommitLevel.COMMIT_ALL; + + for (CommitLevel commitLevelEnum : CommitLevel.values()) { + if (commitLevel.equalsIgnoreCase(commitLevelEnum.toString())) { + return commitLevelEnum; } } - return recordExistsAction; + LOG.warn("Invalid commit level provided, using the default commit level."); + return CommitLevel.COMMIT_ALL; } /** - * Returns the corresponding key usage policy from the user specified key policy name + * Returns the corresponding durable delete boolean from the user specified durable delete value. + * The default value is FALSE * - * @param keyPolicy key policy name - * @return corresponding key usage policy + * @param durableDelete user specified durable delete value + * @return corresponding durable delete boolean value */ - private boolean getKeyUsagePolicy(String keyPolicy) { + private boolean isDurableDelete(String durableDelete) { + if (durableDelete == null) + return false; - if (keyPolicy == null) + if (durableDelete.equalsIgnoreCase("false")) { return false; + } + if (durableDelete.equalsIgnoreCase("true")) { + return true; + } + LOG.warn("Invalid durable delete value provided, using the default durable delete value."); + return false; + } + + /** + * Returns the corresponding priority level from the user specified priority level name. 
+ * The default value is DEFAULT + * + * @param priority user specified priority level name + * @return corresponding priority level + */ + private Priority getPriority(String priority) { + if (priority == null) + return Priority.DEFAULT; - boolean sendKey; - switch (keyPolicy) { - case "DIGEST": - sendKey = false; - break; - case "SEND": - sendKey = true; - break; - default: { - LOG.warn("Invalid key action policy provided, using the default key action policy"); - sendKey = false; + for (Priority priorityEnum : Priority.values()) { + if (priority.equalsIgnoreCase(priorityEnum.toString())) { + return priorityEnum; } } - return sendKey; + LOG.warn("Invalid priority level provided, using the default priority level."); + return Priority.DEFAULT; } /** - * Returns the timeout value from the user specified timeout value + * Returns the corresponding consistency level from the user specified consistency level name. + * The default value is CONSISTENCY_ONE * - * @param timeout user specified timeout value - * @return timeout value + * @param consistencyLevel user specified consistency level name + * @return corresponding consistency level */ - private int getTimeoutValue(String timeout) { + private ConsistencyLevel getConsistencyLevel(String consistencyLevel) { + if (consistencyLevel == null) + return ConsistencyLevel.CONSISTENCY_ONE; - if (timeout == null) { + for (ConsistencyLevel consistencyLevelEnum : ConsistencyLevel.values()) { + if (consistencyLevel.equalsIgnoreCase(consistencyLevelEnum.toString())) { + return consistencyLevelEnum; + } + } + LOG.warn("Invalid consistency level provided, using the default consistency level."); + return ConsistencyLevel.CONSISTENCY_ONE; + } + + /** + * Returns the corresponding replica policy from the user specified replica policy name. 
+ * The default value is SEQUENCE + * + * @param replica user specified replica policy name + * @return corresponding replica policy + */ + private Replica getReplicaPolicy(String replica) { + if (replica == null) + return Replica.SEQUENCE; + + for (Replica replicaEnum : Replica.values()) { + if (replica.equalsIgnoreCase(replicaEnum.toString())) { + return replicaEnum; + } + } + LOG.warn("Invalid replica policy provided, using the default replica policy."); + return Replica.SEQUENCE; + } + + /** + * Returns the corresponding timeDuration value from the user specified timeDuration value. + * The default value is 0 + * + * @param timeDuration user specified timeDuration value + * @return corresponding timeDuration value + */ + private int getTimeDuration(String timeDuration) { + + if (timeDuration == null) { return 0; } - int timeoutInt = 0; + int timeDurationInt = 0; + try { + timeDurationInt = Integer.valueOf(timeDuration); + } catch (NumberFormatException e) { + LOG.warn("Invalid time duration value provided, using the default time duration value"); + } + return timeDurationInt; + } + + /** + * Returns the maximum retries value from the user specified maximum retries value.
+ * The default value is 2 + * + * @param retiesCount user specified retries count + * @return corresponding maximum retry value + */ + private int getMaxRetriesValue(String retiesCount) { + + // Default value + int maxRetriesInt = 2; + if (retiesCount == null) { + return maxRetriesInt; + } try { - timeoutInt = Integer.valueOf(timeout); + maxRetriesInt = Integer.valueOf(retiesCount); } catch (NumberFormatException e) { LOG.warn("Invalid timeout value provided, using the default timeout value"); } - return timeoutInt; + return maxRetriesInt; } } http://git-wip-us.apache.org/repos/asf/gora/blob/170b62ae/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikePolicyConst.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikePolicyConst.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikePolicyConst.java new file mode 100644 index 0000000..eae312e --- /dev/null +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikePolicyConst.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.aerospike.store; + +/** + * Class containing the configurable policy names + */ +public class AerospikePolicyConst { + + public static final String WRITE_POLICY_NAME = "write"; + + public static final String GENERATION_POLICY_NAME = "gen"; + + public static final String RECORD_EXISTS_ACTION_NAME = "recordExists"; + + public static final String COMMIT_LEVEL_NAME = "commitLevel"; + + public static final String DURABLE_DELETE_NAME = "durableDelete"; + + public static final String EXPIRATION_NAME = "expiration"; + + public static final String READ_POLICY_NAME = "read"; + + public static final String PRIORITY_NAME = "priority"; + + public static final String CONSISTENCY_LEVEL_NAME = "consistencyLevel"; + + public static final String REPLICA_POLICY_NAME = "replica"; + + public static final String SOCKET_TIMEOUT_NAME = "socketTimeout"; + + public static final String TOTAL_TIMEOUT_NAME = "totalTimeout"; + + public static final String TIMEOUT_DELAY_NAME = "timeoutDelay"; + + public static final String MAX_RETRIES_NAME = "maxRetries"; + +} http://git-wip-us.apache.org/repos/asf/gora/blob/170b62ae/gora-tutorial/conf/gora-aerospike-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-tutorial/conf/gora-aerospike-mapping.xml b/gora-tutorial/conf/gora-aerospike-mapping.xml index aca18ca..5506880 100644 --- a/gora-tutorial/conf/gora-aerospike-mapping.xml +++ b/gora-tutorial/conf/gora-aerospike-mapping.xml @@ -21,8 +21,8 @@ --> <gora-otd> - <policy name="write" gen="IGNORE" exists="CREATE" key="DIGEST" retry="ONCE" timeout="1000"/> - <policy name="read" key="DIGEST" timeout="1000"/> + <policy name="write" gen="NONE" recordExists="UPDATE" commitLevel="COMMIT_ALL" durableDelete="false"/> + <policy name="read" priority="DEFAULT" consistencyLevel="CONSISTENCY_ONE" replica="SEQUENCE" maxRetries="2"/> <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" set="AccessLog" 
namespace = "test"> <field name="url" bin="url"/>