This is an automated email from the ASF dual-hosted git repository. djkevincr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push: new ffc3a13 GORA-664 Fix testcases / formatting (#247) ffc3a13 is described below commit ffc3a13b778dce17db823dbcda8f5ab44640ab85 Author: Kevin Ratnasekera <djkevi...@yahoo.com> AuthorDate: Thu Aug 12 00:25:02 2021 +0530 GORA-664 Fix testcases / formatting (#247) --- .../mapping/ElasticsearchMapping.java | 82 +- .../mapping/ElasticsearchMappingBuilder.java | 314 ++-- .../apache/gora/elasticsearch/mapping/Field.java | 280 ++-- .../elasticsearch/query/ElasticsearchQuery.java | 22 +- .../elasticsearch/query/ElasticsearchResult.java | 64 +- .../elasticsearch/store/ElasticsearchStore.java | 1494 ++++++++++---------- .../ElasticsearchStoreCollectionMetadata.java | 50 +- .../store/ElasticsearchStoreMetadataAnalyzer.java | 106 +- .../elasticsearch/utils/AuthenticationType.java | 24 +- .../utils/ElasticsearchConstants.java | 48 +- .../utils/ElasticsearchParameters.java | 464 +++--- .../elasticsearch/GoraElasticsearchTestDriver.java | 64 +- .../mapreduce/ElasticsearchStoreMapReduceTest.java | 58 +- .../store/TestElasticsearchStore.java | 265 ++-- 14 files changed, 1676 insertions(+), 1659 deletions(-) diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java index d31e64f..9c6704f 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java @@ -24,50 +24,50 @@ import java.util.Map; */ public class ElasticsearchMapping { - private String indexName; - private Map<String, Field> fields; + private String indexName; + private Map<String, Field> fields; - /** - * Empty constructor for the ElasticsearchMapping class. 
- */ - public ElasticsearchMapping() { - fields = new HashMap<>(); - } + /** + * Empty constructor for the ElasticsearchMapping class. + */ + public ElasticsearchMapping() { + fields = new HashMap<>(); + } - /** - * Returns the name of Elasticsearch index linked to the mapping. - * - * @return Index's name - */ - public String getIndexName() { - return indexName; - } + /** + * Returns the name of Elasticsearch index linked to the mapping. + * + * @return Index's name + */ + public String getIndexName() { + return indexName; + } - /** - * Sets the index name of the Elasticsearch mapping. - * - * @param indexName Index's name - */ - public void setIndexName(String indexName) { - this.indexName = indexName; - } + /** + * Sets the index name of the Elasticsearch mapping. + * + * @param indexName Index's name + */ + public void setIndexName(String indexName) { + this.indexName = indexName; + } - /** - * Returns a map with all mapped fields. - * - * @return Map containing mapped fields - */ - public Map<String, Field> getFields() { - return fields; - } + /** + * Returns a map with all mapped fields. + * + * @return Map containing mapped fields + */ + public Map<String, Field> getFields() { + return fields; + } - /** - * Add a new field to the mapped fields. - * - * @param classFieldName Field name in the persisted class - * @param field Mapped field from Elasticsearch index - */ - public void addField(String classFieldName, Field field) { - fields.put(classFieldName, field); - } + /** + * Add a new field to the mapped fields. 
+ * + * @param classFieldName Field name in the persisted class + * @param field Mapped field from Elasticsearch index + */ + public void addField(String classFieldName, Field field) { + fields.put(classFieldName, field); + } } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java index 30dba7b..31ad474 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java @@ -45,166 +45,166 @@ import java.util.Locale; */ public class ElasticsearchMappingBuilder<K, T extends PersistentBase> { - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class); - - /** - * XSD validation file for the XML mapping. - */ - private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd"; - - // Index description - static final String ATT_NAME = "name"; - - static final String ATT_TYPE = "type"; - - // Class description - static final String TAG_CLASS = "class"; - - static final String ATT_KEYCLASS = "keyClass"; - - static final String ATT_INDEX = "index"; - - static final String TAG_FIELD = "field"; - - static final String ATT_DOCFIELD = "docfield"; - - static final String ATT_SCALINGFACTOR = "scalingFactor"; - - /** - * Mapping instance being built. - */ - private ElasticsearchMapping elasticsearchMapping; - - private final ElasticsearchStore<K, T> dataStore; - - /** - * Constructor for ElasticsearchMappingBuilder. - * - * @param store ElasticsearchStore instance - */ - public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) { - this.elasticsearchMapping = new ElasticsearchMapping(); - this.dataStore = store; - } - - /** - * Returns the Elasticsearch Mapping being built. 
- * - * @return Elasticsearch Mapping instance - */ - public ElasticsearchMapping getElasticsearchMapping() { - return elasticsearchMapping; - } - - /** - * Sets the Elasticsearch Mapping. - * - * @param elasticsearchMapping Elasticsearch Mapping instance - */ - public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) { - this.elasticsearchMapping = elasticsearchMapping; - } - - /** - * Reads Elasticsearch mappings from file. - * - * @param inputStream Mapping input stream - * @param xsdValidation Parameter for enabling XSD validation - */ - public void readMappingFile(InputStream inputStream, boolean xsdValidation) { - try { - SAXBuilder saxBuilder = new SAXBuilder(); - if (inputStream == null) { - LOG.error("The mapping input stream is null!"); - throw new GoraException("The mapping input stream is null!"); - } - - // Convert input stream to a string to use it a few times - String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset()); - - // XSD validation for XML file - if (xsdValidation) { - Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset())); - Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI) - .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE))); - schema.newValidator().validate(xmlSource); - LOG.info("Mapping file is valid."); - } - - Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset())); - if (document == null) { - LOG.error("The mapping document is null!"); - throw new GoraException("The mapping document is null!"); - } - - Element root = document.getRootElement(); - // Extract class descriptions - @SuppressWarnings("unchecked") - List<Element> classElements = root.getChildren(TAG_CLASS); - for (Element classElement : classElements) { - final Class<T> persistentClass = dataStore.getPersistentClass(); - final Class<K> keyClass = dataStore.getKeyClass(); 
- if (haveKeyClass(keyClass, classElement) - && havePersistentClass(persistentClass, classElement)) { - loadPersistentClass(classElement, persistentClass); - break; - } - } - } catch (IOException | JDOMException | ConfigurationException | SAXException ex) { - throw new RuntimeException(ex); + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class); + + /** + * XSD validation file for the XML mapping. + */ + private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd"; + + // Index description + static final String ATT_NAME = "name"; + + static final String ATT_TYPE = "type"; + + // Class description + static final String TAG_CLASS = "class"; + + static final String ATT_KEYCLASS = "keyClass"; + + static final String ATT_INDEX = "index"; + + static final String TAG_FIELD = "field"; + + static final String ATT_DOCFIELD = "docfield"; + + static final String ATT_SCALINGFACTOR = "scalingFactor"; + + /** + * Mapping instance being built. + */ + private ElasticsearchMapping elasticsearchMapping; + + private final ElasticsearchStore<K, T> dataStore; + + /** + * Constructor for ElasticsearchMappingBuilder. + * + * @param store ElasticsearchStore instance + */ + public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) { + this.elasticsearchMapping = new ElasticsearchMapping(); + this.dataStore = store; + } + + /** + * Returns the Elasticsearch Mapping being built. + * + * @return Elasticsearch Mapping instance + */ + public ElasticsearchMapping getElasticsearchMapping() { + return elasticsearchMapping; + } + + /** + * Sets the Elasticsearch Mapping. + * + * @param elasticsearchMapping Elasticsearch Mapping instance + */ + public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) { + this.elasticsearchMapping = elasticsearchMapping; + } + + /** + * Reads Elasticsearch mappings from file. 
+ * + * @param inputStream Mapping input stream + * @param xsdValidation Parameter for enabling XSD validation + */ + public void readMappingFile(InputStream inputStream, boolean xsdValidation) { + try { + SAXBuilder saxBuilder = new SAXBuilder(); + if (inputStream == null) { + LOG.error("The mapping input stream is null!"); + throw new GoraException("The mapping input stream is null!"); + } + + // Convert input stream to a string to use it a few times + String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset()); + + // XSD validation for XML file + if (xsdValidation) { + Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset())); + Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI) + .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE))); + schema.newValidator().validate(xmlSource); + LOG.info("Mapping file is valid."); + } + + Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset())); + if (document == null) { + LOG.error("The mapping document is null!"); + throw new GoraException("The mapping document is null!"); + } + + Element root = document.getRootElement(); + // Extract class descriptions + @SuppressWarnings("unchecked") + List<Element> classElements = root.getChildren(TAG_CLASS); + for (Element classElement : classElements) { + final Class<T> persistentClass = dataStore.getPersistentClass(); + final Class<K> keyClass = dataStore.getKeyClass(); + if (haveKeyClass(keyClass, classElement) + && havePersistentClass(persistentClass, classElement)) { + loadPersistentClass(classElement, persistentClass); + break; } - LOG.info("Gora Elasticsearch mapping file was read successfully."); - } - - private boolean haveKeyClass(final Class<K> keyClass, - final Element classElement) { - return classElement.getAttributeValue(ATT_KEYCLASS).equals( - keyClass.getName()); - } - - private boolean 
havePersistentClass(final Class<T> persistentClass, - final Element classElement) { - return classElement.getAttributeValue(ATT_NAME).equals( - persistentClass.getName()); + } + } catch (IOException | JDOMException | ConfigurationException | SAXException ex) { + throw new RuntimeException(ex); } - - /** - * Handle the XML parsing of the class definition. - * - * @param classElement the XML node containing the class definition - */ - protected void loadPersistentClass(Element classElement, - Class<T> pPersistentClass) { - - String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX); - String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass); - + LOG.info("Gora Elasticsearch mapping file was read successfully."); + } + + private boolean haveKeyClass(final Class<K> keyClass, + final Element classElement) { + return classElement.getAttributeValue(ATT_KEYCLASS).equals( + keyClass.getName()); + } + + private boolean havePersistentClass(final Class<T> persistentClass, + final Element classElement) { + return classElement.getAttributeValue(ATT_NAME).equals( + persistentClass.getName()); + } + + /** + * Handle the XML parsing of the class definition. + * + * @param classElement the XML node containing the class definition + */ + protected void loadPersistentClass(Element classElement, + Class<T> pPersistentClass) { + + String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX); + String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass); + + elasticsearchMapping.setIndexName(indexName); + // docNameFromMapping could be null here + if (!indexName.equals(indexNameFromMapping)) { + ElasticsearchStore.LOG + .info("Keyclass and nameclass match, but mismatching index names. 
" + + "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.", + indexNameFromMapping, indexName); + if (indexNameFromMapping != null) { elasticsearchMapping.setIndexName(indexName); - // docNameFromMapping could be null here - if (!indexName.equals(indexNameFromMapping)) { - ElasticsearchStore.LOG - .info("Keyclass and nameclass match, but mismatching index names. " - + "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.", - indexNameFromMapping, indexName); - if (indexNameFromMapping != null) { - elasticsearchMapping.setIndexName(indexName); - } - } + } + } - // Process fields declaration - @SuppressWarnings("unchecked") - List<Element> fields = classElement.getChildren(TAG_FIELD); - for (Element fieldElement : fields) { - String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.getDefault()); - Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName)); - Field field; - if (fieldType.getType() == Field.DataType.SCALED_FLOAT) { - int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR)); - field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor)); - } else { - field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType); - } - elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field); - } + // Process fields declaration + @SuppressWarnings("unchecked") + List<Element> fields = classElement.getChildren(TAG_FIELD); + for (Element fieldElement : fields) { + String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.getDefault()); + Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName)); + Field field; + if (fieldType.getType() == Field.DataType.SCALED_FLOAT) { + int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR)); + field = new 
Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor)); + } else { + field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType); + } + elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field); } + } } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java index 73016cb..e5021fb 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java @@ -24,177 +24,175 @@ import java.util.StringJoiner; */ public class Field { - private String name; - private FieldType dataType; + private String name; + private FieldType dataType; + + /** + * Constructor for Field. + * + * @param name Field's name + * @param dataType Field's data type + */ + public Field(String name, FieldType dataType) { + this.name = name; + this.dataType = dataType; + } + + /** + * Returns the field's name. + * + * @return Field's name + */ + public String getName() { + return name; + } + + /** + * Sets the field's name. + * + * @param name Field's name + */ + public void setName(String name) { + this.name = name; + } + + /** + * Returns the field's data-type. + * + * @return Field's data-type + */ + public FieldType getDataType() { + return dataType; + } + + /** + * Sets the field's data-type. 
+ * + * @param dataType Field's data-type + */ + public void setDataType(FieldType dataType) { + this.dataType = dataType; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Field field = (Field) o; + return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType); + } + + @Override + public int hashCode() { + return Objects.hash(name, dataType); + } + + @Override + public String toString() { + return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]") + .add("name='" + name + "'") + .add("dataType=" + dataType) + .toString(); + } + + /** + * Elasticsearch supported data-type enumeration. For a more detailed list of data + * types supported by Elasticsearch refer to + * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html + */ + public enum DataType { + BINARY, + BOOLEAN, + KEYWORD, + CONSTANT_KEYWORD, + WILDCARD, + LONG, + INTEGER, + SHORT, + BYTE, + DOUBLE, + FLOAT, + HALF_FLOAT, + SCALED_FLOAT, + OBJECT, + FLATTENED, + NESTED, + TEXT, + COMPLETION, + SEARCH_AS_YOU_TYPE, + TOKEN_COUNT + } + + public static class FieldType { + + private DataType type; + + // Parameter for scaled_float type. + private int scalingFactor; /** - * Constructor for Field. + * Constructor for FieldType. * - * @param name Field's name - * @param dataType Field's data type + * @param type Elasticsearch data type */ - public Field(String name, FieldType dataType) { - this.name = name; - this.dataType = dataType; + public FieldType(DataType type) { + this.type = type; } /** - * Returns the field's name. + * Constructor for FieldType Implicitly uses scaled_float Elasticsearch data type + * with scaling factor parameter. 
* - * @return Field's name + * @param scalingFactor scaled_float field's scaling factor */ - public String getName() { - return name; + public FieldType(int scalingFactor) { + this.type = DataType.SCALED_FLOAT; + this.scalingFactor = scalingFactor; } /** - * Sets the field's name. - * - * @param name Field's name + * @return Elasticsearch data type */ - public void setName(String name) { - this.name = name; + public DataType getType() { + return type; } /** - * Returns the field's data-type. + * @param type Elasticsearch data type + */ + public void setType(DataType type) { + this.type = type; + } + + /** + * Returns the scaling factor of scaled_float type. * - * @return Field's data-type + * @return scaled_float field's scaling factor */ - public FieldType getDataType() { - return dataType; + public int getScalingFactor() { + return scalingFactor; } /** - * Sets the field's data-type. + * Sets the scaling factor of scaled_float type. * - * @param dataType Field's data-type + * @param scalingFactor scaled_float field's scaling factor */ - public void setDataType(FieldType dataType) { - this.dataType = dataType; + public void setScalingFactor(int scalingFactor) { + this.scalingFactor = scalingFactor; } @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Field field = (Field) o; - return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType); + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FieldType fieldType = (FieldType) o; + return scalingFactor == fieldType.scalingFactor && type == fieldType.type; } @Override public int hashCode() { - return Objects.hash(name, dataType); - } - - @Override - public String toString() { - return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]") - .add("name='" + name + "'") - .add("dataType=" + dataType) - .toString(); - } - - /** - * Elasticsearch supported data-type 
enumeration. For a more detailed list of data - * types supported by Elasticsearch refer to - * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html - */ - public enum DataType { - BINARY, - BOOLEAN, - KEYWORD, - CONSTANT_KEYWORD, - WILDCARD, - LONG, - INTEGER, - SHORT, - BYTE, - DOUBLE, - FLOAT, - HALF_FLOAT, - SCALED_FLOAT, - OBJECT, - FLATTENED, - NESTED, - TEXT, - COMPLETION, - SEARCH_AS_YOU_TYPE, - TOKEN_COUNT - } - - public static class FieldType { - - private DataType type; - - // Parameter for scaled_float type. - private int scalingFactor; - - /** - * Constructor for FieldType. - * - * @param type Elasticsearch data type - */ - public FieldType(DataType type) { - this.type = type; - } - - /** - * Constructor for FieldType Implicitly uses scaled_float Elasticsearch data type - * with scaling factor parameter. - * - * @param scalingFactor scaled_float field's scaling factor - */ - public FieldType(int scalingFactor) { - this.type = DataType.SCALED_FLOAT; - this.scalingFactor = scalingFactor; - } - - /** - * - * @return Elasticsearch data type - */ - public DataType getType() { - return type; - } - - /** - * - * @param type Elasticsearch data type - */ - public void setType(DataType type) { - this.type = type; - } - - /** - * Returns the scaling factor of scaled_float type. - * - * @return scaled_float field's scaling factor - */ - public int getScalingFactor() { - return scalingFactor; - } - - /** - * Sets the scaling factor of scaled_float type. 
- * - * @param scalingFactor scaled_float field's scaling factor - */ - public void setScalingFactor(int scalingFactor) { - this.scalingFactor = scalingFactor; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - FieldType fieldType = (FieldType) o; - return scalingFactor == fieldType.scalingFactor && type == fieldType.type; - } - - @Override - public int hashCode() { - return Objects.hash(type, scalingFactor); - } + return Objects.hash(type, scalingFactor); } + } } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java index f491006..214380e 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java @@ -26,16 +26,16 @@ import org.apache.gora.store.DataStore; */ public class ElasticsearchQuery<K, T extends PersistentBase> extends QueryBase<K, T> { - /** - * Constructor for the query. - * - * @param dataStore data store used - */ - public ElasticsearchQuery(DataStore<K, T> dataStore) { - super(dataStore); - } + /** + * Constructor for the query. 
+ * + * @param dataStore data store used + */ + public ElasticsearchQuery(DataStore<K, T> dataStore) { + super(dataStore); + } - public ElasticsearchQuery() { - super(null); - } + public ElasticsearchQuery() { + super(null); + } } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java index e7f8180..01d5d8e 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java @@ -29,44 +29,44 @@ import java.util.List; */ public class ElasticsearchResult<K, T extends PersistentBase> extends ResultBase<K, T> { - /** - * List of resulting persistent objects. - */ - private List<T> persistentObjects; + /** + * List of resulting persistent objects. + */ + private List<T> persistentObjects; - /** - * List of resulting objects keys. - */ - private List<K> persistentKeys; + /** + * List of resulting objects keys. 
+ */ + private List<K> persistentKeys; - public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) { - super(dataStore, query); - this.persistentKeys = persistentKeys; - this.persistentObjects = persistentObjects; - } - - @Override - public float getProgress() { - if (persistentObjects.size() == 0) { - return 1; - } + public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) { + super(dataStore, query); + this.persistentKeys = persistentKeys; + this.persistentObjects = persistentObjects; + } - return offset / (float) persistentObjects.size(); + @Override + public float getProgress() { + if (persistentObjects.size() == 0) { + return 1; } - @Override - public int size() { - return persistentObjects.size(); - } + return offset / (float) persistentObjects.size(); + } - @Override - protected boolean nextInner() { - if ((int) offset == persistentObjects.size()) { - return false; - } + @Override + public int size() { + return persistentObjects.size(); + } - persistent = persistentObjects.get((int) offset); - key = persistentKeys.get((int) offset); - return persistent != null; + @Override + protected boolean nextInner() { + if ((int) offset == persistentObjects.size()) { + return false; } + + persistent = persistentObjects.get((int) offset); + key = persistentKeys.get((int) offset); + return persistent != null; + } } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java index a82de7f..610f0fd 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java @@ -82,7 +82,15 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.charset.Charset; 
import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Properties; /** * Implementation of a Apache Elasticsearch data store to be used by Apache Gora. @@ -92,773 +100,773 @@ import java.util.*; */ public class ElasticsearchStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { - public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class); - private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml"; - public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file"; - private static final String XML_MAPPING_DEFINITION = "gora.mapping"; - public static final String XSD_VALIDATION = "gora.xsd_validation"; - - /** - * Elasticsearch client - */ - private RestHighLevelClient client; - - /** - * Mapping definition for Elasticsearch - */ - private ElasticsearchMapping elasticsearchMapping; - - @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { - try { - LOG.debug("Initializing Elasticsearch store"); - ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf()); - super.initialize(keyClass, persistentClass, properties); - ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this); - InputStream mappingStream; - if (properties.containsKey(XML_MAPPING_DEFINITION)) { - if (LOG.isTraceEnabled()) { - LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION)); - } - mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null); - } else { - mappingStream = 
getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE)); - } - String xsdValidation = properties.getProperty(XSD_VALIDATION, "false"); - builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation)); - elasticsearchMapping = builder.getElasticsearchMapping(); - client = createClient(parameters); - LOG.info("Elasticsearch store was successfully initialized."); - } catch (Exception ex) { - LOG.error("Error while initializing Elasticsearch store", ex); - throw new GoraException(ex); - } + public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class); + private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml"; + public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file"; + private static final String XML_MAPPING_DEFINITION = "gora.mapping"; + public static final String XSD_VALIDATION = "gora.xsd_validation"; + + /** + * Elasticsearch client + */ + private RestHighLevelClient client; + + /** + * Mapping definition for Elasticsearch + */ + private ElasticsearchMapping elasticsearchMapping; + + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { + try { + LOG.debug("Initializing Elasticsearch store"); + ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf()); + super.initialize(keyClass, persistentClass, properties); + ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this); + InputStream mappingStream; + if (properties.containsKey(XML_MAPPING_DEFINITION)) { + if (LOG.isTraceEnabled()) { + LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION)); + } + mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null); + } else { + mappingStream = 
getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE)); + } + String xsdValidation = properties.getProperty(XSD_VALIDATION, "false"); + builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation)); + elasticsearchMapping = builder.getElasticsearchMapping(); + client = createClient(parameters); + LOG.info("Elasticsearch store was successfully initialized."); + } catch (Exception ex) { + LOG.error("Error while initializing Elasticsearch store", ex); + throw new GoraException(ex); } - - public static RestHighLevelClient createClient(ElasticsearchParameters parameters) { - RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort())); - - // Choosing the authentication method. - switch (parameters.getAuthenticationType()) { - case BASIC: - if (parameters.getUsername() != null && parameters.getPassword() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword())); - clientBuilder.setHttpClientConfigCallback( - httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } else { - throw new IllegalArgumentException("Missing username or password for BASIC authentication."); - } - break; - case TOKEN: - if (parameters.getAuthorizationToken() != null) { - Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", - parameters.getAuthorizationToken())}; - clientBuilder.setDefaultHeaders(defaultHeaders); - } else { - throw new IllegalArgumentException("Missing authorization token for TOKEN authentication."); - } - break; - case APIKEY: - if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) { - String apiKeyAuth = Base64.getEncoder() - .encodeToString((parameters.getApiKeyId() + ":" + 
parameters.getApiKeySecret()) - .getBytes(StandardCharsets.UTF_8)); - Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)}; - clientBuilder.setDefaultHeaders(defaultHeaders); - } else { - throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication."); - } - break; - } - - if (parameters.getConnectTimeout() != 0) { - clientBuilder.setRequestConfigCallback(requestConfigBuilder -> - requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout())); - } - - if (parameters.getSocketTimeout() != 0) { - clientBuilder.setRequestConfigCallback(requestConfigBuilder -> - requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout())); - } - - if (parameters.getIoThreadCount() != 0) { - clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> - httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom() - .setIoThreadCount(parameters.getIoThreadCount()).build())); + } + + public static RestHighLevelClient createClient(ElasticsearchParameters parameters) { + RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort())); + + // Choosing the authentication method. 
+ switch (parameters.getAuthenticationType()) { + case BASIC: + if (parameters.getUsername() != null && parameters.getPassword() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword())); + clientBuilder.setHttpClientConfigCallback( + httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + } else { + throw new IllegalArgumentException("Missing username or password for BASIC authentication."); + } + break; + case TOKEN: + if (parameters.getAuthorizationToken() != null) { + Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", + parameters.getAuthorizationToken())}; + clientBuilder.setDefaultHeaders(defaultHeaders); + } else { + throw new IllegalArgumentException("Missing authorization token for TOKEN authentication."); + } + break; + case APIKEY: + if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) { + String apiKeyAuth = Base64.getEncoder() + .encodeToString((parameters.getApiKeyId() + ":" + parameters.getApiKeySecret()) + .getBytes(StandardCharsets.UTF_8)); + Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)}; + clientBuilder.setDefaultHeaders(defaultHeaders); + } else { + throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication."); } - return new RestHighLevelClient(clientBuilder); + break; } - public ElasticsearchMapping getMapping() { - return elasticsearchMapping; + if (parameters.getConnectTimeout() != 0) { + clientBuilder.setRequestConfigCallback(requestConfigBuilder -> + requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout())); } - @Override - public String getSchemaName() { - return elasticsearchMapping.getIndexName(); + if (parameters.getSocketTimeout() != 0) { + 
clientBuilder.setRequestConfigCallback(requestConfigBuilder -> + requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout())); } - @Override - public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) { - return super.getSchemaName(mappingSchemaName, persistentClass); + if (parameters.getIoThreadCount() != 0) { + clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> + httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom() + .setIoThreadCount(parameters.getIoThreadCount()).build())); } - - @Override - public void createSchema() throws GoraException { - CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName()); - Map<String, Object> properties = new HashMap<>(); - for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) { - Map<String, Object> fieldType = new HashMap<>(); - fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT)); - if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) { - fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor()); - } - properties.put(entry.getKey(), fieldType); - } - // Special field for range query - properties.put("gora_id", new HashMap<String, Object>() {{ - put("type", "keyword"); - }}); - Map<String, Object> mapping = new HashMap<>(); - mapping.put("properties", properties); - request.mapping(mapping); - try { - if (!client.indices().exists( - new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) { - client.indices().create(request, RequestOptions.DEFAULT); - } - } catch (IOException ex) { - throw new GoraException(ex); - } + return new RestHighLevelClient(clientBuilder); + } + + public ElasticsearchMapping getMapping() { + return elasticsearchMapping; + } + + @Override + public String getSchemaName() { + return elasticsearchMapping.getIndexName(); + } + + @Override + public String 
getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) { + return super.getSchemaName(mappingSchemaName, persistentClass); + } + + @Override + public void createSchema() throws GoraException { + CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName()); + Map<String, Object> properties = new HashMap<>(); + for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) { + Map<String, Object> fieldType = new HashMap<>(); + fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT)); + if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) { + fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor()); + } + properties.put(entry.getKey(), fieldType); } - - @Override - public void deleteSchema() throws GoraException { - DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName()); - try { - if (client.indices().exists( - new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) { - client.indices().delete(request, RequestOptions.DEFAULT); - } - } catch (IOException ex) { - throw new GoraException(ex); - } + // Special field for range query + properties.put("gora_id", new HashMap<String, Object>() {{ + put("type", "keyword"); + }}); + Map<String, Object> mapping = new HashMap<>(); + mapping.put("properties", properties); + request.mapping(mapping); + try { + if (!client.indices().exists( + new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) { + client.indices().create(request, RequestOptions.DEFAULT); + } + } catch (IOException ex) { + throw new GoraException(ex); } - - @Override - public boolean schemaExists() throws GoraException { - try { - return client.indices().exists( - new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT); - } catch (IOException ex) { - throw new GoraException(ex); - } + } + + 
@Override + public void deleteSchema() throws GoraException { + DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName()); + try { + if (client.indices().exists( + new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) { + client.indices().delete(request, RequestOptions.DEFAULT); + } + } catch (IOException ex) { + throw new GoraException(ex); } - - @Override - public boolean exists(K key) throws GoraException { - GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key); - getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_"); - try { - return client.exists(getRequest, RequestOptions.DEFAULT); - } catch (IOException ex) { - throw new GoraException(ex); - } + } + + @Override + public boolean schemaExists() throws GoraException { + try { + return client.indices().exists( + new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); } - - @Override - public T get(K key, String[] fields) throws GoraException { - String[] requestedFields = getFieldsToQuery(fields); - List<String> documentFields = new ArrayList<>(); - for (String requestedField : requestedFields) { - documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName()); - } - try { - // Prepare the Elasticsearch request - GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key); - GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT); - if (getResponse.isExists()) { - Map<String, Object> sourceMap = getResponse.getSourceAsMap(); - - // Map of field's name and its value from the Document - Map<String, Object> fieldsAndValues = new HashMap<>(); - for (String field : documentFields) { - fieldsAndValues.put(field, sourceMap.get(field)); - } - - // Build the corresponding persistent - return newInstance(fieldsAndValues, requestedFields); - } 
else { - return null; - } - } catch (IOException ex) { - throw new GoraException(ex); - } + } + + @Override + public boolean exists(K key) throws GoraException { + GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key); + getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_"); + try { + return client.exists(getRequest, RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); } - - @Override - public void put(K key, T obj) throws GoraException { - if (obj.isDirty()) { - Schema schemaObj = obj.getSchema(); - List<Schema.Field> fields = schemaObj.getFields(); - Map<String, Object> jsonMap = new HashMap<>(); - for (Schema.Field field : fields) { - Field mappedField = elasticsearchMapping.getFields().get(field.name()); - if (mappedField != null) { - Object fieldValue = obj.get(field.pos()); - if (fieldValue != null) { - Schema fieldSchema = field.schema(); - Object serializedObj = serializeFieldValue(fieldSchema, fieldValue); - jsonMap.put(mappedField.getName(), serializedObj); - } - } - } - // Special field for range query - jsonMap.put("gora_id", key); - // Prepare the Elasticsearch request - IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap); - try { - client.index(request, RequestOptions.DEFAULT); - } catch (IOException ex) { - throw new GoraException(ex); - } - } else { - LOG.info("Ignored putting object {} in the store as it is neither " - + "new, neither dirty.", new Object[]{obj}); - } + } + + @Override + public T get(K key, String[] fields) throws GoraException { + String[] requestedFields = getFieldsToQuery(fields); + List<String> documentFields = new ArrayList<>(); + for (String requestedField : requestedFields) { + documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName()); } - - @Override - public boolean delete(K key) throws GoraException { - DeleteRequest request = new 
DeleteRequest(elasticsearchMapping.getIndexName(), (String) key); - try { - DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT); - return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND; - } catch (IOException ex) { - throw new GoraException(ex); - } + try { + // Prepare the Elasticsearch request + GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key); + GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT); + if (getResponse.isExists()) { + Map<String, Object> sourceMap = getResponse.getSourceAsMap(); + + // Map of field's name and its value from the Document + Map<String, Object> fieldsAndValues = new HashMap<>(); + for (String field : documentFields) { + fieldsAndValues.put(field, sourceMap.get(field)); + } + + // Build the corresponding persistent + return newInstance(fieldsAndValues, requestedFields); + } else { + return null; + } + } catch (IOException ex) { + throw new GoraException(ex); } - - @Override - public long deleteByQuery(Query<K, T> query) throws GoraException { - try { - BulkByScrollResponse bulkResponse; - if (query.getFields() != null && query.getFields().length < elasticsearchMapping.getFields().size()) { - UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName()); - QueryBuilder matchDocumentsWithinRange = QueryBuilders - .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey()); - updateRequest.setQuery(matchDocumentsWithinRange); - - // Create a script for deleting fields - StringBuilder toDelete = new StringBuilder(); - String[] fieldsToDelete = query.getFields(); - for (String field : fieldsToDelete) { - String elasticsearchField = elasticsearchMapping.getFields().get(field).getName(); - toDelete.append(String.format("ctx._source.remove('%s');", elasticsearchField)); - } - //toDelete.deleteCharAt(toDelete.length() - 1); - updateRequest.setScript(new Script(ScriptType.INLINE, "painless", 
toDelete.toString(), Collections.emptyMap())); - bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT); - return bulkResponse.getUpdated(); - } else { - DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName()); - QueryBuilder matchDocumentsWithinRange = QueryBuilders - .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey()); - deleteRequest.setQuery(matchDocumentsWithinRange); - bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT); - return bulkResponse.getDeleted(); - } - } catch (IOException ex) { - throw new GoraException(ex); - } + } + + @Override + public void put(K key, T obj) throws GoraException { + if (obj.isDirty()) { + Schema schemaObj = obj.getSchema(); + List<Schema.Field> fields = schemaObj.getFields(); + Map<String, Object> jsonMap = new HashMap<>(); + for (Schema.Field field : fields) { + Field mappedField = elasticsearchMapping.getFields().get(field.name()); + if (mappedField != null) { + Object fieldValue = obj.get(field.pos()); + if (fieldValue != null) { + Schema fieldSchema = field.schema(); + Object serializedObj = serializeFieldValue(fieldSchema, fieldValue); + jsonMap.put(mappedField.getName(), serializedObj); + } + } + } + // Special field for range query + jsonMap.put("gora_id", key); + // Prepare the Elasticsearch request + IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap); + try { + client.index(request, RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); + } + } else { + LOG.info("Ignored putting object {} in the store as it is neither " + + "new, neither dirty.", new Object[]{obj}); + } + } + + @Override + public boolean delete(K key) throws GoraException { + DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key); + try { + DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT); + 
return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND; + } catch (IOException ex) { + throw new GoraException(ex); } + } + + @Override + public long deleteByQuery(Query<K, T> query) throws GoraException { + try { + BulkByScrollResponse bulkResponse; + if (query.getFields() != null && query.getFields().length < elasticsearchMapping.getFields().size()) { + UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName()); + QueryBuilder matchDocumentsWithinRange = QueryBuilders + .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey()); + updateRequest.setQuery(matchDocumentsWithinRange); + + // Create a script for deleting fields + StringBuilder toDelete = new StringBuilder(); + String[] fieldsToDelete = query.getFields(); + for (String field : fieldsToDelete) { + String elasticsearchField = elasticsearchMapping.getFields().get(field).getName(); + toDelete.append(String.format(Locale.getDefault(), "ctx._source.remove('%s');", elasticsearchField)); + } + //toDelete.deleteCharAt(toDelete.length() - 1); + updateRequest.setScript(new Script(ScriptType.INLINE, "painless", toDelete.toString(), Collections.emptyMap())); + bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT); + return bulkResponse.getUpdated(); + } else { + DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName()); + QueryBuilder matchDocumentsWithinRange = QueryBuilders + .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey()); + deleteRequest.setQuery(matchDocumentsWithinRange); + bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT); + return bulkResponse.getDeleted(); + } + } catch (IOException ex) { + throw new GoraException(ex); + } + } - @Override - public Result<K, T> execute(Query<K, T> query) throws GoraException { - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + @Override + public Result<K, T> execute(Query<K, T> 
query) throws GoraException { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - // Set the query result limit - int size = (int) query.getLimit(); - if (size != -1) { - searchSourceBuilder.size(size); - } - try { - // Build the actual Elasticsearch range query - QueryBuilder rangeQueryBuilder = QueryBuilders - .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey()); - searchSourceBuilder.query(rangeQueryBuilder); - SearchRequest searchRequest = new SearchRequest(elasticsearchMapping.getIndexName()); - searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); - - String[] avroFields = getFieldsToQuery(query.getFields()); - - SearchHits hits = searchResponse.getHits(); - SearchHit[] searchHits = hits.getHits(); - List<K> hitId = new ArrayList<>(); - - // Check filter - Filter<K, T> queryFilter = query.getFilter(); - List<T> filteredObjects = new ArrayList<>(); - for (SearchHit hit : searchHits) { - Map<String, Object> sourceAsMap = hit.getSourceAsMap(); - if (queryFilter == null || !queryFilter.filter((K) hit.getId(), newInstance(sourceAsMap, avroFields))) { - filteredObjects.add(newInstance(sourceAsMap, avroFields)); - hitId.add((K) hit.getId()); - } - } - return new ElasticsearchResult<>(this, query, hitId, filteredObjects); - } catch (IOException ex) { - throw new GoraException(ex); - } + // Set the query result limit + int size = (int) query.getLimit(); + if (size != -1) { + searchSourceBuilder.size(size); } - - @Override - public Query<K, T> newQuery() { - ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this); - query.setFields(getFieldsToQuery(null)); - return query; + try { + // Build the actual Elasticsearch range query + QueryBuilder rangeQueryBuilder = QueryBuilders + .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey()); + searchSourceBuilder.query(rangeQueryBuilder); + SearchRequest searchRequest = new 
SearchRequest(elasticsearchMapping.getIndexName()); + searchRequest.source(searchSourceBuilder); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + + String[] avroFields = getFieldsToQuery(query.getFields()); + + SearchHits hits = searchResponse.getHits(); + SearchHit[] searchHits = hits.getHits(); + List<K> hitId = new ArrayList<>(); + + // Check filter + Filter<K, T> queryFilter = query.getFilter(); + List<T> filteredObjects = new ArrayList<>(); + for (SearchHit hit : searchHits) { + Map<String, Object> sourceAsMap = hit.getSourceAsMap(); + if (queryFilter == null || !queryFilter.filter((K) hit.getId(), newInstance(sourceAsMap, avroFields))) { + filteredObjects.add(newInstance(sourceAsMap, avroFields)); + hitId.add((K) hit.getId()); + } + } + return new ElasticsearchResult<>(this, query, hitId, filteredObjects); + } catch (IOException ex) { + throw new GoraException(ex); } - - @Override - public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { - List<PartitionQuery<K, T>> partitions = new ArrayList<>(); - PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( - query); - partitionQuery.setConf(getConf()); - partitions.add(partitionQuery); - return partitions; + } + + @Override + public Query<K, T> newQuery() { + ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this); + query.setFields(getFieldsToQuery(null)); + return query; + } + + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { + List<PartitionQuery<K, T>> partitions = new ArrayList<>(); + PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>( + query); + partitionQuery.setConf(getConf()); + partitions.add(partitionQuery); + return partitions; + } + + @Override + public void flush() throws GoraException { + try { + client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT); + client.indices().flush(new 
FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); } - - @Override - public void flush() throws GoraException { - try { - client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT); - client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT); - } catch (IOException ex) { - throw new GoraException(ex); - } + } + + @Override + public void close() { + try { + client.close(); + LOG.info("Elasticsearch datastore destroyed successfully."); + } catch (IOException ex) { + LOG.error(ex.getMessage(), ex); } - - @Override - public void close() { - try { - client.close(); - LOG.info("Elasticsearch datastore destroyed successfully."); - } catch (IOException ex) { - LOG.error(ex.getMessage(), ex); - } + } + + /** + * Build a new instance of the persisted class from the Document retrieved from the database. + * + * @param fieldsAndValues Map of field's name and its value from the Document + * that results from the query to the database + * @param requestedFields the list of fields to be mapped to the persistence class instance + * @return a persistence class instance which content was deserialized from the Document + * @throws IOException + */ + public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException { + // Create new empty persistent bean instance + T persistent = newPersistent(); + + requestedFields = getFieldsToQuery(requestedFields); + // Populate each field + for (String objField : requestedFields) { + Schema.Field field = fieldMap.get(objField); + Schema fieldSchema = field.schema(); + String docFieldName = elasticsearchMapping.getFields().get(objField).getName(); + Object fieldValue = fieldsAndValues.get(docFieldName); + + Object result = deserializeFieldValue(field, fieldSchema, fieldValue); + persistent.put(field.pos(), result); } - - /** - * Build a 
new instance of the persisted class from the Document retrieved from the database. - * - * @param fieldsAndValues Map of field's name and its value from the Document - * that results from the query to the database - * @param requestedFields the list of fields to be mapped to the persistence class instance - * @return a persistence class instance which content was deserialized from the Document - * @throws IOException - */ - public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException { - // Create new empty persistent bean instance - T persistent = newPersistent(); - - requestedFields = getFieldsToQuery(requestedFields); - // Populate each field - for (String objField : requestedFields) { - Schema.Field field = fieldMap.get(objField); - Schema fieldSchema = field.schema(); - String docFieldName = elasticsearchMapping.getFields().get(objField).getName(); - Object fieldValue = fieldsAndValues.get(docFieldName); - - Object result = deserializeFieldValue(field, fieldSchema, fieldValue); - persistent.put(field.pos(), result); - } - persistent.clearDirty(); - return persistent; - } - - /** - * Deserialize an Elasticsearch object to a persistent Avro object. 
- * - * @param avroField persistent Avro class field to which the value will be deserialized - * @param avroFieldSchema schema for the persistent Avro class field - * @param elasticsearchValue Elasticsearch field value to be deserialized - * @return deserialized Avro object from the Elasticsearch object - * @throws GoraException when the given Elasticsearch value cannot be deserialized - */ - private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema, - Object elasticsearchValue) throws GoraException { - Object fieldValue; - switch (avroFieldSchema.getType()) { - case MAP: - fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue); - break; - case RECORD: - fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue); - break; - case ARRAY: - fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue); - break; - case BOOLEAN: - fieldValue = Boolean.parseBoolean(elasticsearchValue.toString()); - break; - case BYTES: - fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString())); - break; - case FIXED: - case NULL: - fieldValue = null; - break; - case UNION: - fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue); - break; - case DOUBLE: - fieldValue = Double.parseDouble(elasticsearchValue.toString()); - break; - case ENUM: - fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString()); - break; - case FLOAT: - fieldValue = Float.parseFloat(elasticsearchValue.toString()); - break; - case INT: - fieldValue = Integer.parseInt(elasticsearchValue.toString()); - break; - case LONG: - fieldValue = Long.parseLong(elasticsearchValue.toString()); - break; - case STRING: - fieldValue = new Utf8(elasticsearchValue.toString()); - break; - default: - fieldValue = elasticsearchValue; - } - return fieldValue; - } - - /** - * Deserialize an 
Elasticsearch List to an Avro List as used in Gora generated classes - * that can safely be written into Avro persistent object. - * - * @param avroField persistent Avro class field to which the value will be deserialized - * @param avroFieldSchema schema for the persistent Avro class field - * @param elasticsearchValue Elasticsearch field value to be deserialized - * @return deserialized Avro List from the given Elasticsearch value - * @throws GoraException when one of the underlying values cannot be deserialized - */ - private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema, - Object elasticsearchValue) throws GoraException { - List<Object> list = new ArrayList<>(); - if (elasticsearchValue != null) { - for (Object item : (List<Object>) elasticsearchValue) { - Object result = deserializeFieldValue(avroField, avroFieldSchema, item); - list.add(result); - } - } - return new DirtyListWrapper<>(list); - } - - /** - * Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes - * that can safely be written into Avro persistent object. 
- * - * @param avroField persistent Avro class field to which the value will be deserialized - * @param avroFieldSchema schema for the persistent Avro class field - * @param elasticsearchMap Elasticsearch Map value to be deserialized - * @return deserialized Avro Map from the given Elasticsearch Map value - * @throws GoraException when one of the underlying values cannot be deserialized - */ - private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema, - Map<String, Object> elasticsearchMap) throws GoraException { - Map<Utf8, Object> deserializedMap = new HashMap<>(); - if (elasticsearchMap != null) { - for (Map.Entry<String, Object> entry : elasticsearchMap.entrySet()) { - String mapKey = entry.getKey(); - Object mapValue = deserializeFieldValue(avroField, avroFieldSchema, entry.getValue()); - deserializedMap.put(new Utf8(mapKey), mapValue); - } - } - return new DirtyMapWrapper<>(deserializedMap); - } - - /** - * Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes - * that can safely be written into Avro persistent object. 
- * - * @param avroFieldSchema schema for the persistent Avro class field - * @param elasticsearchRecord Elasticsearch Record value to be deserialized - * @return deserialized Avro Object from the given Elasticsearch Record value - * @throws GoraException when one of the underlying values cannot be deserialized - */ - private Object fromElasticsearchRecord(Schema avroFieldSchema, - Map<String, Object> elasticsearchRecord) throws GoraException { - Class<?> clazz; - try { - clazz = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName()); - } catch (ClassNotFoundException ex) { - throw new GoraException(ex); - } - PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent(); - for (Schema.Field recField : avroFieldSchema.getFields()) { - Schema innerSchema = recField.schema(); - Field innerDocField = elasticsearchMapping.getFields().getOrDefault(recField.name(), new Field(recField.name(), null)); - record.put(recField.pos(), deserializeFieldValue(recField, innerSchema, elasticsearchRecord.get(innerDocField.getName()))); - } - return record; - } - - /** - * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes - * that can safely be written into Avro persistent object. 
- * - * @param avroField persistent Avro class field to which the value will be deserialized - * @param avroFieldSchema schema for the persistent Avro class field - * @param elasticsearchUnion Elasticsearch Union value to be deserialized - * @return deserialized Avro Object from the given Elasticsearch Union value - * @throws GoraException when one of the underlying values cannot be deserialized - */ - private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException { - Object deserializedUnion; - Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType(); - Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType(); - if (avroFieldSchema.getTypes().size() == 2 && - (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) && - !type0.equals(type1)) { - int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema); - Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); - deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion); - } else if (avroFieldSchema.getTypes().size() == 3) { - Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType(); - if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) && - (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) { - if (elasticsearchUnion == null) { - deserializedUnion = null; - } else if (elasticsearchUnion instanceof String) { - throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null."); - } else { - int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema); - Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); - deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion); - } - } else { - throw new GoraException("Elasticsearch only supports Union of two types field: Record or 
Null."); - } + persistent.clearDirty(); + return persistent; + } + + /** + * Deserialize an Elasticsearch object to a persistent Avro object. + * + * @param avroField persistent Avro class field to which the value will be deserialized + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchValue Elasticsearch field value to be deserialized + * @return deserialized Avro object from the Elasticsearch object + * @throws GoraException when the given Elasticsearch value cannot be deserialized + */ + private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema, + Object elasticsearchValue) throws GoraException { + Object fieldValue; + switch (avroFieldSchema.getType()) { + case MAP: + fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue); + break; + case RECORD: + fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue); + break; + case ARRAY: + fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue); + break; + case BOOLEAN: + fieldValue = Boolean.parseBoolean(elasticsearchValue.toString()); + break; + case BYTES: + fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString())); + break; + case FIXED: + case NULL: + fieldValue = null; + break; + case UNION: + fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue); + break; + case DOUBLE: + fieldValue = Double.parseDouble(elasticsearchValue.toString()); + break; + case ENUM: + fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString()); + break; + case FLOAT: + fieldValue = Float.parseFloat(elasticsearchValue.toString()); + break; + case INT: + fieldValue = Integer.parseInt(elasticsearchValue.toString()); + break; + case LONG: + fieldValue = Long.parseLong(elasticsearchValue.toString()); + break; + case STRING: + fieldValue = new 
Utf8(elasticsearchValue.toString()); + break; + default: + fieldValue = elasticsearchValue; + } + return fieldValue; + } + + /** + * Deserialize an Elasticsearch List to an Avro List as used in Gora generated classes + * that can safely be written into Avro persistent object. + * + * @param avroField persistent Avro class field to which the value will be deserialized + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchValue Elasticsearch field value to be deserialized + * @return deserialized Avro List from the given Elasticsearch value + * @throws GoraException when one of the underlying values cannot be deserialized + */ + private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema, + Object elasticsearchValue) throws GoraException { + List<Object> list = new ArrayList<>(); + if (elasticsearchValue != null) { + for (Object item : (List<Object>) elasticsearchValue) { + Object result = deserializeFieldValue(avroField, avroFieldSchema, item); + list.add(result); + } + } + return new DirtyListWrapper<>(list); + } + + /** + * Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes + * that can safely be written into Avro persistent object. 
+ * + * @param avroField persistent Avro class field to which the value will be deserialized + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchMap Elasticsearch Map value to be deserialized + * @return deserialized Avro Map from the given Elasticsearch Map value + * @throws GoraException when one of the underlying values cannot be deserialized + */ + private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema, + Map<String, Object> elasticsearchMap) throws GoraException { + Map<Utf8, Object> deserializedMap = new HashMap<>(); + if (elasticsearchMap != null) { + for (Map.Entry<String, Object> entry : elasticsearchMap.entrySet()) { + String mapKey = entry.getKey(); + Object mapValue = deserializeFieldValue(avroField, avroFieldSchema, entry.getValue()); + deserializedMap.put(new Utf8(mapKey), mapValue); + } + } + return new DirtyMapWrapper<>(deserializedMap); + } + + /** + * Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes + * that can safely be written into Avro persistent object. 
+ * + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchRecord Elasticsearch Record value to be deserialized + * @return deserialized Avro Object from the given Elasticsearch Record value + * @throws GoraException when one of the underlying values cannot be deserialized + */ + private Object fromElasticsearchRecord(Schema avroFieldSchema, + Map<String, Object> elasticsearchRecord) throws GoraException { + Class<?> clazz; + try { + clazz = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName()); + } catch (ClassNotFoundException ex) { + throw new GoraException(ex); + } + PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent(); + for (Schema.Field recField : avroFieldSchema.getFields()) { + Schema innerSchema = recField.schema(); + Field innerDocField = elasticsearchMapping.getFields().getOrDefault(recField.name(), new Field(recField.name(), null)); + record.put(recField.pos(), deserializeFieldValue(recField, innerSchema, elasticsearchRecord.get(innerDocField.getName()))); + } + return record; + } + + /** + * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes + * that can safely be written into Avro persistent object. 
+ * + * @param avroField persistent Avro class field to which the value will be deserialized + * @param avroFieldSchema schema for the persistent Avro class field + * @param elasticsearchUnion Elasticsearch Union value to be deserialized + * @return deserialized Avro Object from the given Elasticsearch Union value + * @throws GoraException when one of the underlying values cannot be deserialized + */ + private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException { + Object deserializedUnion; + Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType(); + if (avroFieldSchema.getTypes().size() == 2 && + (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) && + !type0.equals(type1)) { + int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema); + Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); + deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion); + } else if (avroFieldSchema.getTypes().size() == 3) { + Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType(); + if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) && + (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) { + if (elasticsearchUnion == null) { + deserializedUnion = null; + } else if (elasticsearchUnion instanceof String) { + throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null."); } else { - throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); - } - return deserializedUnion; - } - - /** - * Serialize a persistent Avro object as used in Gora generated classes to - * an object that can be written into Elasticsearch. 
- * - * @param avroFieldSchema schema for the persistent Avro class field - * @param avroFieldValue persistent Avro field value to be serialized - * @return serialized field value - * @throws GoraException when the given Avro object cannot be serialized - */ - private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException { - Object output = avroFieldValue; - switch (avroFieldSchema.getType()) { - case ARRAY: - output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType()); - break; - case MAP: - output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType()); - break; - case RECORD: - output = recordToElasticsearch(avroFieldValue, avroFieldSchema); - break; - case BYTES: - output = Base64.getEncoder().encodeToString(((ByteBuffer) avroFieldValue).array()); - break; - case UNION: - output = unionToElasticsearch(avroFieldValue, avroFieldSchema); - break; - case BOOLEAN: - case DOUBLE: - case ENUM: - case FLOAT: - case INT: - case LONG: - case STRING: - output = avroFieldValue.toString(); - break; - case FIXED: - break; - case NULL: - output = null; - break; - } - return output; - } - - /** - * Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a - * List that can safely be written into Elasticsearch. 
- * - * @param collection the collection to be serialized - * @param avroFieldSchema field schema for the underlying type - * @return a List version of the collection that can be safely written into Elasticsearch - * @throws GoraException when one of the underlying values cannot be serialized - */ - private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException { - List<Object> list = new ArrayList<>(); - for (Object item : collection) { - Object result = serializeFieldValue(avroFieldSchema, item); - list.add(result); - } - return list; - } - - /** - * Serialize a Java map of persistent Avro objects as used in Gora generated classes to a - * map that can safely be written into Elasticsearch. - * - * @param map the map to be serialized - * @param avroFieldSchema field schema for the underlying type - * @return a Map version of the Java map that can be safely written into Elasticsearch - * @throws GoraException when one of the underlying values cannot be serialized - */ - private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException { - Map<CharSequence, Object> serializedMap = new HashMap<>(); - for (Map.Entry<CharSequence, ?> entry : map.entrySet()) { - String mapKey = entry.getKey().toString(); - Object mapValue = entry.getValue(); - Object result = serializeFieldValue(avroFieldSchema, mapValue); - serializedMap.put(mapKey, result); - } - return serializedMap; - } - - /** - * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a - * record that can safely be written into Elasticsearch. 
- * - * @param record the object to be serialized - * @param avroFieldSchema field schema for the underlying type - * @return a record version of the Java object that can be safely written into Elasticsearch - * @throws GoraException when one of the underlying values cannot be serialized - */ - private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException { - Map<CharSequence, Object> serializedRecord = new HashMap<>(); - for (Schema.Field member : avroFieldSchema.getFields()) { - Object innerValue = ((PersistentBase) record).get(member.pos()); - serializedRecord.put(member.name(), serializeFieldValue(member.schema(), innerValue)); - } - return serializedRecord; - } - - /** - * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a - * object that can safely be written into Elasticsearch. - * - * @param union the object to be serialized - * @param avroFieldSchema field schema for the underlying type - * @return a object version of the Java object that can be safely written into Elasticsearch - * @throws GoraException when one of the underlying values cannot be serialized - */ - private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException { - Object serializedUnion; - Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType(); - Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType(); - if (avroFieldSchema.getTypes().size() == 2 && - (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) && - !type0.equals(type1)) { - int schemaPos = getUnionSchema(union, avroFieldSchema); - Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); - serializedUnion = serializeFieldValue(unionSchema, union); - } else if (avroFieldSchema.getTypes().size() == 3) { - Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType(); - if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || 
type2.equals(Schema.Type.NULL)) && - (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) { - if (union == null) { - serializedUnion = null; - } else if (union instanceof String) { - throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type."); - } else { - int schemaPos = getUnionSchema(union, avroFieldSchema); - Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); - serializedUnion = recordToElasticsearch(union, unionSchema); - } - } else { - throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); - } + int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema); + Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); + deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion); + } + } else { + throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); + } + } else { + throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); + } + return deserializedUnion; + } + + /** + * Serialize a persistent Avro object as used in Gora generated classes to + * an object that can be written into Elasticsearch. 
+ * + * @param avroFieldSchema schema for the persistent Avro class field + * @param avroFieldValue persistent Avro field value to be serialized + * @return serialized field value + * @throws GoraException when the given Avro object cannot be serialized + */ + private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException { + Object output = avroFieldValue; + switch (avroFieldSchema.getType()) { + case ARRAY: + output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType()); + break; + case MAP: + output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType()); + break; + case RECORD: + output = recordToElasticsearch(avroFieldValue, avroFieldSchema); + break; + case BYTES: + output = Base64.getEncoder().encodeToString(((ByteBuffer) avroFieldValue).array()); + break; + case UNION: + output = unionToElasticsearch(avroFieldValue, avroFieldSchema); + break; + case BOOLEAN: + case DOUBLE: + case ENUM: + case FLOAT: + case INT: + case LONG: + case STRING: + output = avroFieldValue.toString(); + break; + case FIXED: + break; + case NULL: + output = null; + break; + } + return output; + } + + /** + * Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a + * List that can safely be written into Elasticsearch. 
+ * + * @param collection the collection to be serialized + * @param avroFieldSchema field schema for the underlying type + * @return a List version of the collection that can be safely written into Elasticsearch + * @throws GoraException when one of the underlying values cannot be serialized + */ + private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException { + List<Object> list = new ArrayList<>(); + for (Object item : collection) { + Object result = serializeFieldValue(avroFieldSchema, item); + list.add(result); + } + return list; + } + + /** + * Serialize a Java map of persistent Avro objects as used in Gora generated classes to a + * map that can safely be written into Elasticsearch. + * + * @param map the map to be serialized + * @param avroFieldSchema field schema for the underlying type + * @return a Map version of the Java map that can be safely written into Elasticsearch + * @throws GoraException when one of the underlying values cannot be serialized + */ + private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException { + Map<CharSequence, Object> serializedMap = new HashMap<>(); + for (Map.Entry<CharSequence, ?> entry : map.entrySet()) { + String mapKey = entry.getKey().toString(); + Object mapValue = entry.getValue(); + Object result = serializeFieldValue(avroFieldSchema, mapValue); + serializedMap.put(mapKey, result); + } + return serializedMap; + } + + /** + * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a + * record that can safely be written into Elasticsearch. 
+ * + * @param record the object to be serialized + * @param avroFieldSchema field schema for the underlying type + * @return a record version of the Java object that can be safely written into Elasticsearch + * @throws GoraException when one of the underlying values cannot be serialized + */ + private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException { + Map<CharSequence, Object> serializedRecord = new HashMap<>(); + for (Schema.Field member : avroFieldSchema.getFields()) { + Object innerValue = ((PersistentBase) record).get(member.pos()); + serializedRecord.put(member.name(), serializeFieldValue(member.schema(), innerValue)); + } + return serializedRecord; + } + + /** + * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a + * object that can safely be written into Elasticsearch. + * + * @param union the object to be serialized + * @param avroFieldSchema field schema for the underlying type + * @return a object version of the Java object that can be safely written into Elasticsearch + * @throws GoraException when one of the underlying values cannot be serialized + */ + private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException { + Object serializedUnion; + Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType(); + Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType(); + if (avroFieldSchema.getTypes().size() == 2 && + (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) && + !type0.equals(type1)) { + int schemaPos = getUnionSchema(union, avroFieldSchema); + Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); + serializedUnion = serializeFieldValue(unionSchema, union); + } else if (avroFieldSchema.getTypes().size() == 3) { + Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType(); + if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || 
type2.equals(Schema.Type.NULL)) && + (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) { + if (union == null) { + serializedUnion = null; + } else if (union instanceof String) { + throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type."); } else { - throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); - } - return serializedUnion; - } - - /** - * Method to retrieve the corresponding schema type index of a particular - * object having UNION schema. As UNION type can have one or more types and at - * a given instance, it holds an object of only one type of the defined types, - * this method is used to figure out the corresponding instance's schema type - * index. - * - * @param instanceValue value that the object holds - * @param unionSchema union schema containing all of the data types - * @return the unionSchemaPosition corresponding schema position - */ - private int getUnionSchema(Object instanceValue, Schema unionSchema) { - int unionSchemaPos = 0; - for (Schema currentSchema : unionSchema.getTypes()) { - Schema.Type schemaType = currentSchema.getType(); - if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) { - return unionSchemaPos; - } - if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) { - return unionSchemaPos; - } - if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) { - return unionSchemaPos; - } - if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) { - return unionSchemaPos; - } - if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) { - return unionSchemaPos; - } - if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) { - return unionSchemaPos; - } - if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) { - return unionSchemaPos; - } - if 
(instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) { - return unionSchemaPos; - } - if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) { - return unionSchemaPos; - } - if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) { - return unionSchemaPos; - } - if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) { - return unionSchemaPos; - } - if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) { - return unionSchemaPos; - } - if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) { - return unionSchemaPos; - } - if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) { - return unionSchemaPos; - } - if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) { - return unionSchemaPos; - } - if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) { - return unionSchemaPos; - } - unionSchemaPos++; - } - return 0; + int schemaPos = getUnionSchema(union, avroFieldSchema); + Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos); + serializedUnion = recordToElasticsearch(union, unionSchema); + } + } else { + throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); + } + } else { + throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null."); + } + return serializedUnion; + } + + /** + * Method to retrieve the corresponding schema type index of a particular + * object having UNION schema. As UNION type can have one or more types and at + * a given instance, it holds an object of only one type of the defined types, + * this method is used to figure out the corresponding instance's schema type + * index. 
+ * + * @param instanceValue value that the object holds + * @param unionSchema union schema containing all of the data types + * @return the unionSchemaPosition corresponding schema position + */ + private int getUnionSchema(Object instanceValue, Schema unionSchema) { + int unionSchemaPos = 0; + for (Schema currentSchema : unionSchema.getTypes()) { + Schema.Type schemaType = currentSchema.getType(); + if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) { + return unionSchemaPos; + } + if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) { + return unionSchemaPos; + } + if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) { + return unionSchemaPos; + } + if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) { + return unionSchemaPos; + } + if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) { + return unionSchemaPos; + } + if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) { + return unionSchemaPos; + } + if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) { + return unionSchemaPos; + } + if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) { + return unionSchemaPos; + } + if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) { + return unionSchemaPos; + } + if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) { + return unionSchemaPos; + } + if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) { + return unionSchemaPos; + } + if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && 
schemaType.equals(Schema.Type.RECORD)) { + return unionSchemaPos; + } + if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) { + return unionSchemaPos; + } + unionSchemaPos++; } + return 0; + } } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java index 01466ee..c253e87 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java @@ -25,29 +25,29 @@ import java.util.List; */ public class ElasticsearchStoreCollectionMetadata { - /** - * Collection document keys present in a given collection at ElasticsearchStore. - */ - private List<String> documentKeys = new ArrayList<>(); - - /** - * Collection document types present in a given collection at ElasticsearchStore. - */ - private List<String> documentTypes = new ArrayList<>(); - - public List<String> getDocumentKeys() { - return documentKeys; - } - - public void setDocumentKeys(List<String> documentKeys) { - this.documentKeys = documentKeys; - } - - public List<String> getDocumentTypes() { - return documentTypes; - } - - public void setDocumentTypes(List<String> documentTypes) { - this.documentTypes = documentTypes; - } + /** + * Collection document keys present in a given collection at ElasticsearchStore. + */ + private List<String> documentKeys = new ArrayList<>(); + + /** + * Collection document types present in a given collection at ElasticsearchStore. 
+ */ + private List<String> documentTypes = new ArrayList<>(); + + public List<String> getDocumentKeys() { + return documentKeys; + } + + public void setDocumentKeys(List<String> documentKeys) { + this.documentKeys = documentKeys; + } + + public List<String> getDocumentTypes() { + return documentTypes; + } + + public void setDocumentTypes(List<String> documentTypes) { + this.documentTypes = documentTypes; + } } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java index 6806aa3..465c268 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java @@ -35,68 +35,68 @@ import java.util.Map; public class ElasticsearchStoreMetadataAnalyzer extends DataStoreMetadataAnalyzer { - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class); + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class); - private RestHighLevelClient elasticsearchClient; + private RestHighLevelClient elasticsearchClient; - @Override - public void initialize() throws GoraException { - ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf()); - elasticsearchClient = ElasticsearchStore.createClient(parameters); - } + @Override + public void initialize() throws GoraException { + ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf()); + elasticsearchClient = ElasticsearchStore.createClient(parameters); + } - @Override - public String getType() { - return "ELASTICSEARCH"; - } + @Override + public String getType() { + return "ELASTICSEARCH"; + } - @Override - public List<String> getTablesNames() throws 
GoraException { - GetIndexRequest request = new GetIndexRequest("*"); - GetIndexResponse response; - try { - response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT); - } catch (IOException ex) { - throw new GoraException(ex); - } - if (response == null) { - LOG.error("Could not find indices."); - throw new GoraException("Could not find indices."); - } - return Arrays.asList(response.getIndices()); + @Override + public List<String> getTablesNames() throws GoraException { + GetIndexRequest request = new GetIndexRequest("*"); + GetIndexResponse response; + try { + response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); } + if (response == null) { + LOG.error("Could not find indices."); + throw new GoraException("Could not find indices."); + } + return Arrays.asList(response.getIndices()); + } - @Override - public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException { - GetIndexRequest request = new GetIndexRequest(tableName); - GetIndexResponse getIndexResponse; - try { - getIndexResponse = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT); - } catch (IOException ex) { - throw new GoraException(ex); - } - MappingMetadata indexMappings = getIndexResponse.getMappings().get(tableName); - Map<String, Object> indexKeysAndTypes = (Map<String, Object>) indexMappings.getSourceAsMap().get("properties"); - - List<String> documentTypes = new ArrayList<>(); - List<String> documentKeys = new ArrayList<>(); + @Override + public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException { + GetIndexRequest request = new GetIndexRequest(tableName); + GetIndexResponse getIndexResponse; + try { + getIndexResponse = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT); + } catch (IOException ex) { + throw new GoraException(ex); + } + MappingMetadata indexMappings = 
getIndexResponse.getMappings().get(tableName); + Map<String, Object> indexKeysAndTypes = (Map<String, Object>) indexMappings.getSourceAsMap().get("properties"); - for (Map.Entry<String, Object> entry : indexKeysAndTypes.entrySet()) { - Map<String, Object> subEntry = (Map<String, Object>) entry.getValue(); - documentTypes.add((String) subEntry.get("type")); - documentKeys.add(entry.getKey()); - } + List<String> documentTypes = new ArrayList<>(); + List<String> documentKeys = new ArrayList<>(); - ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata(); - collectionMetadata.setDocumentKeys(documentKeys); - collectionMetadata.setDocumentTypes(documentTypes); - return collectionMetadata; + for (Map.Entry<String, Object> entry : indexKeysAndTypes.entrySet()) { + Map<String, Object> subEntry = (Map<String, Object>) entry.getValue(); + documentTypes.add((String) subEntry.get("type")); + documentKeys.add(entry.getKey()); } - @Override - public void close() throws IOException { - if (elasticsearchClient != null) { - this.elasticsearchClient.close(); - } + ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata(); + collectionMetadata.setDocumentKeys(documentKeys); + collectionMetadata.setDocumentTypes(documentTypes); + return collectionMetadata; + } + + @Override + public void close() throws IOException { + if (elasticsearchClient != null) { + this.elasticsearchClient.close(); } + } } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java index dbbd45b..236b306 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java @@ -20,16 +20,16 @@ package org.apache.gora.elasticsearch.utils; * Authentication 
type to connect to the Elasticsearch server. */ public enum AuthenticationType { - /** - * Basic authentication requires to provide a username and password. - */ - BASIC, - /** - * Token authentication requires to provide an Elasticsearch access token. - */ - TOKEN, - /** - * API Key authentication requires to provide an Elasticsearch API Key ID and Elasticsearch API Key Secret. - */ - APIKEY + /** + * Basic authentication requires to provide a username and password. + */ + BASIC, + /** + * Token authentication requires to provide an Elasticsearch access token. + */ + TOKEN, + /** + * API Key authentication requires to provide an Elasticsearch API Key ID and Elasticsearch API Key Secret. + */ + APIKEY } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java index fa9bdc2..4f0ca4a 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java @@ -20,30 +20,30 @@ package org.apache.gora.elasticsearch.utils; * Constants file for Elasticsearch. */ public class ElasticsearchConstants { - /** - * Property indicating if the hadoop configuration has priority or not. - */ - public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration"; + /** + * Property indicating if the hadoop configuration has priority or not. + */ + public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration"; - /** - * Default configurations for Elasticsearch. - */ - public static final String DEFAULT_HOST = "localhost"; - public static final int DEFAULT_PORT = 9200; + /** + * Default configurations for Elasticsearch. 
+ */ + public static final String DEFAULT_HOST = "localhost"; + public static final int DEFAULT_PORT = 9200; - /** - * List of keys used in the configuration file of Elasticsearch. - */ - public static final String PROP_HOST = "gora.datastore.elasticsearch.host"; - public static final String PROP_PORT = "gora.datastore.elasticsearch.port"; - public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme"; - public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType"; - public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username"; - public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password"; - public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken"; - public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId"; - public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret"; - public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout"; - public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout"; - public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount"; + /** + * List of keys used in the configuration file of Elasticsearch. 
+ */ + public static final String PROP_HOST = "gora.datastore.elasticsearch.host"; + public static final String PROP_PORT = "gora.datastore.elasticsearch.port"; + public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme"; + public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType"; + public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username"; + public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password"; + public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken"; + public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId"; + public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret"; + public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout"; + public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout"; + public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount"; } diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java index 4b0cef4..187b02a 100644 --- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java +++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java @@ -1,14 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,249 +25,248 @@ import java.util.Properties; */ public class ElasticsearchParameters { - /** - * Elasticsearch server host. - */ - private String host; - - /** - * Elasticsearch server port. - */ - private int port; - - /** - * Elasticsearch server scheme. - * Optional. If not provided, defaults to http. - */ - private String scheme; - - /** - * Authentication type to connect to the server. - * Can be BASIC, TOKEN or APIKEY. - */ - private AuthenticationType authenticationType; - - /** - * Username to use for server authentication. - * Required for BASIC authentication to connect to the server. - */ - private String username; - - /** - * Password to use for server authentication. - * Required for BASIC authentication to connect to the server. - */ - private String password; - - /** - * Authorization token to use for server authentication. - * Required for TOKEN authentication to connect to the server. - */ - private String authorizationToken; - - /** - * API Key ID to use for server authentication. - * Required for APIKEY authentication to connect to the server. - */ - private String apiKeyId; - - /** - * API Key Secret to use for server authentication. 
- * Required for APIKEY authentication to connect to the server. - */ - private String apiKeySecret; - - /** - * Timeout in milliseconds used for establishing the connection to the server. - * Optional. If not provided, defaults to 5000s. - * - */ - private int connectTimeout; - - /** - * Timeout in milliseconds used for waiting for data – after establishing the connection to the server. - * Optional. If not provided, defaults to 60000s. - */ - private int socketTimeout; - - /** - * Number of worker threads used by the connection manager. - * Optional. If not provided, defaults to 1. - */ - private int ioThreadCount; - - public ElasticsearchParameters(String host, int port) { - this.host = host; - this.port = port; + /** + * Elasticsearch server host. + */ + private String host; + + /** + * Elasticsearch server port. + */ + private int port; + + /** + * Elasticsearch server scheme. + * Optional. If not provided, defaults to http. + */ + private String scheme; + + /** + * Authentication type to connect to the server. + * Can be BASIC, TOKEN or APIKEY. + */ + private AuthenticationType authenticationType; + + /** + * Username to use for server authentication. + * Required for BASIC authentication to connect to the server. + */ + private String username; + + /** + * Password to use for server authentication. + * Required for BASIC authentication to connect to the server. + */ + private String password; + + /** + * Authorization token to use for server authentication. + * Required for TOKEN authentication to connect to the server. + */ + private String authorizationToken; + + /** + * API Key ID to use for server authentication. + * Required for APIKEY authentication to connect to the server. + */ + private String apiKeyId; + + /** + * API Key Secret to use for server authentication. + * Required for APIKEY authentication to connect to the server. + */ + private String apiKeySecret; + + /** + * Timeout in milliseconds used for establishing the connection to the server. 
+ * Optional. If not provided, defaults to 5000 ms. + */ + private int connectTimeout; + + /** + * Timeout in milliseconds used for waiting for data – after establishing the connection to the server. + * Optional. If not provided, defaults to 60000 ms. + */ + private int socketTimeout; + + /** + * Number of worker threads used by the connection manager. + * Optional. If not provided, defaults to 1. + */ + private int ioThreadCount; + + public ElasticsearchParameters(String host, int port) { + this.host = host; + this.port = port; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getScheme() { + return scheme; + } + + public void setScheme(String scheme) { + this.scheme = scheme; + } + + public AuthenticationType getAuthenticationType() { + return authenticationType; + } + + public void setAuthenticationType(AuthenticationType authenticationType) { + this.authenticationType = authenticationType; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getAuthorizationToken() { + return authorizationToken; + } + + public void setAuthorizationToken(String authorizationToken) { + this.authorizationToken = authorizationToken; + } + + public String getApiKeyId() { + return apiKeyId; + } + + public void setApiKeyId(String apiKeyId) { + this.apiKeyId = apiKeyId; + } + + public String getApiKeySecret() { + return apiKeySecret; + } + + public void setApiKeySecret(String apiKeySecret) { + this.apiKeySecret = apiKeySecret; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { +
this.connectTimeout = connectTimeout; + } + + public int getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public int getIoThreadCount() { + return ioThreadCount; + } + + public void setIoThreadCount(int ioThreadCount) { + this.ioThreadCount = ioThreadCount; + } + + /** + * Reads Elasticsearch parameters from a properties list. + * + * @param properties Properties list + * @return Elasticsearch parameters instance + */ + public static ElasticsearchParameters load(Properties properties, Configuration conf) { + ElasticsearchParameters elasticsearchParameters; + + if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) { + elasticsearchParameters = new ElasticsearchParameters( + conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST), + conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT)); + } else { + elasticsearchParameters = new ElasticsearchParameters( + properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST), + Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT, + String.valueOf(ElasticsearchConstants.DEFAULT_PORT)))); } - public String getHost() { - return host; + String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME); + if (schemeProperty != null) { + elasticsearchParameters.setScheme(schemeProperty); } - public void setHost(String host) { - this.host = host; + AuthenticationType authenticationTypeProperty = + AuthenticationType.valueOf(properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE)); + if (authenticationTypeProperty != null) { + elasticsearchParameters.setAuthenticationType(authenticationTypeProperty); } - public int getPort() { - return port; + String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME); + if (usernameProperty != null) { 
+ elasticsearchParameters.setUsername(usernameProperty); } - public void setPort(int port) { - this.port = port; + String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD); + if (passwordProperty != null) { + elasticsearchParameters.setPassword(passwordProperty); } - public String getScheme() { - return scheme; + String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN); + if (authorizationTokenProperty != null) { + elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty); } - public void setScheme(String scheme) { - this.scheme = scheme; + String apiKeyIdProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYID); + if (apiKeyIdProperty != null) { + elasticsearchParameters.setApiKeyId(apiKeyIdProperty); } - public AuthenticationType getAuthenticationType() { - return authenticationType; + String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET); + if (apiKeySecretProperty != null) { + elasticsearchParameters.setApiKeySecret(apiKeySecretProperty); } - public void setAuthenticationType(AuthenticationType authenticationType) { - this.authenticationType = authenticationType; + String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT); + if (connectTimeoutProperty != null) { + elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty)); } - public String getUsername() { - return username; + String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT); + if (socketTimeoutProperty != null) { + elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty)); } - public void setUsername(String username) { - this.username = username; + String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT); + if (ioThreadCountProperty != null) { + 
elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty)); } - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getAuthorizationToken() { - return authorizationToken; - } - - public void setAuthorizationToken(String authorizationToken) { - this.authorizationToken = authorizationToken; - } - - public String getApiKeyId() { - return apiKeyId; - } - - public void setApiKeyId(String apiKeyId) { - this.apiKeyId = apiKeyId; - } - - public String getApiKeySecret() { - return apiKeySecret; - } - - public void setApiKeySecret(String apiKeySecret) { - this.apiKeySecret = apiKeySecret; - } - - public int getConnectTimeout() { - return connectTimeout; - } - - public void setConnectTimeout(int connectTimeout) { - this.connectTimeout = connectTimeout; - } - - public int getSocketTimeout() { - return socketTimeout; - } - - public void setSocketTimeout(int socketTimeout) { - this.socketTimeout = socketTimeout; - } - - public int getIoThreadCount() { - return ioThreadCount; - } - - public void setIoThreadCount(int ioThreadCount) { - this.ioThreadCount = ioThreadCount; - } - - /** - * Reads Elasticsearch parameters from a properties list. 
- * - * @param properties Properties list - * @return Elasticsearch parameters instance - */ - public static ElasticsearchParameters load(Properties properties, Configuration conf) { - ElasticsearchParameters elasticsearchParameters; - - if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) { - elasticsearchParameters = new ElasticsearchParameters( - conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST), - conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT)); - } else { - elasticsearchParameters = new ElasticsearchParameters( - properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST), - Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT, - String.valueOf(ElasticsearchConstants.DEFAULT_PORT)))); - } - - String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME); - if (schemeProperty != null) { - elasticsearchParameters.setScheme(schemeProperty); - } - - AuthenticationType authenticationTypeProperty = - AuthenticationType.valueOf(properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE)); - if (authenticationTypeProperty != null) { - elasticsearchParameters.setAuthenticationType(authenticationTypeProperty); - } - - String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME); - if (usernameProperty != null) { - elasticsearchParameters.setUsername(usernameProperty); - } - - String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD); - if (passwordProperty != null) { - elasticsearchParameters.setPassword(passwordProperty); - } - - String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN); - if (authorizationTokenProperty != null) { - elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty); - } - - String apiKeyIdProperty = 
properties.getProperty(ElasticsearchConstants.PROP_APIKEYID); - if (apiKeyIdProperty != null) { - elasticsearchParameters.setApiKeyId(apiKeyIdProperty); - } - - String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET); - if (apiKeySecretProperty != null) { - elasticsearchParameters.setApiKeySecret(apiKeySecretProperty); - } - - String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT); - if (connectTimeoutProperty != null) { - elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty)); - } - - String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT); - if (socketTimeoutProperty != null) { - elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty)); - } - - String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT); - if (ioThreadCountProperty != null) { - elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty)); - } - - return elasticsearchParameters; - } + return elasticsearchParameters; + } } diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java index ef17dbc..6d4d4c3 100644 --- a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java +++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java @@ -32,40 +32,40 @@ import java.util.Properties; */ public class GoraElasticsearchTestDriver extends GoraTestDriver { - private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1"; - private ElasticsearchContainer elasticsearchContainer; + private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1"; + private ElasticsearchContainer 
elasticsearchContainer; - /** - * Constructor for this class. - */ - public GoraElasticsearchTestDriver() { - super(ElasticsearchStore.class); - Properties properties = DataStoreFactory.createProps(); - elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE) - .withEnv("ELASTIC_PASSWORD", properties.getProperty(ElasticsearchConstants.PROP_PASSWORD)) - .withEnv("xpack.security.enabled", "true"); - } + /** + * Constructor for this class. + */ + public GoraElasticsearchTestDriver() { + super(ElasticsearchStore.class); + Properties properties = DataStoreFactory.createProps(); + elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE) + .withEnv("ELASTIC_PASSWORD", properties.getProperty(ElasticsearchConstants.PROP_PASSWORD)) + .withEnv("xpack.security.enabled", "true"); + } - /** - * Initiate the Elasticsearch server on the default port. - */ - @Override - public void setUpClass() throws Exception { - elasticsearchContainer.start(); - log.info("Setting up Elasticsearch test driver"); + /** + * Initiate the Elasticsearch server on the default port. + */ + @Override + public void setUpClass() throws Exception { + elasticsearchContainer.start(); + log.info("Setting up Elasticsearch test driver"); - int port = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT); - String host = elasticsearchContainer.getContainerIpAddress(); - conf.set(ElasticsearchConstants.PROP_PORT, String.valueOf(port)); - conf.set(ElasticsearchConstants.PROP_HOST, host); - } + int port = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT); + String host = elasticsearchContainer.getContainerIpAddress(); + conf.set(ElasticsearchConstants.PROP_PORT, String.valueOf(port)); + conf.set(ElasticsearchConstants.PROP_HOST, host); + } - /** - * Tear the server down. 
- */ - @Override - public void tearDownClass() throws Exception { - elasticsearchContainer.close(); - log.info("Tearing down Elasticsearch test driver"); - } + /** + * Tear the server down. + */ + @Override + public void tearDownClass() throws Exception { + elasticsearchContainer.close(); + log.info("Tearing down Elasticsearch test driver"); + } } diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java index d5aa9e6..c05c441 100644 --- a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java +++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java @@ -19,10 +19,12 @@ package org.apache.gora.elasticsearch.mapreduce; import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver; import org.apache.gora.examples.generated.WebPage; import org.apache.gora.mapreduce.DataStoreMapReduceTestBase; +import org.apache.gora.mapreduce.MapReduceTestUtils; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; import org.junit.After; import org.junit.Before; +import org.junit.Test; import java.io.IOException; @@ -31,29 +33,35 @@ import java.io.IOException; */ public class ElasticsearchStoreMapReduceTest extends DataStoreMapReduceTestBase { - private GoraElasticsearchTestDriver driver; - - public ElasticsearchStoreMapReduceTest() throws IOException { - super(); - driver = new GoraElasticsearchTestDriver(); - } - - @Override - @Before - public void setUp() throws Exception { - driver.setUpClass(); - super.setUp(); - } - - @Override - @After - public void tearDown() throws Exception { - super.tearDown(); - driver.tearDownClass(); - } - - @Override - protected DataStore<String, WebPage> createWebPageDataStore() throws IOException { - return 
DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration()); - } + private GoraElasticsearchTestDriver driver; + + public ElasticsearchStoreMapReduceTest() throws IOException { + super(); + driver = new GoraElasticsearchTestDriver(); + } + + @Override + @Before + public void setUp() throws Exception { + driver.setUpClass(); + super.setUp(); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + driver.tearDownClass(); + } + + @Override + protected DataStore<String, WebPage> createWebPageDataStore() throws IOException { + return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration()); + } + + @Test + @Override + public void testCountQuery() throws Exception { + MapReduceTestUtils.testCountQuery(createWebPageDataStore(), driver.getConfiguration()); + } } diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java index 7c95593..fb7fd0d 100644 --- a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java +++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java @@ -34,140 +34,145 @@ import org.junit.Ignore; import org.junit.Test; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + /** * Test case for ElasticsearchStore. 
*/ public class TestElasticsearchStore extends DataStoreTestBase { - static { - setTestDriver(new GoraElasticsearchTestDriver()); - } - - @Test - public void testInitialize() throws GoraException { - log.info("test method: testInitialize"); - - ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping(); - - Map<String, Field> fields = new HashMap<String, Field>() {{ - put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT))); - put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG))); - put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT))); - put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT))); - put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER))); - put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT))); - put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT))); - }}; - - Assert.assertEquals("frontier", employeeStore.getSchemaName()); - Assert.assertEquals("frontier", mapping.getIndexName()); - Assert.assertEquals(fields, mapping.getFields()); - } - - @Test - public void testLoadElasticsearchParameters() throws IOException { - log.info("test method: testLoadElasticsearchParameters"); - - Properties properties = DataStoreFactory.createProps(); - - ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration()); - - Assert.assertEquals("localhost", parameters.getHost()); - Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType()); - Assert.assertEquals("elastic", parameters.getUsername()); - Assert.assertEquals("password", parameters.getPassword()); - } - - @Test(expected = GoraException.class) - public void testInvalidXmlFile() throws Exception { - log.info("test method: testInvalidXmlFile"); - - Properties properties = DataStoreFactory.createProps(); - properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, 
"gora-elasticsearch-mapping-invalid.xml"); - properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true"); - testDriver.createDataStore(String.class, EmployeeInt.class, properties); - } - - @Test - public void testXsdValidationParameter() throws GoraException { - log.info("test method: testXsdValidationParameter"); - - Properties properties = DataStoreFactory.createProps(); - properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml"); - properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false"); - testDriver.createDataStore(String.class, EmployeeInt.class, properties); - } - - @Test - public void testGetType() throws GoraException, ClassNotFoundException { - Configuration conf = testDriver.getConfiguration(); - DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf); - - String actualType = storeMetadataAnalyzer.getType(); - String expectedType = "ELASTICSEARCH"; - Assert.assertEquals(expectedType, actualType); - } - - @Test - public void testGetTablesNames() throws GoraException, ClassNotFoundException { - Configuration conf = testDriver.getConfiguration(); - DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf); - - List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames()); - List<String> expectedTablesNames = new ArrayList<String>() { - { - add("frontier"); - add("webpage"); - } - }; - Assert.assertEquals(expectedTablesNames, actualTablesNames); - } - - @Test - public void testGetTableInfo() throws GoraException, ClassNotFoundException { - Configuration conf = testDriver.getConfiguration(); - DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf); - - ElasticsearchStoreCollectionMetadata actualCollectionMetadata = - (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier"); - - List<String> expectedDocumentKeys = new 
ArrayList<String>() { - { - add("name"); - add("dateOfBirth"); - add("ssn"); - add("value"); - add("salary"); - add("boss"); - add("webpage"); - add("gora_id"); - } - }; - - List<String> expectedDocumentTypes = new ArrayList<String>() { - { - add("text"); - add("long"); - add("text"); - add("text"); - add("integer"); - add("object"); - add("object"); - add("keyword"); - } - }; - - Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentTypes().size()); - Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys())); - - Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size()); - Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes())); - } - - @Ignore("Elasticsearch doesn't support 3 types union field yet") - @Override - public void testGet3UnionField() { - } + static { + setTestDriver(new GoraElasticsearchTestDriver()); + } + + @Test + public void testInitialize() throws GoraException { + log.info("test method: testInitialize"); + + ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping(); + + Map<String, Field> fields = new HashMap<String, Field>() {{ + put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT))); + put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG))); + put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT))); + put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT))); + put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER))); + put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT))); + put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT))); + }}; + + Assert.assertEquals("frontier", employeeStore.getSchemaName()); + Assert.assertEquals("frontier", mapping.getIndexName()); + Assert.assertEquals(fields, mapping.getFields()); + } + + 
@Test + public void testLoadElasticsearchParameters() throws IOException { + log.info("test method: testLoadElasticsearchParameters"); + + Properties properties = DataStoreFactory.createProps(); + + ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration()); + + Assert.assertEquals("localhost", parameters.getHost()); + Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType()); + Assert.assertEquals("elastic", parameters.getUsername()); + Assert.assertEquals("password", parameters.getPassword()); + } + + @Test(expected = GoraException.class) + public void testInvalidXmlFile() throws Exception { + log.info("test method: testInvalidXmlFile"); + + Properties properties = DataStoreFactory.createProps(); + properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml"); + properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true"); + testDriver.createDataStore(String.class, EmployeeInt.class, properties); + } + + @Test + public void testXsdValidationParameter() throws GoraException { + log.info("test method: testXsdValidationParameter"); + + Properties properties = DataStoreFactory.createProps(); + properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml"); + properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false"); + testDriver.createDataStore(String.class, EmployeeInt.class, properties); + } + + @Test + public void testGetType() throws GoraException, ClassNotFoundException { + Configuration conf = testDriver.getConfiguration(); + DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf); + + String actualType = storeMetadataAnalyzer.getType(); + String expectedType = "ELASTICSEARCH"; + Assert.assertEquals(expectedType, actualType); + } + + @Test + public void testGetTablesNames() throws GoraException, ClassNotFoundException { + Configuration conf = 
testDriver.getConfiguration(); + DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf); + + List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames()); + List<String> expectedTablesNames = new ArrayList<String>() { + { + add("frontier"); + add("webpage"); + } + }; + Assert.assertEquals(expectedTablesNames, actualTablesNames); + } + + @Test + public void testGetTableInfo() throws GoraException, ClassNotFoundException { + Configuration conf = testDriver.getConfiguration(); + DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf); + + ElasticsearchStoreCollectionMetadata actualCollectionMetadata = + (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier"); + + List<String> expectedDocumentKeys = new ArrayList<String>() { + { + add("name"); + add("dateOfBirth"); + add("ssn"); + add("value"); + add("salary"); + add("boss"); + add("webpage"); + add("gora_id"); + } + }; + + List<String> expectedDocumentTypes = new ArrayList<String>() { + { + add("text"); + add("long"); + add("text"); + add("text"); + add("integer"); + add("object"); + add("object"); + add("keyword"); + } + }; + + Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentTypes().size()); + Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys())); + + Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size()); + Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes())); + } + + @Ignore("Elasticsearch doesn't support 3 types union field yet") + @Override + public void testGet3UnionField() { + } }