This is an automated email from the ASF dual-hosted git repository.

djkevincr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push:
     new 2d0d910  GORA-664 Add datastore for Elasticsearch (#234)

2d0d910 is described below

commit 2d0d91093dd7bceb07a4d64fb6b075bdc2117abd
Author: Maria Podorvanova <36038640+podorvan...@users.noreply.github.com>
AuthorDate: Thu Aug 12 04:22:22 2021 +1000

    GORA-664 Add datastore for Elasticsearch (#234)

    * Create basic gora-elasticsearch module

    * Bump Elasticsearch version and remove redundant dependency

    * Implement connection and basic schema management

    - Create ElasticsearchStore class with connection initialization
    - Create basic Elasticsearch types mapping
    - Implement the necessary files for mapping representation (ElasticsearchMapping, ElasticsearchMappingBuilder)
    - Read schema from mapping file
    - Cover initialization with test

    * Set up Elasticsearch client parameters

    - Created gora.properties file with configuration properties
    - Loaded connection parameters from configuration
    - Implemented connection to Elasticsearch cluster with ElasticsearchParameters
    - Covered ElasticsearchParameters with tests
    - Added javadoc descriptions

    * Add a property for choosing the authentication method

    * Implement testing with Elasticsearch container

    - Added testing dependencies
    - Added GoraElasticsearchTestDriver with Elasticsearch container
    - Added javadoc descriptions to GoraElasticsearchTestDriver class
    - Fixed two existing tests in accordance to Elasticsearch container

    * Implement some methods for schema management

    Implemented schemaExists, createSchema, deleteSchema and flush methods

    * Add XSD validation file for the XML mapping

    * Fix XSD validation

    - Relocated gora-elasticsearch.xsd file to main resources
    - Covered XSD validation with test
    - Added gora-elasticsearch-mapping-invalid.xml file for test

    * Set up Elasticsearch container's authentication parameters

    * Implement exists method

    * Add comments for the connection parameters

    * Fix authentication

    - Set up password to Elasticsearch container properly
    - Set default Elasticsearch container server’s username in gora.properties
    - Added exceptions for missing arguments in authentication

    * Add parameter for the XSD validation

    - Defined a parameter for the XSD validation
    - Added a test case for the parameter
    - Made ElasticsearchStore read mapping file from properties, not configuration

    * Implement some basic Input-Output operations for schema management

    - Implemented delete, get and put methods
    - Implemented newInstance and getUnionSchema utility methods
    - Implemented basic serialization/deserialization for primitive AVRO types

    * Fix createSchema method

    - Added mappings while creating an Elasticsearch index
    - Added getter and setter to Datatype enum

    * Implement serialization/deserialization for some Avro data types

    - Implemented serializeFieldValue and deserializeFieldValue methods for ARRAY, BOOLEAN, BYTES and FIXED Avro data types
    - Fixed deserialization for STRING Avro data type
    - Added javadoc descriptions

    * Fix NPE when getting a non-existent Elasticsearch document

    * Implement serialization/deserialization for MAP Avro data type

    * Refactor serialization/deserialization to have better javadocs and arguments

    * Implement serialization/deserialization for RECORD Avro data type

    * Implement serialization/deserialization for UNION Avro data type

    * Fix passed Schema argument for ARRAY deserialization

    * Fix BYTES deserialization for Base64 encoded String

    * Ignore testGet3UnionField test

    * Add javadoc descriptions to serialization and deserialization methods

    * Implement newQuery method

    * Implement deleteByQuery method

    * Use an Enum instead of literal strings for the Authentication Type parameter

    * Use parameterized logging instead of string concatenation

    * Implement execute method

    * Implement getPartitions method

    * Add scaling_factor support

    * Remove unsupported Elasticsearch data types

    * Implement Metadata Analyzer for Elasticsearch Store

    * Try to fix range query by “_id” field

    * Fix execute method by adding a special "gora_id" field

    * Implement deleting specific fields of the records in deleteByQuery method

    * Implement MapReduce test

    * Fix flush method by using refresh

    * Address reviewer's comments

    * Add Elasticsearch specific logging dependency
---
 gora-elasticsearch/pom.xml                         | 177 +++++
 .../mapping/ElasticsearchMapping.java              |  73 ++
 .../mapping/ElasticsearchMappingBuilder.java       | 210 +++++
 .../apache/gora/elasticsearch/mapping/Field.java   | 200 +++++
 .../gora/elasticsearch/mapping/package-info.java   |  20 +
 .../apache/gora/elasticsearch/package-info.java    |  20 +
 .../elasticsearch/query/ElasticsearchQuery.java    |  41 +
 .../elasticsearch/query/ElasticsearchResult.java   |  72 ++
 .../gora/elasticsearch/query/package-info.java     |  20 +
 .../elasticsearch/store/ElasticsearchStore.java    | 864 +++++++++++++++++++++
 .../ElasticsearchStoreCollectionMetadata.java      |  53 ++
 .../store/ElasticsearchStoreMetadataAnalyzer.java  | 102 +++
 .../gora/elasticsearch/store/package-info.java     |  20 +
 .../elasticsearch/utils/AuthenticationType.java    |  35 +
 .../utils/ElasticsearchConstants.java              |  49 ++
 .../utils/ElasticsearchParameters.java             | 274 +++++++
 .../gora/elasticsearch/utils/package-info.java     |  20 +
 .../src/main/resources/gora-elasticsearch.xsd      |  61 ++
 .../elasticsearch/GoraElasticsearchTestDriver.java |  71 ++
 .../mapreduce/ElasticsearchStoreMapReduceTest.java |  59 ++
 .../gora/elasticsearch/mapreduce/package-info.java |  20 +
 .../apache/gora/elasticsearch/package-info.java    |  20 +
 .../store/TestElasticsearchStore.java              | 173 +++++
 .../gora/elasticsearch/store/package-info.java     |  20 +
 .../gora-elasticsearch-mapping-invalid.xml         |  31 +
 .../test/resources/gora-elasticsearch-mapping.xml  |  48 ++
 .../src/test/resources/gora.properties             |  40 +
 pom.xml                                            |  10 +
 28 files changed, 2803 insertions(+)

diff --git a/gora-elasticsearch/pom.xml b/gora-elasticsearch/pom.xml
new file mode 100644
index 0000000..28ba932
--- /dev/null
+++ b/gora-elasticsearch/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gora</groupId>
+    <artifactId>gora</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>gora-elasticsearch</artifactId>
+  <packaging>bundle</packaging>
+
+  <name>Apache Gora :: Elasticsearch</name>
+  <url>http://gora.apache.org</url>
+  <description>The Apache Gora open source framework provides an in-memory data model and
+    persistence for big data. Gora supports persisting to column stores, key value stores,
+    document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce
+    support.</description>
+  <inceptionYear>2010</inceptionYear>
+  <organization>
+    <name>The Apache Software Foundation</name>
+    <url>http://www.apache.org/</url>
+  </organization>
+  <issueManagement>
+    <system>JIRA</system>
+    <url>https://issues.apache.org/jira/browse/GORA</url>
+  </issueManagement>
+  <ciManagement>
+    <system>Jenkins</system>
+    <url>https://builds.apache.org/job/Gora-trunk/</url>
+  </ciManagement>
+
+  <properties>
+    <osgi.import>*</osgi.import>
+    <osgi.export>org.apache.gora.elasticsearch*;version="${project.version}";-noimport:=true</osgi.export>
+  </properties>
+
+  <build>
+    <directory>target</directory>
+    <outputDirectory>target/classes</outputDirectory>
+    <finalName>${project.artifactId}-${project.version}</finalName>
+    <testOutputDirectory>target/test-classes</testOutputDirectory>
+    <testSourceDirectory>src/test/java</testSourceDirectory>
+    <sourceDirectory>src/main/java</sourceDirectory>
+    <testResources>
+      <testResource>
+        <directory>${project.basedir}/src/test/resources</directory>
+        <includes>
+          <include>**/*</include>
+        </includes>
+        <!--targetPath>${project.basedir}/target/classes/</targetPath -->
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>${build-helper-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/examples/java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- Gora Internal Dependencies -->
+    <dependency>
+      <groupId>org.apache.gora</groupId>
+      <artifactId>gora-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.gora</groupId>
+      <artifactId>gora-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Elasticsearch dependencies -->
+    <dependency>
+      <groupId>org.elasticsearch.client</groupId>
+      <artifactId>elasticsearch-rest-high-level-client</artifactId>
+      <version>${elasticsearch.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jdom</groupId>
+      <artifactId>jdom</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <!-- Logging Dependencies -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-to-slf4j</artifactId>
+      <version>2.8.2</version>
+    </dependency>
+
+    <!-- Testing Dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+    </dependency>
+
+  </dependencies>
+
+</project>
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
new file mode 100644
index 0000000..d31e64f
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mapping definitions for Elasticsearch.
+ */
+public class ElasticsearchMapping {
+
+  private String indexName;
+  private Map<String, Field> fields;
+
+  /**
+   * Empty constructor for the ElasticsearchMapping class.
+   */
+  public ElasticsearchMapping() {
+    fields = new HashMap<>();
+  }
+
+  /**
+   * Returns the name of Elasticsearch index linked to the mapping.
+   *
+   * @return Index's name
+   */
+  public String getIndexName() {
+    return indexName;
+  }
+
+  /**
+   * Sets the index name of the Elasticsearch mapping.
+   *
+   * @param indexName Index's name
+   */
+  public void setIndexName(String indexName) {
+    this.indexName = indexName;
+  }
+
+  /**
+   * Returns a map with all mapped fields.
+   *
+   * @return Map containing mapped fields
+   */
+  public Map<String, Field> getFields() {
+    return fields;
+  }
+
+  /**
+   * Add a new field to the mapped fields.
+   *
+   * @param classFieldName Field name in the persisted class
+   * @param field Mapped field from Elasticsearch index
+   */
+  public void addField(String classFieldName, Field field) {
+    fields.put(classFieldName, field);
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
new file mode 100644
index 0000000..30dba7b
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import com.google.inject.ConfigurationException;
+import org.apache.commons.io.IOUtils;
+import org.apache.gora.elasticsearch.store.ElasticsearchStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.GoraException;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Builder for Mapping definitions of Elasticsearch.
+ */
+public class ElasticsearchMappingBuilder<K, T extends PersistentBase> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class);
+
+  /**
+   * XSD validation file for the XML mapping.
+   */
+  private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd";
+
+  // Index description
+  static final String ATT_NAME = "name";
+
+  static final String ATT_TYPE = "type";
+
+  // Class description
+  static final String TAG_CLASS = "class";
+
+  static final String ATT_KEYCLASS = "keyClass";
+
+  static final String ATT_INDEX = "index";
+
+  static final String TAG_FIELD = "field";
+
+  static final String ATT_DOCFIELD = "docfield";
+
+  static final String ATT_SCALINGFACTOR = "scalingFactor";
+
+  /**
+   * Mapping instance being built.
+   */
+  private ElasticsearchMapping elasticsearchMapping;
+
+  private final ElasticsearchStore<K, T> dataStore;
+
+  /**
+   * Constructor for ElasticsearchMappingBuilder.
+   *
+   * @param store ElasticsearchStore instance
+   */
+  public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) {
+    this.elasticsearchMapping = new ElasticsearchMapping();
+    this.dataStore = store;
+  }
+
+  /**
+   * Returns the Elasticsearch Mapping being built.
+   *
+   * @return Elasticsearch Mapping instance
+   */
+  public ElasticsearchMapping getElasticsearchMapping() {
+    return elasticsearchMapping;
+  }
+
+  /**
+   * Sets the Elasticsearch Mapping.
+   *
+   * @param elasticsearchMapping Elasticsearch Mapping instance
+   */
+  public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) {
+    this.elasticsearchMapping = elasticsearchMapping;
+  }
+
+  /**
+   * Reads Elasticsearch mappings from file.
+   *
+   * @param inputStream Mapping input stream
+   * @param xsdValidation Parameter for enabling XSD validation
+   */
+  public void readMappingFile(InputStream inputStream, boolean xsdValidation) {
+    try {
+      SAXBuilder saxBuilder = new SAXBuilder();
+      if (inputStream == null) {
+        LOG.error("The mapping input stream is null!");
+        throw new GoraException("The mapping input stream is null!");
+      }
+
+      // Convert input stream to a string to use it a few times
+      String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset());
+
+      // XSD validation for XML file
+      if (xsdValidation) {
+        Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+        Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
+            .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
+        schema.newValidator().validate(xmlSource);
+        LOG.info("Mapping file is valid.");
+      }
+
+      Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+      if (document == null) {
+        LOG.error("The mapping document is null!");
+        throw new GoraException("The mapping document is null!");
+      }
+
+      Element root = document.getRootElement();
+      // Extract class descriptions
+      @SuppressWarnings("unchecked")
+      List<Element> classElements = root.getChildren(TAG_CLASS);
+      for (Element classElement : classElements) {
+        final Class<T> persistentClass = dataStore.getPersistentClass();
+        final Class<K> keyClass = dataStore.getKeyClass();
+        if (haveKeyClass(keyClass, classElement)
+            && havePersistentClass(persistentClass, classElement)) {
+          loadPersistentClass(classElement, persistentClass);
+          break;
+        }
+      }
+    } catch (IOException | JDOMException | ConfigurationException | SAXException ex) {
+      throw new RuntimeException(ex);
+    }
+    LOG.info("Gora Elasticsearch mapping file was read successfully.");
+  }
+
+  private boolean haveKeyClass(final Class<K> keyClass,
+                               final Element classElement) {
+    return classElement.getAttributeValue(ATT_KEYCLASS).equals(
+        keyClass.getName());
+  }
+
+  private boolean havePersistentClass(final Class<T> persistentClass,
+                                      final Element classElement) {
+    return classElement.getAttributeValue(ATT_NAME).equals(
+        persistentClass.getName());
+  }
+
+  /**
+   * Handle the XML parsing of the class definition.
+   *
+   * @param classElement the XML node containing the class definition
+   */
+  protected void loadPersistentClass(Element classElement,
+                                     Class<T> pPersistentClass) {
+
+    String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX);
+    String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass);
+
+    elasticsearchMapping.setIndexName(indexName);
+    // docNameFromMapping could be null here
+    if (!indexName.equals(indexNameFromMapping)) {
+      ElasticsearchStore.LOG
+          .info("Keyclass and nameclass match, but mismatching index names. "
+              + "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.",
+              indexNameFromMapping, indexName);
+      if (indexNameFromMapping != null) {
+        elasticsearchMapping.setIndexName(indexName);
+      }
+    }
+
+    // Process fields declaration
+    @SuppressWarnings("unchecked")
+    List<Element> fields = classElement.getChildren(TAG_FIELD);
+    for (Element fieldElement : fields) {
+      String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.getDefault());
+      Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName));
+      Field field;
+      if (fieldType.getType() == Field.DataType.SCALED_FLOAT) {
+        int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR));
+        field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor));
+      } else {
+        field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType);
+      }
+      elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field);
+    }
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
new file mode 100644
index 0000000..73016cb
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * Field definition for the Elasticsearch index.
+ */
+public class Field {
+
+  private String name;
+  private FieldType dataType;
+
+  /**
+   * Constructor for Field.
+   *
+   * @param name Field's name
+   * @param dataType Field's data type
+   */
+  public Field(String name, FieldType dataType) {
+    this.name = name;
+    this.dataType = dataType;
+  }
+
+  /**
+   * Returns the field's name.
+   *
+   * @return Field's name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Sets the field's name.
+   *
+   * @param name Field's name
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the field's data-type.
+   *
+   * @return Field's data-type
+   */
+  public FieldType getDataType() {
+    return dataType;
+  }
+
+  /**
+   * Sets the field's data-type.
+   *
+   * @param dataType Field's data-type
+   */
+  public void setDataType(FieldType dataType) {
+    this.dataType = dataType;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    Field field = (Field) o;
+    return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, dataType);
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]")
+        .add("name='" + name + "'")
+        .add("dataType=" + dataType)
+        .toString();
+  }
+
+  /**
+   * Elasticsearch supported data-type enumeration. For a more detailed list of data
+   * types supported by Elasticsearch refer to
+   * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
+   */
+  public enum DataType {
+    BINARY,
+    BOOLEAN,
+    KEYWORD,
+    CONSTANT_KEYWORD,
+    WILDCARD,
+    LONG,
+    INTEGER,
+    SHORT,
+    BYTE,
+    DOUBLE,
+    FLOAT,
+    HALF_FLOAT,
+    SCALED_FLOAT,
+    OBJECT,
+    FLATTENED,
+    NESTED,
+    TEXT,
+    COMPLETION,
+    SEARCH_AS_YOU_TYPE,
+    TOKEN_COUNT
+  }
+
+  public static class FieldType {
+
+    private DataType type;
+
+    // Parameter for scaled_float type.
+    private int scalingFactor;
+
+    /**
+     * Constructor for FieldType.
+     *
+     * @param type Elasticsearch data type
+     */
+    public FieldType(DataType type) {
+      this.type = type;
+    }
+
+    /**
+     * Constructor for FieldType Implicitly uses scaled_float Elasticsearch data type
+     * with scaling factor parameter.
+     *
+     * @param scalingFactor scaled_float field's scaling factor
+     */
+    public FieldType(int scalingFactor) {
+      this.type = DataType.SCALED_FLOAT;
+      this.scalingFactor = scalingFactor;
+    }
+
+    /**
+     *
+     * @return Elasticsearch data type
+     */
+    public DataType getType() {
+      return type;
+    }
+
+    /**
+     *
+     * @param type Elasticsearch data type
+     */
+    public void setType(DataType type) {
+      this.type = type;
+    }
+
+    /**
+     * Returns the scaling factor of scaled_float type.
+     *
+     * @return scaled_float field's scaling factor
+     */
+    public int getScalingFactor() {
+      return scalingFactor;
+    }
+
+    /**
+     * Sets the scaling factor of scaled_float type.
+     *
+     * @param scalingFactor scaled_float field's scaling factor
+     */
+    public void setScalingFactor(int scalingFactor) {
+      this.scalingFactor = scalingFactor;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      FieldType fieldType = (FieldType) o;
+      return scalingFactor == fieldType.scalingFactor && type == fieldType.type;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(type, scalingFactor);
+    }
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java
new file mode 100644
index 0000000..c5dccd1
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Mapping related classes.
+ */
+package org.apache.gora.elasticsearch.mapping;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java
new file mode 100644
index 0000000..8d2313a
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all Elasticsearch datastore related classes.
+ */
+package org.apache.gora.elasticsearch;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
new file mode 100644
index 0000000..f491006
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Elasticsearch specific implementation of the {@link Query} interface.
+ */
+public class ElasticsearchQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
+
+  /**
+   * Constructor for the query.
+   *
+   * @param dataStore data store used
+   */
+  public ElasticsearchQuery(DataStore<K, T> dataStore) {
+    super(dataStore);
+  }
+
+  public ElasticsearchQuery() {
+    super(null);
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
new file mode 100644
index 0000000..e7f8180
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+
+import java.util.List;
+
+/**
+ * ElasticsearchResult specific implementation of the
+ * {@link org.apache.gora.query.Result} interface.
+ */
+public class ElasticsearchResult<K, T extends PersistentBase> extends ResultBase<K, T> {
+
+  /**
+   * List of resulting persistent objects.
+   */
+  private List<T> persistentObjects;
+
+  /**
+   * List of resulting objects keys.
+ */ + private List<K> persistentKeys; + + public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) { + super(dataStore, query); + this.persistentKeys = persistentKeys; + this.persistentObjects = persistentObjects; + } + + @Override + public float getProgress() { + if (persistentObjects.size() == 0) { + return 1; + } + + return offset / (float) persistentObjects.size(); + } + + @Override + public int size() { + return persistentObjects.size(); + } + + @Override + protected boolean nextInner() { + if ((int) offset == persistentObjects.size()) { + return false; + } + + persistent = persistentObjects.get((int) offset); + key = persistentKeys.get((int) offset); + return persistent != null; + } +} diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java new file mode 100644 index 0000000..ad7b9d7 --- /dev/null +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Query related classes. 
+ */ +package org.apache.gora.elasticsearch.query; diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java new file mode 100644 index 0000000..a82de7f --- /dev/null +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java @@ -0,0 +1,864 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.elasticsearch.store; + +import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; +import org.apache.gora.elasticsearch.mapping.ElasticsearchMapping; +import org.apache.gora.elasticsearch.mapping.ElasticsearchMappingBuilder; +import org.apache.gora.elasticsearch.mapping.Field; +import org.apache.gora.elasticsearch.query.ElasticsearchQuery; +import org.apache.gora.elasticsearch.query.ElasticsearchResult; +import org.apache.gora.elasticsearch.utils.ElasticsearchParameters; +import org.apache.gora.filter.Filter; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.persistency.impl.DirtyListWrapper; +import org.apache.gora.persistency.impl.DirtyMapWrapper; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.impl.PartitionQueryImpl; +import org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.ClassLoadingUtils; +import org.apache.gora.util.GoraException; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetRequest; 
+import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.*; + +/** + * Implementation of an Elasticsearch data store to be used by Apache Gora.
+ * + * @param <K> class to be used for the key + * @param <T> class to be persisted within the store + */ +public class ElasticsearchStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { + + public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class); + private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml"; + public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file"; + private static final String XML_MAPPING_DEFINITION = "gora.mapping"; + public static final String XSD_VALIDATION = "gora.xsd_validation"; + + /** + * Elasticsearch client + */ + private RestHighLevelClient client; + + /** + * Mapping definition for Elasticsearch + */ + private ElasticsearchMapping elasticsearchMapping; + + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { + try { + LOG.debug("Initializing Elasticsearch store"); + ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf()); + super.initialize(keyClass, persistentClass, properties); + ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this); + InputStream mappingStream; + if (properties.containsKey(XML_MAPPING_DEFINITION)) { + if (LOG.isTraceEnabled()) { + LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION)); + } + // Encode the inline mapping as UTF-8 instead of the platform default charset + mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), StandardCharsets.UTF_8); + } else { + mappingStream = getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE)); + } + String xsdValidation = properties.getProperty(XSD_VALIDATION, "false"); + builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation)); + elasticsearchMapping = builder.getElasticsearchMapping(); + client = createClient(parameters); + LOG.info("Elasticsearch store was
successfully initialized."); + } catch (Exception ex) { + LOG.error("Error while initializing Elasticsearch store", ex); + throw new GoraException(ex); + } + } + + public static RestHighLevelClient createClient(ElasticsearchParameters parameters) { + RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort())); + CredentialsProvider basicCredentialsProvider = null; + + // Choosing the authentication method. + switch (parameters.getAuthenticationType()) { + case BASIC: + if (parameters.getUsername() != null && parameters.getPassword() != null) { + basicCredentialsProvider = new BasicCredentialsProvider(); + basicCredentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword())); + } else { + throw new IllegalArgumentException("Missing username or password for BASIC authentication."); + } + break; + case TOKEN: + if (parameters.getAuthorizationToken() != null) { + Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", + parameters.getAuthorizationToken())}; + clientBuilder.setDefaultHeaders(defaultHeaders); + } else { + throw new IllegalArgumentException("Missing authorization token for TOKEN authentication."); + } + break; + case APIKEY: + if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) { + String apiKeyAuth = Base64.getEncoder() + .encodeToString((parameters.getApiKeyId() + ":" + parameters.getApiKeySecret()) + .getBytes(StandardCharsets.UTF_8)); + Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)}; + clientBuilder.setDefaultHeaders(defaultHeaders); + } else { + throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication."); + } + break; + } + + // RestClientBuilder keeps only the last callback passed to each setter, so the timeouts, + // the BASIC credentials and the I/O thread count are applied in one callback of each kind. + final CredentialsProvider credentialsProvider = basicCredentialsProvider; + clientBuilder.setRequestConfigCallback(requestConfigBuilder -> { + if (parameters.getConnectTimeout() != 0) { + requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout()); + } + if (parameters.getSocketTimeout() != 0) { + requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout()); + } + return requestConfigBuilder; + }); + clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { + if (credentialsProvider != null) { + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + if (parameters.getIoThreadCount() != 0) { + httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom() + .setIoThreadCount(parameters.getIoThreadCount()).build()); + } + return httpClientBuilder; + }); + return new RestHighLevelClient(clientBuilder); + } + + public ElasticsearchMapping getMapping() { + return elasticsearchMapping; + } + + @Override + public String getSchemaName() { + return elasticsearchMapping.getIndexName(); + } + + @Override + public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) { + return super.getSchemaName(mappingSchemaName, persistentClass); + } + + @Override + public void createSchema() throws GoraException { + CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName()); + Map<String, Object> properties = new HashMap<>(); + for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) { + Map<String, Object> fieldType = new HashMap<>(); + fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT)); + if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) { + fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor()); + } + properties.put(entry.getKey(), fieldType); + } + // Keyword field holding the document key, used for range queries + properties.put("gora_id", Collections.singletonMap("type", "keyword")); + Map<String, Object> mapping = new HashMap<>(); + mapping.put("properties", properties); + request.mapping(mapping); + try { + if (!client.indices().exists( + new
GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) { + client.indices().create(request, RequestOptions.DEFAULT); + } + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public void deleteSchema() throws GoraException { + DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName()); + try { + if (client.indices().exists( + new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) { + client.indices().delete(request, RequestOptions.DEFAULT); + } + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public boolean schemaExists() throws GoraException { + try { + return client.indices().exists( + new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public boolean exists(K key) throws GoraException { + GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key); + getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_"); + try { + return client.exists(getRequest, RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public T get(K key, String[] fields) throws GoraException { + String[] requestedFields = getFieldsToQuery(fields); + List<String> documentFields = new ArrayList<>(); + for (String requestedField : requestedFields) { + documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName()); + } + try { + // Prepare the Elasticsearch request + GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key); + GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT); + if (getResponse.isExists()) { + Map<String, Object> sourceMap = getResponse.getSourceAsMap(); + + // Map of field's name and its value from the Document + Map<String, Object> 
fieldsAndValues = new HashMap<>(); + for (String field : documentFields) { + fieldsAndValues.put(field, sourceMap.get(field)); + } + + // Build the corresponding persistent object + return newInstance(fieldsAndValues, requestedFields); + } else { + return null; + } + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public void put(K key, T obj) throws GoraException { + if (obj.isDirty()) { + Schema schemaObj = obj.getSchema(); + List<Schema.Field> fields = schemaObj.getFields(); + Map<String, Object> jsonMap = new HashMap<>(); + for (Schema.Field field : fields) { + Field mappedField = elasticsearchMapping.getFields().get(field.name()); + if (mappedField != null) { + Object fieldValue = obj.get(field.pos()); + if (fieldValue != null) { + Schema fieldSchema = field.schema(); + Object serializedObj = serializeFieldValue(fieldSchema, fieldValue); + jsonMap.put(mappedField.getName(), serializedObj); + } + } + } + // Store the key in the keyword field used for range queries + jsonMap.put("gora_id", key); + // Prepare the Elasticsearch request + IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap); + try { + client.index(request, RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); + } + } else { + LOG.info("Ignored putting object {} in the store as it is neither " + + "new nor dirty.", obj); + } + } + + @Override + public boolean delete(K key) throws GoraException { + DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key); + try { + DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT); + return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND; + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public long deleteByQuery(Query<K, T> query) throws GoraException { + try { + BulkByScrollResponse bulkResponse; + if (query.getFields() != null &&
query.getFields().length < elasticsearchMapping.getFields().size()) { + UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName()); + QueryBuilder matchDocumentsWithinRange = QueryBuilders + .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey()); + updateRequest.setQuery(matchDocumentsWithinRange); + + // Build a Painless script that removes the requested fields from each matching document + StringBuilder toDelete = new StringBuilder(); + String[] fieldsToDelete = query.getFields(); + for (String field : fieldsToDelete) { + String elasticsearchField = elasticsearchMapping.getFields().get(field).getName(); + toDelete.append(String.format("ctx._source.remove('%s');", elasticsearchField)); + } + updateRequest.setScript(new Script(ScriptType.INLINE, "painless", toDelete.toString(), Collections.emptyMap())); + bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT); + return bulkResponse.getUpdated(); + } else { + DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName()); + QueryBuilder matchDocumentsWithinRange = QueryBuilders + .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey()); + deleteRequest.setQuery(matchDocumentsWithinRange); + bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT); + return bulkResponse.getDeleted(); + } + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public Result<K, T> execute(Query<K, T> query) throws GoraException { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + + // Set the query result limit + int size = (int) query.getLimit(); + if (size != -1) { + searchSourceBuilder.size(size); + } + try { + // Build the actual Elasticsearch range query + QueryBuilder rangeQueryBuilder = QueryBuilders + .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey()); + searchSourceBuilder.query(rangeQueryBuilder); + SearchRequest
searchRequest = new SearchRequest(elasticsearchMapping.getIndexName()); + searchRequest.source(searchSourceBuilder); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + + String[] avroFields = getFieldsToQuery(query.getFields()); + + SearchHits hits = searchResponse.getHits(); + SearchHit[] searchHits = hits.getHits(); + List<K> hitId = new ArrayList<>(); + + // Apply the query filter, if any + Filter<K, T> queryFilter = query.getFilter(); + List<T> filteredObjects = new ArrayList<>(); + for (SearchHit hit : searchHits) { + K hitKey = (K) hit.getId(); + // Deserialize each hit once and reuse it for both filtering and the result + T persistent = newInstance(hit.getSourceAsMap(), avroFields); + if (queryFilter == null || !queryFilter.filter(hitKey, persistent)) { + filteredObjects.add(persistent); + hitId.add(hitKey); + } + } + return new ElasticsearchResult<>(this, query, hitId, filteredObjects); + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public Query<K, T> newQuery() { + ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this); + query.setFields(getFieldsToQuery(null)); + return query; + } + + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { + List<PartitionQuery<K, T>> partitions = new ArrayList<>(); + PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( + query); + partitionQuery.setConf(getConf()); + partitions.add(partitionQuery); + return partitions; + } + + @Override + public void flush() throws GoraException { + try { + client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT); + client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); + } + } + + @Override + public void close() { + try { + client.close(); + LOG.info("Elasticsearch datastore destroyed successfully."); + } catch (IOException ex) {
+ LOG.error(ex.getMessage(), ex); + } + } + + /** + * Build a new instance of the persisted class from the Document retrieved from the database. + * + * @param fieldsAndValues map from each field's name to its value in the Document + * that results from the query to the database + * @param requestedFields the list of fields to be mapped to the persistence class instance + * @return a persistence class instance whose content was deserialized from the Document + * @throws IOException when a field value cannot be deserialized + */ + public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException { + // Create new empty persistent bean instance + T persistent = newPersistent(); + + requestedFields = getFieldsToQuery(requestedFields); + // Populate each field + for (String objField : requestedFields) { + Schema.Field field = fieldMap.get(objField); + Schema fieldSchema = field.schema(); + String docFieldName = elasticsearchMapping.getFields().get(objField).getName(); + Object fieldValue = fieldsAndValues.get(docFieldName); + + Object result = deserializeFieldValue(field, fieldSchema, fieldValue); + persistent.put(field.pos(), result); + } + persistent.clearDirty(); + return persistent; + } + + /** + * Deserialize an Elasticsearch object to a persistent Avro object.
+ * + * @param avroField persistent Avro class field to which the value will be deserialized + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchValue Elasticsearch field value to be deserialized + * @return deserialized Avro object from the Elasticsearch object + * @throws GoraException when the given Elasticsearch value cannot be deserialized + */ + private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema, + Object elasticsearchValue) throws GoraException { + Object fieldValue; + switch (avroFieldSchema.getType()) { + case MAP: + fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue); + break; + case RECORD: + fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue); + break; + case ARRAY: + fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue); + break; + case BOOLEAN: + fieldValue = Boolean.parseBoolean(elasticsearchValue.toString()); + break; + case BYTES: + fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString())); + break; + case FIXED: + case NULL: + fieldValue = null; + break; + case UNION: + fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue); + break; + case DOUBLE: + fieldValue = Double.parseDouble(elasticsearchValue.toString()); + break; + case ENUM: + fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString()); + break; + case FLOAT: + fieldValue = Float.parseFloat(elasticsearchValue.toString()); + break; + case INT: + fieldValue = Integer.parseInt(elasticsearchValue.toString()); + break; + case LONG: + fieldValue = Long.parseLong(elasticsearchValue.toString()); + break; + case STRING: + fieldValue = new Utf8(elasticsearchValue.toString()); + break; + default: + fieldValue = elasticsearchValue; + } + return fieldValue; + } + + /** + * Deserialize an 
Elasticsearch List to an Avro List as used in Gora generated classes + * that can safely be written into an Avro persistent object. + * + * @param avroField persistent Avro class field to which the value will be deserialized + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchValue Elasticsearch field value to be deserialized + * @return deserialized Avro List from the given Elasticsearch value + * @throws GoraException when one of the underlying values cannot be deserialized + */ + private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema, + Object elasticsearchValue) throws GoraException { + List<Object> list = new ArrayList<>(); + if (elasticsearchValue != null) { + for (Object item : (List<Object>) elasticsearchValue) { + Object result = deserializeFieldValue(avroField, avroFieldSchema, item); + list.add(result); + } + } + return new DirtyListWrapper<>(list); + } + + /** + * Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes + * that can safely be written into an Avro persistent object.
+ * + * @param avroField persistent Avro class field to which the value will be deserialized + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchMap Elasticsearch Map value to be deserialized + * @return deserialized Avro Map from the given Elasticsearch Map value + * @throws GoraException when one of the underlying values cannot be deserialized + */ + private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema, + Map<String, Object> elasticsearchMap) throws GoraException { + Map<Utf8, Object> deserializedMap = new HashMap<>(); + if (elasticsearchMap != null) { + for (Map.Entry<String, Object> entry : elasticsearchMap.entrySet()) { + String mapKey = entry.getKey(); + Object mapValue = deserializeFieldValue(avroField, avroFieldSchema, entry.getValue()); + deserializedMap.put(new Utf8(mapKey), mapValue); + } + } + return new DirtyMapWrapper<>(deserializedMap); + } + + /** + * Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes + * that can safely be written into an Avro persistent object.
+ * + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchRecord Elasticsearch Record value to be deserialized + * @return deserialized Avro Object from the given Elasticsearch Record value + * @throws GoraException when one of the underlying values cannot be deserialized + */ + private Object fromElasticsearchRecord(Schema avroFieldSchema, + Map<String, Object> elasticsearchRecord) throws GoraException { + Class<?> clazz; + try { + clazz = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName()); + } catch (ClassNotFoundException ex) { + throw new GoraException(ex); + } + PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent(); + for (Schema.Field recField : avroFieldSchema.getFields()) { + Schema innerSchema = recField.schema(); + Field innerDocField = elasticsearchMapping.getFields().getOrDefault(recField.name(), new Field(recField.name(), null)); + record.put(recField.pos(), deserializeFieldValue(recField, innerSchema, elasticsearchRecord.get(innerDocField.getName()))); + } + return record; + } + + /** + * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes + * that can safely be written into an Avro persistent object.
+ * + * @param avroField persistent Avro class field to which the value will be deserialized + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchUnion Elasticsearch Union value to be deserialized + * @return deserialized Avro Object from the given Elasticsearch Union value + * @throws GoraException when one of the underlying values cannot be deserialized + */ + private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException { + Object deserializedUnion; + Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType(); + if (avroFieldSchema.getTypes().size() == 2 && + (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) && + !type0.equals(type1)) { + int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema); + Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); + deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion); + } else if (avroFieldSchema.getTypes().size() == 3) { + Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType(); + if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) && + (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) { + if (elasticsearchUnion == null) { + deserializedUnion = null; + } else if (elasticsearchUnion instanceof String) { + throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null."); + } else { + int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema); + Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); + deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion); + } + } else { + throw new GoraException("Elasticsearch only supports Union of two types field: Record or 
Null."); + } + } else { + throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); + } + return deserializedUnion; + } + + /** + * Serialize a persistent Avro object as used in Gora generated classes to + * an object that can be written into Elasticsearch. + * + * @param avroFieldSchema schema for the persistent Avro class field + * @param avroFieldValue persistent Avro field value to be serialized + * @return serialized field value + * @throws GoraException when the given Avro object cannot be serialized + */ + private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException { + Object output = avroFieldValue; + switch (avroFieldSchema.getType()) { + case ARRAY: + output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType()); + break; + case MAP: + output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType()); + break; + case RECORD: + output = recordToElasticsearch(avroFieldValue, avroFieldSchema); + break; + case BYTES: + output = Base64.getEncoder().encodeToString(((ByteBuffer) avroFieldValue).array()); + break; + case UNION: + output = unionToElasticsearch(avroFieldValue, avroFieldSchema); + break; + case BOOLEAN: + case DOUBLE: + case ENUM: + case FLOAT: + case INT: + case LONG: + case STRING: + output = avroFieldValue.toString(); + break; + case FIXED: + break; + case NULL: + output = null; + break; + } + return output; + } + + /** + * Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a + * List that can safely be written into Elasticsearch. 
+ * + * @param collection the collection to be serialized + * @param avroFieldSchema field schema for the underlying type + * @return a List version of the collection that can be safely written into Elasticsearch + * @throws GoraException when one of the underlying values cannot be serialized + */ + private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException { + List<Object> list = new ArrayList<>(); + for (Object item : collection) { + Object result = serializeFieldValue(avroFieldSchema, item); + list.add(result); + } + return list; + } + + /** + * Serialize a Java map of persistent Avro objects as used in Gora generated classes to a + * map that can safely be written into Elasticsearch. + * + * @param map the map to be serialized + * @param avroFieldSchema field schema for the underlying type + * @return a Map version of the Java map that can be safely written into Elasticsearch + * @throws GoraException when one of the underlying values cannot be serialized + */ + private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException { + Map<CharSequence, Object> serializedMap = new HashMap<>(); + for (Map.Entry<CharSequence, ?> entry : map.entrySet()) { + String mapKey = entry.getKey().toString(); + Object mapValue = entry.getValue(); + Object result = serializeFieldValue(avroFieldSchema, mapValue); + serializedMap.put(mapKey, result); + } + return serializedMap; + } + + /** + * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a + * record that can safely be written into Elasticsearch. 
+ * + * @param record the object to be serialized + * @param avroFieldSchema field schema for the underlying type + * @return a record version of the Java object that can be safely written into Elasticsearch + * @throws GoraException when one of the underlying values cannot be serialized + */ + private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException { + Map<CharSequence, Object> serializedRecord = new HashMap<>(); + for (Schema.Field member : avroFieldSchema.getFields()) { + Object innerValue = ((PersistentBase) record).get(member.pos()); + serializedRecord.put(member.name(), serializeFieldValue(member.schema(), innerValue)); + } + return serializedRecord; + } + + /** + * Serialize the value of an Avro UNION field as used in Gora generated classes to an + * object that can safely be written into Elasticsearch. + * + * @param union the object to be serialized + * @param avroFieldSchema union schema of the field + * @return an object version of the Java object that can be safely written into Elasticsearch + * @throws GoraException when one of the underlying values cannot be serialized + */ + private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException { + Object serializedUnion; + Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType(); + if (avroFieldSchema.getTypes().size() == 2 && + (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) && + !type0.equals(type1)) { + int schemaPos = getUnionSchema(union, avroFieldSchema); + Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); + serializedUnion = serializeFieldValue(unionSchema, union); + } else if (avroFieldSchema.getTypes().size() == 3) { + Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType(); + if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) ||
type2.equals(Schema.Type.NULL)) && + (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) { + if (union == null) { + serializedUnion = null; + } else if (union instanceof String) { + throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type."); + } else { + int schemaPos = getUnionSchema(union, avroFieldSchema); + Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); + serializedUnion = recordToElasticsearch(union, unionSchema); + } + } else { + throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); + } + } else { + throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); + } + return serializedUnion; + } + + /** + * Method to retrieve the corresponding schema type index of a particular + * object having UNION schema. As UNION type can have one or more types and at + * a given instance, it holds an object of only one type of the defined types, + * this method is used to figure out the corresponding instance's schema type + * index. 
+ * + * @param instanceValue value that the object holds + * @param unionSchema union schema containing all of the data types + * @return the index of the matching schema within the union, or 0 if no type matches + */ + private int getUnionSchema(Object instanceValue, Schema unionSchema) { + int unionSchemaPos = 0; + for (Schema currentSchema : unionSchema.getTypes()) { + Schema.Type schemaType = currentSchema.getType(); + if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) { + return unionSchemaPos; + } + if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) { + return unionSchemaPos; + } + if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) { + return unionSchemaPos; + } + if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) { + return unionSchemaPos; + } + if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) { + return unionSchemaPos; + } + if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) { + return unionSchemaPos; + } + if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) { + return unionSchemaPos; + } + if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) { + return unionSchemaPos; + } + if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) { + return unionSchemaPos; + } + if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) { + return unionSchemaPos; + } + if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) { + return unionSchemaPos; + } + if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] &&
schemaType.equals(Schema.Type.RECORD)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) { + return unionSchemaPos; + } + unionSchemaPos++; + } + return 0; + } +} diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java new file mode 100644 index 0000000..01466ee --- /dev/null +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.elasticsearch.store; + +import java.util.ArrayList; +import java.util.List; + +/** + * Collection metadata returned by ElasticsearchStoreMetadataAnalyzer, describing + * the document keys and types of an ElasticsearchStore collection (index). + */ +public class ElasticsearchStoreCollectionMetadata { + + /** + * Collection document keys present in a given collection at ElasticsearchStore.
+ */ + private List<String> documentKeys = new ArrayList<>(); + + /** + * Collection document types present in a given collection at ElasticsearchStore. + */ + private List<String> documentTypes = new ArrayList<>(); + + public List<String> getDocumentKeys() { + return documentKeys; + } + + public void setDocumentKeys(List<String> documentKeys) { + this.documentKeys = documentKeys; + } + + public List<String> getDocumentTypes() { + return documentTypes; + } + + public void setDocumentTypes(List<String> documentTypes) { + this.documentTypes = documentTypes; + } +} diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java new file mode 100644 index 0000000..6806aa3 --- /dev/null +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.elasticsearch.store; + +import org.apache.gora.elasticsearch.utils.ElasticsearchParameters; +import org.apache.gora.store.impl.DataStoreMetadataAnalyzer; +import org.apache.gora.util.GoraException; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.GetIndexResponse; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class ElasticsearchStoreMetadataAnalyzer extends DataStoreMetadataAnalyzer { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class); + + private RestHighLevelClient elasticsearchClient; + + @Override + public void initialize() throws GoraException { + ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf()); + elasticsearchClient = ElasticsearchStore.createClient(parameters); + } + + @Override + public String getType() { + return "ELASTICSEARCH"; + } + + @Override + public List<String> getTablesNames() throws GoraException { + GetIndexRequest request = new GetIndexRequest("*"); + GetIndexResponse response; + try { + response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); + } + if (response == null) { + LOG.error("Could not find indices."); + throw new GoraException("Could not find indices."); + } + return Arrays.asList(response.getIndices()); + } + + @Override + public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException { + GetIndexRequest request = new GetIndexRequest(tableName); + GetIndexResponse getIndexResponse; + try { + getIndexResponse = 
elasticsearchClient.indices().get(request, RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); + } + MappingMetadata indexMappings = getIndexResponse.getMappings().get(tableName); + Map<String, Object> indexKeysAndTypes = (Map<String, Object>) indexMappings.getSourceAsMap().get("properties"); + + List<String> documentTypes = new ArrayList<>(); + List<String> documentKeys = new ArrayList<>(); + + for (Map.Entry<String, Object> entry : indexKeysAndTypes.entrySet()) { + Map<String, Object> subEntry = (Map<String, Object>) entry.getValue(); + documentTypes.add((String) subEntry.get("type")); + documentKeys.add(entry.getKey()); + } + + ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata(); + collectionMetadata.setDocumentKeys(documentKeys); + collectionMetadata.setDocumentTypes(documentTypes); + return collectionMetadata; + } + + @Override + public void close() throws IOException { + if (elasticsearchClient != null) { + this.elasticsearchClient.close(); + } + } +} diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java new file mode 100644 index 0000000..66f575f --- /dev/null +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains the core classes of the Elasticsearch datastore. + */ +package org.apache.gora.elasticsearch.store; diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java new file mode 100644 index 0000000..dbbd45b --- /dev/null +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.elasticsearch.utils; + +/** + * Authentication type used to connect to the Elasticsearch server. + */ +public enum AuthenticationType { + /** + * Basic authentication requires a username and password.
+ */ + BASIC, + /** + * Token authentication requires an Elasticsearch access token. + */ + TOKEN, + /** + * API key authentication requires an Elasticsearch API key ID and API key secret. + */ + APIKEY +} diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java new file mode 100644 index 0000000..fa9bdc2 --- /dev/null +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.elasticsearch.utils; + +/** + * Constants for the Elasticsearch datastore. + */ +public class ElasticsearchConstants { + /** + * Property indicating whether the Gora properties override the Hadoop configuration for host and port. + */ + public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration"; + + /** + * Default configurations for Elasticsearch. + */ + public static final String DEFAULT_HOST = "localhost"; + public static final int DEFAULT_PORT = 9200; + + /** + * Configuration keys used by the Elasticsearch datastore.
+ */ + public static final String PROP_HOST = "gora.datastore.elasticsearch.host"; + public static final String PROP_PORT = "gora.datastore.elasticsearch.port"; + public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme"; + public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType"; + public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username"; + public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password"; + public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken"; + public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId"; + public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret"; + public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout"; + public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout"; + public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount"; +} diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java new file mode 100644 index 0000000..4b0cef4 --- /dev/null +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.elasticsearch.utils; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Properties; + +/** + * Parameter definitions for Elasticsearch. + */ +public class ElasticsearchParameters { + + /** + * Elasticsearch server host. + */ + private String host; + + /** + * Elasticsearch server port. + */ + private int port; + + /** + * Elasticsearch server scheme. + * Optional. If not provided, defaults to http. + */ + private String scheme; + + /** + * Authentication type used to connect to the server. + * Can be BASIC, TOKEN or APIKEY. + */ + private AuthenticationType authenticationType; + + /** + * Username to use for server authentication. + * Required for BASIC authentication to connect to the server. + */ + private String username; + + /** + * Password to use for server authentication. + * Required for BASIC authentication to connect to the server. + */ + private String password; + + /** + * Authorization token to use for server authentication. + * Required for TOKEN authentication to connect to the server. + */ + private String authorizationToken; + + /** + * API Key ID to use for server authentication. + * Required for APIKEY authentication to connect to the server. + */ + private String apiKeyId; + + /** + * API Key Secret to use for server authentication. + * Required for APIKEY authentication to connect to the server. + */ + private String apiKeySecret; + + /** + * Timeout in milliseconds used for establishing the connection to the server. + * Optional. If not provided, defaults to 5000 ms.
+ * + */ + private int connectTimeout; + + /** + * Timeout in milliseconds used for waiting for data after the connection to the server has been established. + * Optional. If not provided, defaults to 60000 ms. + */ + private int socketTimeout; + + /** + * Number of worker threads used by the connection manager. + * Optional. If not provided, defaults to 1. + */ + private int ioThreadCount; + + public ElasticsearchParameters(String host, int port) { + this.host = host; + this.port = port; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getScheme() { + return scheme; + } + + public void setScheme(String scheme) { + this.scheme = scheme; + } + + public AuthenticationType getAuthenticationType() { + return authenticationType; + } + + public void setAuthenticationType(AuthenticationType authenticationType) { + this.authenticationType = authenticationType; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getAuthorizationToken() { + return authorizationToken; + } + + public void setAuthorizationToken(String authorizationToken) { + this.authorizationToken = authorizationToken; + } + + public String getApiKeyId() { + return apiKeyId; + } + + public void setApiKeyId(String apiKeyId) { + this.apiKeyId = apiKeyId; + } + + public String getApiKeySecret() { + return apiKeySecret; + } + + public void setApiKeySecret(String apiKeySecret) { + this.apiKeySecret = apiKeySecret; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public int
getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public int getIoThreadCount() { + return ioThreadCount; + } + + public void setIoThreadCount(int ioThreadCount) { + this.ioThreadCount = ioThreadCount; + } + + /** + * Reads Elasticsearch parameters from a properties list; host and port come from the Hadoop configuration unless PROP_OVERRIDING is true. + * + * @param properties Properties list + * @return Elasticsearch parameters instance + */ + public static ElasticsearchParameters load(Properties properties, Configuration conf) { + ElasticsearchParameters elasticsearchParameters; + + if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) { + elasticsearchParameters = new ElasticsearchParameters( + conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST), + conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT)); + } else { + elasticsearchParameters = new ElasticsearchParameters( + properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST), + Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT, + String.valueOf(ElasticsearchConstants.DEFAULT_PORT)))); + } + + String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME); + if (schemeProperty != null) { + elasticsearchParameters.setScheme(schemeProperty); + } + + String authenticationTypeProperty = + properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE); + if (authenticationTypeProperty != null) { + elasticsearchParameters.setAuthenticationType(AuthenticationType.valueOf(authenticationTypeProperty)); + } + + String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME); + if (usernameProperty != null) { + elasticsearchParameters.setUsername(usernameProperty); + } + + String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD); + if (passwordProperty != null)
{ + elasticsearchParameters.setPassword(passwordProperty); + } + + String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN); + if (authorizationTokenProperty != null) { + elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty); + } + + String apiKeyIdProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYID); + if (apiKeyIdProperty != null) { + elasticsearchParameters.setApiKeyId(apiKeyIdProperty); + } + + String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET); + if (apiKeySecretProperty != null) { + elasticsearchParameters.setApiKeySecret(apiKeySecretProperty); + } + + String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT); + if (connectTimeoutProperty != null) { + elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty)); + } + + String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT); + if (socketTimeoutProperty != null) { + elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty)); + } + + String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT); + if (ioThreadCountProperty != null) { + elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty)); + } + + return elasticsearchParameters; + } +} diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java new file mode 100644 index 0000000..49fcb93 --- /dev/null +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains utility classes. + */ +package org.apache.gora.elasticsearch.utils; diff --git a/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd b/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd new file mode 100644 index 0000000..0c7c011 --- /dev/null +++ b/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd @@ -0,0 +1,61 @@ +<?xml version="1.0" encoding="UTF-8" ?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"> + <xs:element name="gora-otd" type="elasticsearch-mapping"/> + + <xs:complexType name="elasticsearch-mapping"> + <xs:sequence> + <xs:element name="class" type="class-mapping" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="class-mapping"> + <xs:sequence> + <xs:element name="field" type="field-mapping" maxOccurs="unbounded"/> + </xs:sequence> + <xs:attribute name="name" type="nameClass-types" use="required"/> + <xs:attribute name="keyClass" type="keyClass-types" use="required"/> + <xs:attribute name="index" type="xs:string" use="required"/> + </xs:complexType> + + <xs:complexType name="field-mapping"> + <xs:attribute name="name" type="fieldName-types" use="required"/> + <xs:attribute name="docfield" type="xs:string" use="required"/> + <xs:attribute name="type" type="xs:string" use="required"/> + <xs:attribute name="scalingFactor" type="xs:string" use="optional"/> + </xs:complexType> + + <xs:simpleType name="keyClass-types"> + <xs:restriction base="xs:string"> + <xs:enumeration value="java.lang.String"/> + </xs:restriction> + </xs:simpleType> + + <xs:simpleType name="nameClass-types"> + <xs:restriction base="xs:string"> + <xs:pattern value="([\p{L}_$][\p{L}\p{N}_$]*\.)*[\p{L}_$][\p{L}\p{N}_$]*"/> + </xs:restriction> + </xs:simpleType> + + <xs:simpleType name="fieldName-types"> + <xs:restriction base="xs:string"> + <xs:pattern value="[a-zA-Z][a-zA-Z0-9]*"/> + </xs:restriction> + </xs:simpleType> + +</xs:schema> diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java new file mode 100644 index 0000000..ef17dbc --- /dev/null +++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.elasticsearch; + +import org.apache.gora.GoraTestDriver; +import org.apache.gora.elasticsearch.store.ElasticsearchStore; +import org.apache.gora.elasticsearch.utils.ElasticsearchConstants; +import org.apache.gora.store.DataStoreFactory; +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +import java.util.Properties; + +/** + * Helper class for third-party tests using the gora-elasticsearch backend. + * + * @see GoraTestDriver + */ +public class GoraElasticsearchTestDriver extends GoraTestDriver { + + private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1"; + private ElasticsearchContainer elasticsearchContainer; + + /** + * Creates the driver and a security-enabled Elasticsearch container whose password is read from the Gora properties. + */ + public GoraElasticsearchTestDriver() { + super(ElasticsearchStore.class); + Properties properties = DataStoreFactory.createProps(); + elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE) + .withEnv("ELASTIC_PASSWORD", properties.getProperty(ElasticsearchConstants.PROP_PASSWORD)) + .withEnv("xpack.security.enabled", "true"); + } + + /** + * Start the Elasticsearch container and point the configuration at its mapped host and port.
+ */ + @Override + public void setUpClass() throws Exception { + elasticsearchContainer.start(); + log.info("Setting up Elasticsearch test driver"); + + int port = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT); + String host = elasticsearchContainer.getContainerIpAddress(); + conf.set(ElasticsearchConstants.PROP_PORT, String.valueOf(port)); + conf.set(ElasticsearchConstants.PROP_HOST, host); + } + + /** + * Tear the server down. + */ + @Override + public void tearDownClass() throws Exception { + elasticsearchContainer.close(); + log.info("Tearing down Elasticsearch test driver"); + } +} diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java new file mode 100644 index 0000000..d5aa9e6 --- /dev/null +++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.elasticsearch.mapreduce; + +import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver; +import org.apache.gora.examples.generated.WebPage; +import org.apache.gora.mapreduce.DataStoreMapReduceTestBase; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +/** + * Executes tests for MR jobs over the Elasticsearch datastore. + */ +public class ElasticsearchStoreMapReduceTest extends DataStoreMapReduceTestBase { + + private GoraElasticsearchTestDriver driver; + + public ElasticsearchStoreMapReduceTest() throws IOException { + super(); + driver = new GoraElasticsearchTestDriver(); + } + + @Override + @Before + public void setUp() throws Exception { + driver.setUpClass(); + super.setUp(); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + driver.tearDownClass(); + } + + @Override + protected DataStore<String, WebPage> createWebPageDataStore() throws IOException { + return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration()); + } +} diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java new file mode 100644 index 0000000..781ce61 --- /dev/null +++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains map reduce tests. + */ +package org.apache.gora.elasticsearch.mapreduce; diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java new file mode 100644 index 0000000..406f829 --- /dev/null +++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Elasticsearch datastore test utilities. 
+ */ +package org.apache.gora.elasticsearch; diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java new file mode 100644 index 0000000..7c95593 --- /dev/null +++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.elasticsearch.store; + +import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver; +import org.apache.gora.elasticsearch.mapping.ElasticsearchMapping; +import org.apache.gora.elasticsearch.mapping.Field; +import org.apache.gora.elasticsearch.utils.AuthenticationType; +import org.apache.gora.elasticsearch.utils.ElasticsearchParameters; +import org.apache.gora.examples.generated.EmployeeInt; +import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.store.DataStoreMetadataFactory; +import org.apache.gora.store.DataStoreTestBase; +import org.apache.gora.store.impl.DataStoreMetadataAnalyzer; +import org.apache.gora.util.GoraException; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.util.*; + +/** + * Test case for ElasticsearchStore. + */ +public class TestElasticsearchStore extends DataStoreTestBase { + + static { + setTestDriver(new GoraElasticsearchTestDriver()); + } + + @Test + public void testInitialize() throws GoraException { + log.info("test method: testInitialize"); + + ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping(); + + Map<String, Field> fields = new HashMap<String, Field>() {{ + put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT))); + put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG))); + put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT))); + put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT))); + put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER))); + put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT))); + put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT))); + }}; + + Assert.assertEquals("frontier", employeeStore.getSchemaName()); + Assert.assertEquals("frontier", mapping.getIndexName()); 
+ Assert.assertEquals(fields, mapping.getFields()); + } + + @Test + public void testLoadElasticsearchParameters() throws IOException { + log.info("test method: testLoadElasticsearchParameters"); + + Properties properties = DataStoreFactory.createProps(); + + ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration()); + + Assert.assertEquals("localhost", parameters.getHost()); + Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType()); + Assert.assertEquals("elastic", parameters.getUsername()); + Assert.assertEquals("password", parameters.getPassword()); + } + + @Test(expected = GoraException.class) + public void testInvalidXmlFile() throws Exception { + log.info("test method: testInvalidXmlFile"); + + Properties properties = DataStoreFactory.createProps(); + properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml"); + properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true"); + testDriver.createDataStore(String.class, EmployeeInt.class, properties); + } + + @Test + public void testXsdValidationParameter() throws GoraException { + log.info("test method: testXsdValidationParameter"); + + Properties properties = DataStoreFactory.createProps(); + properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml"); + properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false"); + testDriver.createDataStore(String.class, EmployeeInt.class, properties); + } + + @Test + public void testGetType() throws GoraException, ClassNotFoundException { + Configuration conf = testDriver.getConfiguration(); + DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf); + + String actualType = storeMetadataAnalyzer.getType(); + String expectedType = "ELASTICSEARCH"; + Assert.assertEquals(expectedType, actualType); + } + + @Test + public void testGetTablesNames() throws 
GoraException, ClassNotFoundException { + Configuration conf = testDriver.getConfiguration(); + DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf); + + List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames()); + List<String> expectedTablesNames = new ArrayList<String>() { + { + add("frontier"); + add("webpage"); + } + }; + Assert.assertEquals(expectedTablesNames, actualTablesNames); + } + + @Test + public void testGetTableInfo() throws GoraException, ClassNotFoundException { + Configuration conf = testDriver.getConfiguration(); + DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf); + + ElasticsearchStoreCollectionMetadata actualCollectionMetadata = + (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier"); + + List<String> expectedDocumentKeys = new ArrayList<String>() { + { + add("name"); + add("dateOfBirth"); + add("ssn"); + add("value"); + add("salary"); + add("boss"); + add("webpage"); + add("gora_id"); + } + }; + + List<String> expectedDocumentTypes = new ArrayList<String>() { + { + add("text"); + add("long"); + add("text"); + add("text"); + add("integer"); + add("object"); + add("object"); + add("keyword"); + } + }; + + Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentKeys().size()); + Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys())); + + Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size()); + Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes())); + } + + @Ignore("Elasticsearch doesn't support 3 types union field yet") + @Override + public void testGet3UnionField() { + } +} diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java
b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java new file mode 100644 index 0000000..cd77199 --- /dev/null +++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains Elasticsearch datastore tests. + */ +package org.apache.gora.elasticsearch.store; diff --git a/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml new file mode 100644 index 0000000..c1a8ccd --- /dev/null +++ b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<gora-otd xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:noNamespaceSchemaLocation="gora-elasticsearch.xsd"> + + <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" index="frontier"> + <!-- + Remove type to test XSD validation. + <field name="name" docfield="name" type="text"/> + --> + <field name="name" docfield="name"/> + </class> + +</gora-otd> diff --git a/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml new file mode 100644 index 0000000..13a8d08 --- /dev/null +++ b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<gora-otd xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:noNamespaceSchemaLocation="gora-elasticsearch.xsd"> + + <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" index="frontier"> + <field name="name" docfield="name" type="text"/> + <field name="dateOfBirth" docfield="dateOfBirth" type="long"/> + <field name="ssn" docfield="ssn" type="text"/> + <field name="value" docfield="value" type="text"/> + <field name="salary" docfield="salary" type="integer"/> + <field name="boss" docfield="boss" type="object"/> + <field name="webpage" docfield="webpage" type="object"/> + </class> + + <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" index="webpage"> + <field name="url" docfield="url" type="text"/> + <field name="content" docfield="content" type="binary"/> + <field name="parsedContent" docfield="pContent" type="text"/> + <field name="outlinks" docfield="links.out" type="object"/> + <field name="headers" docfield="headers" type="object"/> + <field name="metadata" docfield="metadata" type="object"/> + <field name="byteData" docfield="byteData" type="object"/> + <field name="stringData" docfield="stringData" type="object"/> + </class> + + <class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String" index="TokenDatum"> + <field name="count" docfield="count" type="integer"/> + </class> + +</gora-otd> diff --git a/gora-elasticsearch/src/test/resources/gora.properties b/gora-elasticsearch/src/test/resources/gora.properties new file mode 100644 index 0000000..99842a9 --- /dev/null +++ b/gora-elasticsearch/src/test/resources/gora.properties @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +gora.datastore.default=org.apache.gora.elasticsearch.store.ElasticsearchStore +gora.elasticsearch.override.hadoop.configuration=false +gora.datastore.elasticsearch.host=localhost +gora.datastore.elasticsearch.port=9200 +#gora.datastore.elasticsearch.scheme= + +#Basic authentication requires username and password. +gora.datastore.elasticsearch.authenticationType=BASIC +#Default username to connect to the Elasticsearch Docker container. +gora.datastore.elasticsearch.username=elastic +gora.datastore.elasticsearch.password=password + +#Authentication with an Elasticsearch access token. +#gora.datastore.elasticsearch.authenticationType=TOKEN +#gora.datastore.elasticsearch.authorizationToken= + +#Authentication with an Elasticsearch API Key requires API Key ID and API Key Secret. +#gora.datastore.elasticsearch.authenticationType=APIKEY +#gora.datastore.elasticsearch.apiKeyId= +#gora.datastore.elasticsearch.apiKeySecret= + +#Additional configuration parameters for timeouts and threads. 
+#gora.datastore.elasticsearch.connectTimeout= +#gora.datastore.elasticsearch.socketTimeout= +#gora.datastore.elasticsearch.ioThreadCount= diff --git a/pom.xml b/pom.xml index eeb8182..b3b27f9 100755 --- a/pom.xml +++ b/pom.xml @@ -816,6 +816,7 @@ <module>gora-redis</module> <module>gora-jet</module> <module>gora-rethinkdb</module> + <module>gora-elasticsearch</module> <module>gora-tutorial</module> <module>gora-benchmark</module> <module>sources-dist</module> @@ -855,6 +856,8 @@ <restlet.version>2.4.0</restlet.version> <!-- Flink Dependencies --> <flink.version>1.6.2</flink.version> + <!-- Elasticsearch Dependencies --> + <elasticsearch.version>7.10.1</elasticsearch.version> <spark.version>2.2.1</spark.version> <aerospike.version>5.0.6</aerospike.version> @@ -1201,6 +1204,13 @@ <version>${redisson.version}</version> </dependency> + <!-- Elasticsearch dependencies --> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + <!--Hadoop dependencies --> <dependency> <groupId>org.apache.hadoop</groupId>
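The `gora.properties` file above selects one of three authentication methods (`BASIC`, `TOKEN`, `APIKEY`). The sketch below shows how those properties could map onto an HTTP `Authorization` header. It assumes the header formats documented for Elasticsearch itself (`Basic base64(user:password)`, `Bearer <token>`, `ApiKey base64(id:secret)`). The class and method names are hypothetical, and this is not the actual `ElasticsearchParameters` or `ElasticsearchStore` code, which may build its credentials differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Properties;

/**
 * Hypothetical sketch: turns the gora.datastore.elasticsearch.* authentication
 * properties into an Authorization header value. Not the Gora implementation.
 */
public class AuthHeaderSketch {

  static final String PREFIX = "gora.datastore.elasticsearch.";

  static String authorizationHeader(Properties props) {
    // BASIC is assumed as the fallback when no authentication type is set.
    String type = props.getProperty(PREFIX + "authenticationType", "BASIC");
    switch (type) {
      case "BASIC":
        // HTTP Basic: base64("username:password").
        return "Basic " + encode(props.getProperty(PREFIX + "username")
            + ":" + props.getProperty(PREFIX + "password"));
      case "TOKEN":
        // Elasticsearch access tokens are sent as bearer tokens.
        return "Bearer " + props.getProperty(PREFIX + "authorizationToken");
      case "APIKEY":
        // Elasticsearch API keys: base64("id:secret") with the ApiKey scheme.
        return "ApiKey " + encode(props.getProperty(PREFIX + "apiKeyId")
            + ":" + props.getProperty(PREFIX + "apiKeySecret"));
      default:
        throw new IllegalArgumentException("Unknown authentication type: " + type);
    }
  }

  private static String encode(String credentials) {
    return Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    // Mirrors the BASIC settings from the test gora.properties.
    Properties props = new Properties();
    props.setProperty(PREFIX + "authenticationType", "BASIC");
    props.setProperty(PREFIX + "username", "elastic");
    props.setProperty(PREFIX + "password", "password");
    System.out.println(authorizationHeader(props));
  }
}
```

With the Elasticsearch high-level REST client, a header like this would typically be attached as a default header on the underlying `RestClientBuilder`. For `BASIC`, a `CredentialsProvider` is the more common alternative.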