This is an automated email from the ASF dual-hosted git repository. djkevincr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push: new 4bac876c GORA-650 Add ArangoDB datastore new 8b5cea77 Merge pull request #291 from chaminda-neluka/GORA-650 4bac876c is described below commit 4bac876c854129181ca8a621df8bfa80f95ac899 Author: chaminda-neluka <chaminda.g...@gmail.com> AuthorDate: Wed Aug 23 01:01:54 2023 +0200 GORA-650 Add ArangoDB datastore --- gora-arangodb/pom.xml | 145 ++++ .../org/apache/gora/arangodb/package-info.java | 19 + .../apache/gora/arangodb/query/ArangoDBQuery.java | 105 +++ .../apache/gora/arangodb/query/ArangoDBResult.java | 95 +++ .../apache/gora/arangodb/query/package-info.java | 19 + .../gora/arangodb/store/ArangoDBMapping.java | 116 ++++ .../arangodb/store/ArangoDBMappingBuilder.java | 119 ++++ .../apache/gora/arangodb/store/ArangoDBStore.java | 742 +++++++++++++++++++++ .../arangodb/store/ArangoDBStoreParameters.java | 119 ++++ .../apache/gora/arangodb/store/package-info.java | 19 + .../apache/gora/arangodb/ArangoDBTestDriver.java | 85 +++ .../org/apache/gora/arangodb/package-info.java | 19 + .../arangodb/store/ArangoDBGoraDataStoreTest.java | 97 +++ .../store/ArangoDBStartupWaitStrategy.java | 33 + .../apache/gora/arangodb/store/package-info.java | 19 + .../src/test/resources/gora-arangodb-mapping.xml | 46 ++ gora-arangodb/src/test/resources/gora.properties | 23 + pom.xml | 11 + 18 files changed, 1831 insertions(+) diff --git a/gora-arangodb/pom.xml b/gora-arangodb/pom.xml new file mode 100644 index 00000000..14445e24 --- /dev/null +++ b/gora-arangodb/pom.xml @@ -0,0 +1,145 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.gora</groupId> + <artifactId>gora</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>../</relativePath> + </parent> + <artifactId>gora-arangodb</artifactId> + <packaging>bundle</packaging> + + <name>Apache Gora :: ArangoDB</name> + <url>http://gora.apache.org</url> + <description>The Apache Gora open source framework provides an in-memory data model and + persistence for big data. 
Gora supports persisting to column stores, key value stores, + document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce + support.</description> + <inceptionYear>2010</inceptionYear> + <organization> + <name>The Apache Software Foundation</name> + <url>http://www.apache.org/</url> + </organization> + <issueManagement> + <system>JIRA</system> + <url>https://issues.apache.org/jira/browse/GORA</url> + </issueManagement> + <ciManagement> + <system>Jenkins</system> + <url>https://builds.apache.org/job/Gora-trunk/</url> + </ciManagement> + + <properties> + <osgi.import>*</osgi.import> + <osgi.export>org.apache.gora.arangodb*;version="${project.version}";-noimport:=true</osgi.export> + </properties> + + <build> + <directory>target</directory> + <outputDirectory>target/classes</outputDirectory> + <finalName>${project.artifactId}-${project.version}</finalName> + <testOutputDirectory>target/test-classes</testOutputDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <sourceDirectory>src/main/java</sourceDirectory> + <testResources> + <testResource> + <directory>${project.basedir}/src/test/resources</directory> + <includes> + <include>**/*</include> + </includes> + <!--targetPath>${project.basedir}/target/classes/</targetPath --> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>${build-helper-maven-plugin.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/examples/java</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Gora Internal Dependencies --> + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> + 
<artifactId>gora-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <!-- Logging Dependencies --> + + <!-- Testing Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + + <!-- XML Dependencies --> + <dependency> + <groupId>org.jdom</groupId> + <artifactId>jdom</artifactId> + <scope>compile</scope> + </dependency> + + + <!-- ArangoDB Dependencies --> + <dependency> + <groupId>com.arangodb</groupId> + <artifactId>arangodb-java-driver</artifactId> + </dependency> + + <!-- Hadoop Dependencies --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + + </dependencies> + +</project> diff --git a/gora-arangodb/src/main/java/org/apache/gora/arangodb/package-info.java b/gora-arangodb/src/main/java/org/apache/gora/arangodb/package-info.java new file mode 100644 index 00000000..a4c10776 --- /dev/null +++ b/gora-arangodb/src/main/java/org/apache/gora/arangodb/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.arangodb; \ No newline at end of file diff --git a/gora-arangodb/src/main/java/org/apache/gora/arangodb/query/ArangoDBQuery.java b/gora-arangodb/src/main/java/org/apache/gora/arangodb/query/ArangoDBQuery.java new file mode 100644 index 00000000..e8b494c0 --- /dev/null +++ b/gora-arangodb/src/main/java/org/apache/gora/arangodb/query/ArangoDBQuery.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.arangodb.query; + +import org.apache.gora.arangodb.store.ArangoDBMapping; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.store.DataStore; + +import java.util.HashMap; +import java.util.Map; + +/** + * ArangoDB specific implementation of the {@link org.apache.gora.query.Query} interface. 
+ */ +public class ArangoDBQuery<K, T extends PersistentBase> extends QueryBase<K, T> { + + private String arangoDBQuery; + private Map<String, Object> params; + + public ArangoDBQuery() { + super(null); + } + + public ArangoDBQuery(DataStore<K, T> dataStore) { + super(dataStore); + } + + public String populateArangoDBQuery(final ArangoDBMapping arangoDBMapping, + final boolean isSelect) { + params = new HashMap<String, Object>(); + StringBuffer dbQuery = new StringBuffer(); + if ((this.getStartKey() != null) && (this.getEndKey() != null) + && this.getStartKey().equals(this.getEndKey())) { + dbQuery.append("FOR t IN @collection FILTER t._key == @key " + .replace("@collection", arangoDBMapping.getDocumentClass())); + params.put("key", encodeArangoDBKey(this.getStartKey())); + } else if (this.getStartKey() != null && this.getEndKey() != null) { + dbQuery.append("FOR t IN @collection FILTER t._key >= @lower && t._key <= @upper " + .replace("@collection", arangoDBMapping.getDocumentClass())); + params.put("lower", encodeArangoDBKey(this.getStartKey())); + params.put("upper", encodeArangoDBKey(this.getEndKey())); + } else if (this.getStartKey() != null) { + dbQuery.append("FOR t IN @collection FILTER t._key >= @lower " + .replace("@collection", arangoDBMapping.getDocumentClass())); + params.put("lower", encodeArangoDBKey((this.getStartKey()))); + } else if (this.getEndKey() != null) { + dbQuery.append("FOR t IN @collection FILTER t._key <= @upper " + .replace("@collection", arangoDBMapping.getDocumentClass())); + params.put("upper", encodeArangoDBKey(this.getEndKey())); + } else { + dbQuery.append("FOR t IN @collection " + .replace("@collection", arangoDBMapping.getDocumentClass())); + } + + if (this.getLimit() != -1) { + dbQuery.append("Limit @count "); + params.put("count", this.getLimit()); + } + + if (isSelect) { + dbQuery.append("RETURN t"); + } else { + dbQuery.append("REMOVE t IN @collection".replace("@collection", arangoDBMapping.getDocumentClass())); + } + + 
arangoDBQuery = dbQuery.toString(); + return arangoDBQuery; + } + + public Map<String, Object> getParams() { + return params; + } + + public String getArangoDBQuery() { + return arangoDBQuery; + } + + public K encodeArangoDBKey(final K key) { + if (key == null) { + return null; + } else if (!(key instanceof String)) { + return key; + } + return (K) key.toString().replace("/", "%2F") + .replace("&", "%26"); + } + +} diff --git a/gora-arangodb/src/main/java/org/apache/gora/arangodb/query/ArangoDBResult.java b/gora-arangodb/src/main/java/org/apache/gora/arangodb/query/ArangoDBResult.java new file mode 100644 index 00000000..c8b3cc2e --- /dev/null +++ b/gora-arangodb/src/main/java/org/apache/gora/arangodb/query/ArangoDBResult.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.arangodb.query; + +import java.io.IOException; + +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoIterator; +import com.arangodb.entity.BaseDocument; +import org.apache.gora.arangodb.store.ArangoDBStore; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.store.DataStore; + +/** + * ArangoDB specific implementation of the {@link org.apache.gora.query.Result} interface. + * + */ +public class ArangoDBResult<K, T extends PersistentBase> extends ResultBase<K, T> { + + private int size; + private ArangoCursor<BaseDocument> cursor; + private ArangoIterator<BaseDocument> resultSetIterator; + + public ArangoDBResult(DataStore<K, T> dataStore, + Query<K, T> query, + ArangoCursor<BaseDocument> cursor) { + super(dataStore, query); + this.cursor = cursor; + this.resultSetIterator = cursor.iterator(); + this.size = cursor.getStats().getFullCount().intValue(); + } + + public ArangoDBResult(DataStore<K, T> dataStore, Query<K, T> query) { + super(dataStore, query); + } + + public ArangoDBStore<K, T> getDataStore() { + return (ArangoDBStore<K, T>) super.getDataStore(); + } + + @Override + public float getProgress() throws IOException { + if (cursor == null) { + return 0; + } else if (size == 0) { + return 1; + } else { + return offset / (float) size; + } + } + + @Override + public void close() throws IOException { + cursor.close(); + } + + @Override + protected boolean nextInner() throws IOException { + if (!resultSetIterator.hasNext()) { + return false; + } + + BaseDocument obj = resultSetIterator.next(); + key = (K) obj.getKey(); + persistent = ((ArangoDBStore<K, T>) getDataStore()) + .convertArangoDBDocToAvroBean(obj, getQuery().getFields()); + return persistent != null; + } + + @Override + public int size() { + int totalSize = size; + int intLimit = (int) this.limit; + return intLimit > 0 && totalSize > intLimit ? 
intLimit : totalSize; + } + +} diff --git a/gora-arangodb/src/main/java/org/apache/gora/arangodb/query/package-info.java b/gora-arangodb/src/main/java/org/apache/gora/arangodb/query/package-info.java new file mode 100644 index 00000000..9a807a4e --- /dev/null +++ b/gora-arangodb/src/main/java/org/apache/gora/arangodb/query/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.arangodb.query; \ No newline at end of file diff --git a/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBMapping.java b/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBMapping.java new file mode 100644 index 00000000..7cf9ad01 --- /dev/null +++ b/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBMapping.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.arangodb.store; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Locale; + +/** + * Maintains mapping between AVRO data bean and ArangoDB document. + */ +public class ArangoDBMapping { + + public static final Logger log = LoggerFactory.getLogger(ArangoDBMapping.class); + + private String documentClass; + private HashMap<String, String> classToDocument = new HashMap<>(); + private HashMap<String, String> documentToClass = new HashMap<>(); + private HashMap<String, DocumentFieldType> documentFields = new HashMap<>(); + + public String getDocumentClass() { + return documentClass; + } + + public void setDocumentClass(String documentClass) { + this.documentClass = documentClass; + } + + private void registerDocumentField(String name, DocumentFieldType type) { + if (documentFields.containsKey(name) && (documentFields.get(name) != type)) + throw new IllegalStateException("The field '" + name + "' is already " + + "registered with a different type."); + documentFields.put(name, type); + } + + public void registerClassField(String classFieldName, + String docFieldName, String fieldType) { + try { + registerDocumentField(docFieldName, + DocumentFieldType.valueOf(fieldType.toUpperCase(Locale.getDefault()))); + } catch (final IllegalArgumentException e) { + throw new IllegalStateException("Declared '" + fieldType + + "' for class field '" + classFieldName + + "' is not supported by ArangoDBMapping"); + } + + if (classToDocument.containsKey(classFieldName)) { + if 
(!classToDocument.get(classFieldName).equals(docFieldName)) { + throw new IllegalStateException("The class field '" + classFieldName + + "' is already registered in the mapping" + + " with the document field '" + + classToDocument.get(classFieldName) + + " which differs from the new one '" + docFieldName + "'."); + } + } else { + classToDocument.put(classFieldName, docFieldName); + documentToClass.put(docFieldName, classFieldName); + } + } + + public String[] getDocumentFields() { + return documentToClass.keySet().toArray(new String[documentToClass.keySet().size()]); + } + + public String getDocumentField(String field) { + return classToDocument.get(field); + } + + protected DocumentFieldType getDocumentFieldType(String field) { + return documentFields.get(field); + } + + public static enum DocumentFieldType { + + BOOLEAN("boolean"), + INTEGER("integer"), + LONG("long"), + FLOAT("float"), + SHORT("short"), + DOUBLE("double"), + STRING("string"), + DOCUMENT("document"), + LIST("list"), + MAP("map"); + + private final String stringValue; + + DocumentFieldType(final String s) { + stringValue = s; + } + + public String toString() { + return stringValue; + } + + } + +} diff --git a/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBMappingBuilder.java b/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBMappingBuilder.java new file mode 100644 index 00000000..59a35f9c --- /dev/null +++ b/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBMappingBuilder.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.arangodb.store; + +import org.apache.gora.persistency.impl.PersistentBase; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.input.SAXBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility builder for create ArangoDB mapping from gora-arango-mapping.xml. + */ +public class ArangoDBMappingBuilder<K, T extends PersistentBase> { + + public static final String ATT_NAME = "name"; + public static final String ATT_TYPE = "type"; + public static final String TAG_CLASS = "class"; + public static final String ATT_KEYCLASS = "keyClass"; + public static final String ATT_DOCUMENT = "document"; + public static final String TAG_FIELD = "field"; + public static final String ATT_FIELD = "docfield"; + public static final Logger log = LoggerFactory.getLogger(ArangoDBMappingBuilder.class); + + private final ArangoDBStore<K, T> dataStore; + + private ArangoDBMapping mapping; + + public ArangoDBMappingBuilder(final ArangoDBStore<K, T> store) { + this.dataStore = store; + this.mapping = new ArangoDBMapping(); + } + + public ArangoDBMapping build() { + if (mapping.getDocumentClass() == null) + throw new IllegalStateException("Document Class is not specified."); + return mapping; + } + + protected ArangoDBMappingBuilder fromFile(String uri) throws IOException { + try { + SAXBuilder saxBuilder = new SAXBuilder(); + InputStream is = getClass().getResourceAsStream(uri); + if (is 
== null) { + String msg = "Unable to load the mapping from classpath resource '" + uri + + "' Re-trying local from local file system location."; + log.warn(msg); + is = new FileInputStream(uri); + } + Document doc = saxBuilder.build(is); + Element root = doc.getRootElement(); + List<Element> classElements = root.getChildren(TAG_CLASS); + for (Element classElement : classElements) { + final Class<T> persistentClass = dataStore.getPersistentClass(); + final Class<K> keyClass = dataStore.getKeyClass(); + if (matchesKeyClassWithMapping(keyClass, classElement) + && matchesPersistentClassWithMapping(persistentClass, classElement)) { + loadPersistentClass(classElement, persistentClass); + break; + } + } + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + throw new IOException(ex); + } + return this; + } + + private boolean matchesPersistentClassWithMapping(final Class<T> persistentClass, + final Element classElement) { + return classElement.getAttributeValue(ATT_NAME).equals(persistentClass.getName()); + } + + private boolean matchesKeyClassWithMapping(final Class<K> keyClass, + final Element classElement) { + return classElement.getAttributeValue(ATT_KEYCLASS).equals(keyClass.getName()); + } + + private void loadPersistentClass(Element classElement, + Class<T> pPersistentClass) { + + String docClassFromMapping = classElement.getAttributeValue(ATT_DOCUMENT); + String resolvedDocClass = dataStore.getSchemaName(docClassFromMapping, + pPersistentClass); + mapping.setDocumentClass(resolvedDocClass); + + List<Element> fields = classElement.getChildren(TAG_FIELD); + for (Element field : fields) { + mapping.registerClassField(field.getAttributeValue(ATT_NAME), + field.getAttributeValue(ATT_FIELD), + field.getAttributeValue(ATT_TYPE)); + } + } + +} \ No newline at end of file diff --git a/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBStore.java b/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBStore.java new file mode 
100644 index 00000000..40216280 --- /dev/null +++ b/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBStore.java @@ -0,0 +1,742 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.arangodb.store; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Base64; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.List; +import java.util.LinkedHashMap; +import java.util.ArrayList; + +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoDB; +import com.arangodb.entity.BaseDocument; +import com.arangodb.model.AqlQueryOptions; +import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; +import org.apache.gora.arangodb.query.ArangoDBQuery; +import org.apache.gora.arangodb.query.ArangoDBResult; +import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.persistency.impl.DirtyListWrapper; +import org.apache.gora.persistency.impl.DirtyMapWrapper; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import 
org.apache.gora.query.impl.PartitionQueryImpl; +import org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.ClassLoadingUtils; +import org.apache.gora.util.GoraException; + +/** + * {@inheritDoc} + * {@link ArangoDBStore} is the primary class + * responsible for facilitating GORA CRUD operations on ArangoDB documents. + */ +public class ArangoDBStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { + + public static final String DEFAULT_MAPPING_FILE = "/gora-arangodb-mapping.xml"; + private ArangoDBStoreParameters arangoDbStoreParams; + private ArangoDBMapping arangoDBMapping; + private ArangoDB arangoDB; + + /** + * {@inheritDoc} + * Initialize the ArangoDB dataStore by {@link Properties} parameters. + * + * @param keyClass key class type for dataStore. + * @param persistentClass persistent class type for dataStore. + * @param properties ArangoDB dataStore properties EG:- ArangoDB client credentials. + */ + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { + super.initialize(keyClass, persistentClass, properties); + try { + arangoDbStoreParams = ArangoDBStoreParameters.load(properties, getConf()); + arangoDB = new ArangoDB.Builder() + .host(arangoDbStoreParams.getServerHost(), + Integer.valueOf(arangoDbStoreParams.getServerPort())) + .user(arangoDbStoreParams.getUserName()) + .password(arangoDbStoreParams.getUserPassword()) + .maxConnections(Integer.valueOf(arangoDbStoreParams.getConnectionPoolSize())) + .build(); + if (!arangoDB.db(arangoDbStoreParams.getDatabaseName()).exists()) + arangoDB.createDatabase(arangoDbStoreParams.getDatabaseName()); + + ArangoDBMappingBuilder<K, T> builder = new ArangoDBMappingBuilder<>(this); + arangoDBMapping = builder.fromFile(arangoDbStoreParams.getMappingFile()).build(); + if (!schemaExists()) { + createSchema(); + } + } catch (Exception e) { + LOG.error("Error while initializing ArangoDB 
dataStore: {}", + new Object[]{e.getMessage()}); + throw new RuntimeException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public String getSchemaName(final String mappingSchemaName, + final Class<?> persistentClass) { + return super.getSchemaName(mappingSchemaName, persistentClass); + } + + /** + * {@inheritDoc} + */ + @Override + public String getSchemaName() { + return arangoDBMapping.getDocumentClass(); + } + + /** + * {@inheritDoc} + * Create a new class of ArangoDB documents if necessary. Enforce specified schema over the document class. + */ + @Override + public void createSchema() throws GoraException { + if (schemaExists()) { + return; + } + try { + arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .createCollection(arangoDBMapping.getDocumentClass()); + + } catch (Exception e) { + throw new GoraException(e); + } + } + + /** + * {@inheritDoc} + * Deletes enforced schema over ArangoDB Document class. + */ + @Override + public void deleteSchema() throws GoraException { + try { + arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .collection(arangoDBMapping.getDocumentClass()).drop(); + } catch (Exception e) { + throw new GoraException(e); + } + } + + /** + * {@inheritDoc} + * Check whether there exist a schema enforced over ArangoDB document class. 
+ */ + @Override + public boolean schemaExists() throws GoraException { + try { + return arangoDB + .db(arangoDbStoreParams.getDatabaseName()) + .collection(arangoDBMapping.getDocumentClass()).exists(); + } catch (Exception e) { + throw new GoraException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public T get(K key, String[] fields) throws GoraException { + try { + key = encodeArangoDBKey(key); + boolean isExists = arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .collection(arangoDBMapping.getDocumentClass()).documentExists(key.toString()); + if (isExists) { + String[] dbFields = getFieldsToQuery(fields); + BaseDocument document = arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .collection(arangoDBMapping.getDocumentClass()).getDocument(key.toString(), BaseDocument.class); + return convertArangoDBDocToAvroBean(document, dbFields); + } else { + return null; + } + } catch (Exception e) { + throw new GoraException(e); + } + } + + public T convertArangoDBDocToAvroBean(final BaseDocument obj, final String[] fields) throws GoraException { + T persistent = newPersistent(); + String[] dbFields = getFieldsToQuery(fields); + for (String f : dbFields) { + String docf = arangoDBMapping.getDocumentField(f); + if (docf == null || !obj.getProperties().containsKey(docf)) + continue; + + ArangoDBMapping.DocumentFieldType storeType = arangoDBMapping.getDocumentFieldType(docf); + Schema.Field field = fieldMap.get(f); + Schema fieldSchema = field.schema(); + + LOG.debug("Load from ODocument, field:{}, schemaType:{}, docField:{}, storeType:{}", + new Object[]{field.name(), fieldSchema.getType(), docf, storeType}); + Object result = convertDocFieldToAvroField(fieldSchema, storeType, field, docf, obj); + persistent.put(field.pos(), result); + } + persistent.clearDirty(); + return persistent; + } + + private Object convertDocFieldToAvroField(final Schema fieldSchema, + final ArangoDBMapping.DocumentFieldType storeType, + final Schema.Field field, + final String 
docf, + final BaseDocument obj) throws GoraException { + Object result = null; + switch (fieldSchema.getType()) { + case MAP: + result = convertDocFieldToAvroMap(docf, fieldSchema, obj, field, storeType); + break; + case ARRAY: + result = convertDocFieldToAvroList(docf, fieldSchema, obj, field, storeType); + break; + case RECORD: + LinkedHashMap record = (LinkedHashMap) obj.getAttribute(docf); + if (record == null) { + result = null; + break; + } + result = convertAvroBeanToArangoDBDoc(fieldSchema, convertToArangoDBDoc(record)); + break; + case BOOLEAN: + result = Boolean.valueOf(obj.getAttribute(docf).toString()); + break; + case DOUBLE: + result = Double.valueOf(obj.getAttribute(docf).toString()); + break; + case FLOAT: + result = Float.valueOf(obj.getAttribute(docf).toString()); + break; + case INT: + result = Integer.valueOf(obj.getAttribute(docf).toString()); + break; + case LONG: + result = Long.valueOf(obj.getAttribute(docf).toString()); + break; + case STRING: + result = new Utf8(obj.getAttribute(docf).toString()); + ; + break; + case ENUM: + result = AvroUtils.getEnumValue(fieldSchema, obj.getAttribute(docf).toString()); + break; + case BYTES: + case FIXED: + if (!obj.getProperties().containsKey(docf)) { + result = null; + break; + } + result = ByteBuffer.wrap(Base64 + .getDecoder() + .decode(obj.getAttribute(docf).toString())); + break; + case NULL: + result = null; + break; + case UNION: + result = convertDocFieldToAvroUnion(fieldSchema, storeType, field, docf, obj); + break; + default: + LOG.warn("Unable to read {}", docf); + break; + } + return result; + } + + private Object convertDocFieldToAvroUnion(final Schema fieldSchema, + final ArangoDBMapping.DocumentFieldType storeType, + final Schema.Field field, + final String docf, + final BaseDocument doc) throws GoraException { + Object result; + Schema.Type type0 = fieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = fieldSchema.getTypes().get(1).getType(); + + if (!type0.equals(type1) + && 
(type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) { + Schema innerSchema = null; + if (type0.equals(Schema.Type.NULL)) { + innerSchema = fieldSchema.getTypes().get(1); + } else { + innerSchema = fieldSchema.getTypes().get(0); + } + + LOG.debug("Load from ODocument (UNION), schemaType:{}, docField:{}, storeType:{}", + new Object[]{innerSchema.getType(), docf, storeType}); + + result = convertDocFieldToAvroField(innerSchema, storeType, field, docf, doc); + } else { + throw new GoraException("ArangoDBStore only supports Union of two types field."); + } + return result; + } + + private BaseDocument convertToArangoDBDoc(final LinkedHashMap<String, Object> linkedHashMap) { + BaseDocument document = new BaseDocument(); + for (String key : linkedHashMap.keySet()) { + document.addAttribute(key, linkedHashMap.get(key)); + } + return document; + } + + private Object convertAvroBeanToArangoDBDoc(final Schema fieldSchema, + final BaseDocument doc) throws GoraException { + Object result; + Class<?> clazz = null; + try { + clazz = ClassLoadingUtils.loadClass(fieldSchema.getFullName()); + } catch (Exception e) { + throw new GoraException(e); + } + PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent(); + for (Schema.Field recField : fieldSchema.getFields()) { + Schema innerSchema = recField.schema(); + ArangoDBMapping.DocumentFieldType innerStoreType = arangoDBMapping + .getDocumentFieldType(recField.name()); + String innerDocField = arangoDBMapping.getDocumentField(recField.name()) != null ? 
arangoDBMapping + .getDocumentField(recField.name()) : recField.name(); + LOG.debug("Load from ODocument (RECORD), field:{}, schemaType:{}, docField:{}, storeType:{}", + new Object[]{recField.name(), innerSchema.getType(), innerDocField, + innerStoreType}); + record.put(recField.pos(), + convertDocFieldToAvroField(innerSchema, innerStoreType, recField, innerDocField, + doc)); + } + result = record; + return result; + } + + private Object convertDocFieldToAvroList(final String docf, + final Schema fieldSchema, + final BaseDocument doc, + final Schema.Field f, + final ArangoDBMapping.DocumentFieldType storeType) throws GoraException { + + if (storeType == ArangoDBMapping.DocumentFieldType.LIST || storeType == null) { + List<Object> list = (List<Object>) doc.getAttribute(docf); + List<Object> rlist = new ArrayList<>(); + if (list == null) { + return new DirtyListWrapper(rlist); + } + + for (Object item : list) { + BaseDocument innerDoc = new BaseDocument(); + innerDoc.addAttribute("item", item); + Object o = convertDocFieldToAvroField(fieldSchema.getElementType(), storeType, f, + "item", innerDoc); + rlist.add(o); + } + return new DirtyListWrapper<>(rlist); + } + return null; + } + + private Object convertDocFieldToAvroMap(final String docf, final Schema fieldSchema, + final BaseDocument doc, final Schema.Field f, + final ArangoDBMapping.DocumentFieldType storeType) throws GoraException { + if (storeType == ArangoDBMapping.DocumentFieldType.MAP) { + Map<String, Object> map = (Map<String, Object>) doc.getAttribute(docf); + Map<Utf8, Object> rmap = new HashMap<>(); + if (map == null) { + return new DirtyMapWrapper(rmap); + } + + for (Map.Entry entry : map.entrySet()) { + String mapKey = entry.getKey().toString(); + Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey, + decorateOTrackedMapToODoc(map)); + rmap.put(new Utf8(mapKey), o); + } + return new DirtyMapWrapper<>(rmap); + } else { + LinkedHashMap<String, Object> innerDoc = 
(LinkedHashMap) doc.getAttribute(docf); + Map<Utf8, Object> rmap = new HashMap<>(); + if (innerDoc == null) { + return new DirtyMapWrapper(rmap); + } + + for (String fieldName : innerDoc.keySet()) { + String mapKey = fieldName; + Object o = convertDocFieldToAvroField(fieldSchema.getValueType(), storeType, f, mapKey, + convertToArangoDBDoc(innerDoc)); + rmap.put(new Utf8(mapKey), o); + } + return new DirtyMapWrapper<>(rmap); + } + } + + private BaseDocument decorateOTrackedMapToODoc(Map<String, Object> map) { + BaseDocument doc = new BaseDocument(); + for (Map.Entry entry : map.entrySet()) { + doc.addAttribute(entry.getKey().toString(), entry.getValue()); + } + return doc; + } + + + /** + * {@inheritDoc} + */ + @Override + public void put(K key, T val) throws GoraException { + if (val.isDirty()) { + try { + key = encodeArangoDBKey(key); + boolean isExists = arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .collection(arangoDBMapping.getDocumentClass()).documentExists(key.toString()); + if (!isExists) { + BaseDocument document = convertAvroBeanToArangoDocument(key, val); + arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .collection(arangoDBMapping.getDocumentClass()) + .insertDocument(document); + } else { + BaseDocument document = convertAvroBeanToArangoDocument(key, val); + arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .collection(arangoDBMapping.getDocumentClass()) + .updateDocument(key.toString(), document); + } + } catch (Exception e) { + throw new GoraException(e); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.info("Ignored putting persistent bean {} in the store as it is neither " + + "new, neither dirty.", new Object[]{val}); + } + } + } + + private BaseDocument convertAvroBeanToArangoDocument(final K key, final T persistent) { + BaseDocument result = new BaseDocument(); + for (Schema.Field f : persistent.getSchema().getFields()) { + if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) != null)) { + String docf = 
arangoDBMapping.getDocumentField(f.name()); + Object value = persistent.get(f.pos()); + ArangoDBMapping.DocumentFieldType storeType = arangoDBMapping.getDocumentFieldType(docf); + LOG.debug("Transform value to ODocument, docField:{}, schemaType:{}, storeType:{}", + new Object[]{docf, f.schema().getType(), storeType}); + Object o = convertAvroFieldToArangoField(docf, f.schema(), f.schema().getType(), + storeType, value); + result.addAttribute(docf, o); + } + } + result.setKey(key.toString()); + return result; + } + + private Object convertAvroFieldToArangoField(final String docf, final Schema fieldSchema, + final Schema.Type fieldType, + final ArangoDBMapping.DocumentFieldType storeType, + final Object value) { + Object result = null; + switch (fieldType) { + case MAP: + if (storeType != null && !(storeType == ArangoDBMapping.DocumentFieldType.MAP || + storeType == ArangoDBMapping.DocumentFieldType.DOCUMENT)) { + throw new IllegalStateException( + "Field " + fieldSchema.getName() + + ": to store a AVRO 'map', target ArangoDB mapping have to be of type 'Map'" + + "| 'Document'"); + } + Schema valueSchema = fieldSchema.getValueType(); + result = convertAvroMapToDocField(docf, (Map<CharSequence, ?>) value, valueSchema, + valueSchema.getType(), storeType); + break; + case ARRAY: + if (storeType != null && !(storeType == ArangoDBMapping.DocumentFieldType.LIST)) { + throw new IllegalStateException("Field " + fieldSchema.getName() + + ": To store a AVRO 'array', target ArangoDB mapping have to be of type 'List'"); + } + Schema elementSchema = fieldSchema.getElementType(); + result = convertAvroListToDocField(docf, (List<?>) value, elementSchema, + elementSchema.getType(), storeType); + break; + case BYTES: + case FIXED: + if (value != null) { + result = Base64.getEncoder() + .encodeToString(((ByteBuffer) value).array()); + } + break; + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + result = value; + break; + case STRING: + if (value != null) { + 
result = value.toString(); + } + break; + case ENUM: + if (value != null) + result = value.toString(); + break; + case RECORD: + if (value == null) + break; + result = convertAvroBeanToArangoDBDocField(docf, fieldSchema, value); + break; + case UNION: + result = convertAvroUnionToArangoDBField(docf, fieldSchema, storeType, value); + break; + default: + LOG.error("Unknown field type: {}", fieldSchema.getType()); + break; + } + return result; + } + + private Object convertAvroMapToDocField(final String docf, + final Map<CharSequence, ?> value, final Schema fieldSchema, + final Schema.Type fieldType, + final ArangoDBMapping.DocumentFieldType storeType) { + if (storeType == ArangoDBMapping.DocumentFieldType.MAP) { + HashMap map = new HashMap<String, Object>(); + if (value == null) + return map; + + for (Map.Entry<CharSequence, ?> e : value.entrySet()) { + String mapKey = e.getKey().toString(); + Object mapValue = e.getValue(); + + ArangoDBMapping.DocumentFieldType fieldStoreType = arangoDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToArangoField(docf, fieldSchema, fieldType, fieldStoreType, + mapValue); + map.put(mapKey, result); + } + return map; + } else { + BaseDocument doc = new BaseDocument(); + if (value == null) + return doc; + for (Map.Entry<CharSequence, ?> e : value.entrySet()) { + String mapKey = e.getKey().toString(); + Object mapValue = e.getValue(); + + ArangoDBMapping.DocumentFieldType fieldStoreType = arangoDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToArangoField(docf, fieldSchema, fieldType, fieldStoreType, + mapValue); + doc.addAttribute(mapKey, result); + } + return doc; + } + } + + private Object convertAvroListToDocField(final String docf, final Collection<?> array, + final Schema fieldSchema, final Schema.Type fieldType, + final ArangoDBMapping.DocumentFieldType storeType) { + if (storeType == ArangoDBMapping.DocumentFieldType.LIST) { + ArrayList list; + list = new ArrayList<Object>(); + if 
(array == null) + return list; + for (Object item : array) { + ArangoDBMapping.DocumentFieldType fieldStoreType = arangoDBMapping.getDocumentFieldType(docf); + Object result = convertAvroFieldToArangoField(docf, fieldSchema, fieldType, fieldStoreType, item); + list.add(result); + } + return list; + } + return null; + } + + private BaseDocument convertAvroBeanToArangoDBDocField(final String docf, + final Schema fieldSchema, + final Object value) { + BaseDocument record = new BaseDocument(); + for (Schema.Field member : fieldSchema.getFields()) { + Object innerValue = ((PersistentBase) value).get(member.pos()); + String innerDoc = arangoDBMapping.getDocumentField(member.name()); + Schema.Type innerType = member.schema().getType(); + ArangoDBMapping.DocumentFieldType innerStoreType = arangoDBMapping.getDocumentFieldType(innerDoc); + LOG.debug("Transform value to BaseDocument , docField:{}, schemaType:{}, storeType:{}", + new Object[]{member.name(), member.schema().getType(), + innerStoreType}); + Object fieldValue = convertAvroFieldToArangoField(docf, member.schema() + , innerType, innerStoreType, innerValue); + record.addAttribute(member.name(), fieldValue); + } + return record; + } + + private Object convertAvroUnionToArangoDBField(final String docf, final Schema fieldSchema, + final ArangoDBMapping.DocumentFieldType storeType, + final Object value) { + Object result; + Schema.Type type0 = fieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = fieldSchema.getTypes().get(1).getType(); + + if (!type0.equals(type1) + && (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL))) { + Schema innerSchema = null; + if (type0.equals(Schema.Type.NULL)) { + innerSchema = fieldSchema.getTypes().get(1); + } else { + innerSchema = fieldSchema.getTypes().get(0); + } + + LOG.debug("Transform value to ODocument (UNION), type:{}, storeType:{}", + new Object[]{innerSchema.getType(), type1, storeType}); + + result = convertAvroFieldToArangoField(docf, innerSchema, 
innerSchema.getType(), storeType, value); + } else { + throw new IllegalStateException("ArangoDBStore only supports Union of two types field."); + } + return result; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean delete(K key) throws GoraException { + try { + key = encodeArangoDBKey(key); + arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .collection(arangoDBMapping.getDocumentClass()) + .deleteDocument(key.toString()); + return true; + } catch (Exception e) { + throw new GoraException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public long deleteByQuery(Query<K, T> query) throws GoraException { + if (query.getFields() == null || (query.getFields().length == getFields().length)) { + ArangoDBQuery dataStoreQuery = (ArangoDBQuery) query; + try { + dataStoreQuery.populateArangoDBQuery(arangoDBMapping, true); + AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().fullCount(true); + ArangoCursor<BaseDocument> cursor = arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .query(dataStoreQuery.getArangoDBQuery(), dataStoreQuery.getParams(), aqlQueryOptions, BaseDocument.class); + long size = new ArangoDBResult<K, T>(this, query, cursor).size(); + dataStoreQuery.populateArangoDBQuery(arangoDBMapping, false); + arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .query(dataStoreQuery.getArangoDBQuery(), dataStoreQuery.getParams(), aqlQueryOptions, BaseDocument.class); + return size; + } catch (Exception e) { + throw new GoraException(e); + } + } else { + // TODO: not supported for case where query.getFields().length != getFields().length + // TODO: BaseDocument.removeAttribute or similar is not available at the moment + return -1; + } + } + + /** + * {@inheritDoc} + */ + @Override + public Result<K, T> execute(Query<K, T> query) throws GoraException { + ArangoDBQuery dataStoreQuery; + if (query instanceof ArangoDBQuery) { + dataStoreQuery = ((ArangoDBQuery) query); + } else { + dataStoreQuery = (ArangoDBQuery) ((PartitionQueryImpl<K, 
T>) query).getBaseQuery(); + } + dataStoreQuery.populateArangoDBQuery(arangoDBMapping, true); + try { + AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().fullCount(true); + ArangoCursor<BaseDocument> cursor = arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .query(dataStoreQuery.getArangoDBQuery(), dataStoreQuery.getParams(), aqlQueryOptions, BaseDocument.class); + return new ArangoDBResult<K, T>(this, query, cursor); + } catch (Exception e) { + throw new GoraException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public Query<K, T> newQuery() { + return new ArangoDBQuery<>(this); + } + + /** + * {@inheritDoc} + */ + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { + List<PartitionQuery<K, T>> partitions = new ArrayList<>(); + PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( + query); + partitionQuery.setConf(this.getConf()); + partitions.add(partitionQuery); + return partitions; + } + + /** + * {@inheritDoc} + * Flushes locally cached to content in memory to remote ArangoDB server. + */ + @Override + public void flush() throws GoraException { + } + + /** + * {@inheritDoc} + * Releases resources which have been used dataStore. 
+ */ + @Override + public void close() { + try { + flush(); + } catch (Exception ex) { + LOG.error("Error occurred while flushing data to ArangoDB : ", ex); + } + } + + @Override + public boolean exists(K key) throws GoraException { + boolean isExists = arangoDB.db(arangoDbStoreParams.getDatabaseName()) + .collection(arangoDBMapping.getDocumentClass()).documentExists(key.toString()); + return isExists; + } + + public K encodeArangoDBKey(final K key) { + if (key == null) { + return null; + } else if (!(key instanceof String)) { + return key; + } + return (K) key.toString().replace("/", "%2F") + .replace("&", "%26"); + } + +} diff --git a/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBStoreParameters.java b/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBStoreParameters.java new file mode 100644 index 00000000..d68947eb --- /dev/null +++ b/gora-arangodb/src/main/java/org/apache/gora/arangodb/store/ArangoDBStoreParameters.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gora.arangodb.store;

import org.apache.hadoop.conf.Configuration;

import java.util.Properties;

/**
 * Immutable holder for the ArangoDB client settings read from gora.properties
 * and/or the Hadoop {@link Configuration}.
 */
public class ArangoDBStoreParameters {

  public static final String ARANGO_DB_MAPPING_FILE = "gora.arangodb.mapping.file";
  public static final String ARANGO_DB_SERVER_HOST = "gora.arangodb.server.host";
  public static final String ARANGO_DB_SERVER_PORT = "gora.arangodb.server.port";
  public static final String ARANGO_DB_USER_USERNAME = "gora.arangodb.user.username";
  public static final String ARANGO_DB_USER_PASSWORD = "gora.arangodb.user.password";
  public static final String ARANGO_DB_DB_NAME = "gora.arangodb.database.name";
  public static final String ARANGO_DB_CONNECTION_POOL_SIZE = "gora.arangodb.con.pool.size";
  public static final String ARANGO_PROP_OVERRIDING = "gora.arangodb.override.hadoop.configuration";

  public static final String DEFAULT_ARANGO_DB_SERVER_HOST = "localhost";
  public static final String DEFAULT_ARANGO_DB_SERVER_PORT = "8529";
  public static final String DEFAULT_ARANGO_DB_USER_USERNAME = "root";
  public static final String DEFAULT_ARANGO_DB_USER_PASSWORD = "root";

  private final String mappingFile;
  private final String serverHost;
  private final String serverPort;
  private final String userName;
  private final String userPassword;
  private final String databaseName;
  private final String connPoolSize;

  /**
   * Creates a parameter set from already-resolved values. No validation or
   * defaulting is applied here; see {@link #load(Properties, Configuration)}.
   */
  public ArangoDBStoreParameters(String mappingFile,
                                 String serverHost,
                                 String serverPort,
                                 String userName,
                                 String userPassword,
                                 String databaseName,
                                 String connPoolSize) {
    this.mappingFile = mappingFile;
    this.serverHost = serverHost;
    this.serverPort = serverPort;
    this.userName = userName;
    this.userPassword = userPassword;
    this.databaseName = databaseName;
    this.connPoolSize = connPoolSize;
  }

  public String getMappingFile() {
    return this.mappingFile;
  }

  public String getServerHost() {
    return this.serverHost;
  }

  public String getServerPort() {
    return this.serverPort;
  }

  public String getUserName() {
    return this.userName;
  }

  public String getUserPassword() {
    return this.userPassword;
  }

  public String getDatabaseName() {
    return this.databaseName;
  }

  public String getConnectionPoolSize() {
    return this.connPoolSize;
  }

  /**
   * Resolves the store parameters.
   *
   * Mapping file, database name and pool size always come from {@code properties}.
   * Host, port, user name and password come from {@code properties} when
   * {@value #ARANGO_PROP_OVERRIDING} is {@code true}, and from {@code conf}
   * otherwise. In both cases a missing value falls back to the matching
   * {@code DEFAULT_*} constant; the original applied those defaults only on the
   * {@code conf} path, so the properties path could return null.
   *
   * @param properties contents of gora.properties
   * @param conf       Hadoop configuration; only read when overriding is off
   * @return the resolved parameters
   */
  public static ArangoDBStoreParameters load(Properties properties, Configuration conf) {
    String propMappingFile = properties.getProperty(ARANGO_DB_MAPPING_FILE,
        ArangoDBStore.DEFAULT_MAPPING_FILE);
    String propDatabaseName = properties.getProperty(ARANGO_DB_DB_NAME);
    String propConnPoolSize = properties.getProperty(ARANGO_DB_CONNECTION_POOL_SIZE);
    boolean propertiesOverrideHadoop =
        Boolean.parseBoolean(properties.getProperty(ARANGO_PROP_OVERRIDING));

    String propServerHost;
    String propServerPort;
    String propUserName;
    String propUserPassword;
    if (propertiesOverrideHadoop) {
      propServerHost = properties.getProperty(ARANGO_DB_SERVER_HOST, DEFAULT_ARANGO_DB_SERVER_HOST);
      propServerPort = properties.getProperty(ARANGO_DB_SERVER_PORT, DEFAULT_ARANGO_DB_SERVER_PORT);
      propUserName = properties.getProperty(ARANGO_DB_USER_USERNAME, DEFAULT_ARANGO_DB_USER_USERNAME);
      propUserPassword = properties.getProperty(ARANGO_DB_USER_PASSWORD, DEFAULT_ARANGO_DB_USER_PASSWORD);
    } else {
      propServerHost = conf.get(ARANGO_DB_SERVER_HOST, DEFAULT_ARANGO_DB_SERVER_HOST);
      propServerPort = conf.get(ARANGO_DB_SERVER_PORT, DEFAULT_ARANGO_DB_SERVER_PORT);
      propUserName = conf.get(ARANGO_DB_USER_USERNAME, DEFAULT_ARANGO_DB_USER_USERNAME);
      propUserPassword = conf.get(ARANGO_DB_USER_PASSWORD, DEFAULT_ARANGO_DB_USER_PASSWORD);
    }

    return new ArangoDBStoreParameters(propMappingFile,
        propServerHost, propServerPort, propUserName,
        propUserPassword, propDatabaseName, propConnPoolSize);
  }

}
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.arangodb.store; \ No newline at end of file diff --git a/gora-arangodb/src/test/java/org/apache/gora/arangodb/ArangoDBTestDriver.java b/gora-arangodb/src/test/java/org/apache/gora/arangodb/ArangoDBTestDriver.java new file mode 100644 index 00000000..0e756c0b --- /dev/null +++ b/gora-arangodb/src/test/java/org/apache/gora/arangodb/ArangoDBTestDriver.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.arangodb; + +import org.apache.gora.GoraTestDriver; +import org.apache.gora.arangodb.store.ArangoDBStore; +import org.apache.gora.arangodb.store.ArangoDBStoreParameters; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.util.GoraException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; + +/** + * Driver to set up an embedded ArangoDB database instance for Gora + * dataStore specific integration tests. + */ +public class ArangoDBTestDriver extends GoraTestDriver { + + private static Logger log = LoggerFactory.getLogger(ArangoDBTestDriver.class); + + private GenericContainer arangodbContainer; + + public ArangoDBTestDriver(GenericContainer arangodbContainer) { + super(ArangoDBStore.class); + this.arangodbContainer = arangodbContainer; + } + + + public ArangoDBTestDriver() { + super(ArangoDBStore.class); + } + + /** + * Initialize embedded ArangoDB server instance as per the gora-arangodb-mapping.xml + * server configuration file. + */ + @Override + public void setUpClass() throws Exception { + log.info("Setting up ArangoDB test driver"); + conf.set(ArangoDBStoreParameters.ARANGO_DB_SERVER_HOST, "localhost"); + conf.set(ArangoDBStoreParameters.ARANGO_DB_SERVER_PORT, + arangodbContainer.getMappedPort(8529).toString()); + log.info("ArangoDB Embedded Server started successfully."); + } + + /** + * Terminate embedded ArangoDB server. 
+ */ + @Override + public void tearDownClass() throws Exception { + log.info("ArangoDB Embedded server instance terminated successfully."); + } + + @Override + public <K, T extends Persistent> DataStore<K, T> createDataStore(Class<K> keyClass, + Class<T> persistentClass) throws GoraException { + + final DataStore<K, T> dataStore = DataStoreFactory + .createDataStore((Class<? extends DataStore<K, T>>) dataStoreClass, keyClass, + persistentClass, conf); + dataStores.add(dataStore); + log.info("Datastore for {} was added.", persistentClass); + return dataStore; + } + +} diff --git a/gora-arangodb/src/test/java/org/apache/gora/arangodb/package-info.java b/gora-arangodb/src/test/java/org/apache/gora/arangodb/package-info.java new file mode 100644 index 00000000..a4c10776 --- /dev/null +++ b/gora-arangodb/src/test/java/org/apache/gora/arangodb/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.arangodb; \ No newline at end of file diff --git a/gora-arangodb/src/test/java/org/apache/gora/arangodb/store/ArangoDBGoraDataStoreTest.java b/gora-arangodb/src/test/java/org/apache/gora/arangodb/store/ArangoDBGoraDataStoreTest.java new file mode 100644 index 00000000..004f6c8c --- /dev/null +++ b/gora-arangodb/src/test/java/org/apache/gora/arangodb/store/ArangoDBGoraDataStoreTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.arangodb.store; + +import org.apache.gora.arangodb.ArangoDBTestDriver; +import org.apache.gora.store.DataStoreTestBase; +import org.apache.gora.store.DataStoreTestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; + +import java.time.Duration; + +/** + * Executes all the dataStore specific integration tests for ArangoDB dataStore. 
// ===== gora-arangodb/src/test/java/org/apache/gora/arangodb/store/ArangoDBGoraDataStoreTest.java =====

/**
 * Runs the shared {@link DataStoreTestBase} integration suite against an
 * ArangoDB server started in a Testcontainers container.
 */
public class ArangoDBGoraDataStoreTest extends DataStoreTestBase {

  private static final Logger log = LoggerFactory.getLogger(ArangoDBGoraDataStoreTest.class);

  private static final String DOCKER_CONTAINER_NAME = "arangodb/arangodb:3.6.4";

  @ClassRule
  public static GenericContainer<?> arangodbContainer = new GenericContainer<>(DOCKER_CONTAINER_NAME)
      .withExposedPorts(8529)
      .withEnv("ARANGO_ROOT_PASSWORD", "root")
      .waitingFor(new ArangoDBStartupWaitStrategy())
      .withStartupTimeout(Duration.ofSeconds(240));

  @BeforeClass
  public static void setUpClass() throws Exception {
    setTestDriver(new ArangoDBTestDriver(arangodbContainer));
    DataStoreTestBase.setUpClass();
  }

  @AfterClass
  public static void tearDownClass() throws Exception {
    DataStoreTestBase.tearDownClass();
  }

  @Before
  public void setUp() throws Exception {
    super.setUp();
  }

  @After
  public void tearDown() throws Exception {
    super.tearDown();
  }

  @Override
  @Test
  public void testGet() throws Exception {
    log.info("test method: testGet");
    DataStoreTestUtil.testGetEmployee(this.employeeStore);
  }

  // The skip reasons below named OrientDBStore (copy-paste); this is the ArangoDB store.
  @Ignore("3 types union field is not supported by ArangoDBStore.")
  @Override
  public void testGet3UnionField() {
    // 3 types union field is not supported by ArangoDBStore.
  }

  @Ignore("delete by query fields is not supported as remove attribute not supported by ArangoDB.")
  @Override
  public void testDeleteByQueryFields() {
  }

  @Ignore("updates of map fields are not supported as ArangoDB overrides but keep existing entries.")
  @Override
  public void testUpdate() {
  }

}

// ===== gora-arangodb/src/test/java/org/apache/gora/arangodb/store/ArangoDBStartupWaitStrategy.java =====

package org.apache.gora.arangodb.store;

import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;

/**
 * Waits until the ArangoDB 3.6.4 container has logged its "ready for business"
 * line once.
 */
public class ArangoDBStartupWaitStrategy extends LogMessageWaitStrategy {

  // Dots are escaped so they match literally instead of matching any character.
  // NOTE(review): the version here must be kept in step with the image tag used by the test.
  private static final String READY_LOG_REGEX =
      ".*ArangoDB \\(version 3\\.6\\.4 \\[linux\\]\\) is ready for business\\. Have fun!\n";

  /** Number of times the ready line must appear. */
  private static final int READY_LOG_OCCURRENCES = 1;

  public ArangoDBStartupWaitStrategy() {
    withRegEx(READY_LOG_REGEX);
    withTimes(READY_LOG_OCCURRENCES);
  }

}
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- Test mapping: each <class> ties a persistent class (and its key class) to a document name; each <field> ties a class field to a docfield and a storage type. -->
+<gora-otd>
+  <!-- Employee is stored under document "frontier" (NOTE(review): presumably the ArangoDB collection name; confirm). Its docfields reuse the field names. -->
+  <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" document="frontier">
+    <field name="name" docfield="name" type="string"/>
+    <field name="dateOfBirth" docfield="dateOfBirth" type="long"/>
+    <field name="ssn" docfield="ssn" type="string"/>
+    <field name="value" docfield="value" type="string"/>
+    <field name="salary" docfield="salary" type="integer"/>
+    <field name="boss" docfield="boss" type="document"/>
+    <field name="webpage" docfield="webpage" type="document"/>
+  </class>
+  <!-- WebPage: parsedContent and outlinks are stored under the shorter docfields pContent and linksOut; the other docfields reuse the field names. -->
+  <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" document="webpage">
+    <field name="url" docfield="url" type="string"/>
+    <field name="content" docfield="content" type="string"/>
+    <field name="parsedContent" docfield="pContent" type="list"/>
+    <field name="outlinks" docfield="linksOut" type="map"/>
+    <field name="headers" docfield="headers" type="document"/>
+    <field name="metadata" docfield="metadata" type="document"/>
+    <field name="byteData" docfield="byteData" type="document"/>
+    <field name="stringData" docfield="stringData" type="document"/>
+  </class>
+  <!-- TokenDatum maps only its count field, stored with type integer. -->
+  <class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String" document="TokenDatum">
+    <field name="count" docfield="count" type="integer"/>
+  </class>
+
+</gora-otd>
diff --git a/gora-arangodb/src/test/resources/gora.properties 
b/gora-arangodb/src/test/resources/gora.properties
new file mode 100644
index 00000000..c0f2f4b5
--- /dev/null
+++ b/gora-arangodb/src/test/resources/gora.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Test settings: ArangoDBStore is the default datastore, reached at localhost:8529 (NOTE(review): root/root presumably matches the test container; confirm).
+gora.datastore.default=org.apache.gora.arangodb.store.ArangoDBStore
+gora.arangodb.server.host=localhost
+gora.arangodb.server.port=8529
+gora.arangodb.user.username=root
+gora.arangodb.user.password=root
+gora.arangodb.database.name=gora
+gora.arangodb.con.pool.size=10
+gora.arangodb.override.hadoop.configuration=false
diff --git a/pom.xml b/pom.xml
index 3ec407bf..4eb2491f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -777,6 +777,7 @@
     <module>gora-hive</module>
     <module>gora-redis</module>
     <module>gora-jet</module>
+    <module>gora-arangodb</module>
     <module>gora-rethinkdb</module>
     <module>gora-elasticsearch</module>
     <module>gora-tutorial</module>
@@ -845,6 +846,9 @@
     <orientdb.version>3.0.30</orientdb.version>
     <orientqb.version>0.2.0</orientqb.version>
 
+    <!-- ArangoDB Dependencies -->
+    <arangodb.version>6.6.3</arangodb.version>
+
     <!-- RethinkDB Dependencies -->
     <rethinkdb.version>2.4.4</rethinkdb.version>
 
@@ -1737,6 +1741,13 @@
         <version>${jsr107.api.version}</version>
       </dependency>
 
+      <!-- ArangoDB Dependencies -->
+      <dependency>
+        <groupId>com.arangodb</groupId>
+        <artifactId>arangodb-java-driver</artifactId>
+        <version>${arangodb.version}</version>
+      </dependency>
+
       <!-- RethinkDB Dependencies -->
       <dependency>
         <groupId>com.rethinkdb</groupId>