METAMODEL-212: Fixed. Fixes #67
Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/137caf0d Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/137caf0d Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/137caf0d Branch: refs/heads/master Commit: 137caf0d2faab1bc0ad42e972aef01f0282423a2 Parents: f7989b8 Author: Dennis Du Krøger <d...@hp23c.dk> Authored: Fri Nov 27 11:13:57 2015 +0100 Committer: Kasper Sørensen <i.am.kasper.soren...@gmail.com> Committed: Fri Nov 27 11:13:57 2015 +0100 ---------------------------------------------------------------------- .gitignore | 10 +- CHANGES.md | 1 + .../org/apache/metamodel/query/SelectItem.java | 8 +- elasticsearch/common/pom.xml | 50 ++ .../common/ElasticSearchDateConverter.java | 41 ++ .../common/ElasticSearchMetaData.java | 50 ++ .../common/ElasticSearchUtils.java | 260 ++++++++ .../common/ElasticSearchDateConverterTest.java | 36 ++ elasticsearch/native/pom.xml | 62 ++ .../ElasticSearchCreateTableBuilder.java | 65 ++ .../nativeclient/ElasticSearchDataContext.java | 384 ++++++++++++ .../nativeclient/ElasticSearchDataSet.java | 130 ++++ .../ElasticSearchDeleteBuilder.java | 82 +++ .../ElasticSearchDropTableBuilder.java | 103 ++++ .../ElasticSearchInsertBuilder.java | 78 +++ .../ElasticSearchMetaDataParser.java | 81 +++ .../ElasticSearchUpdateCallback.java | 87 +++ .../nativeclient/NativeElasticSearchUtils.java | 66 ++ .../ElasticSearchDataContextTest.java | 606 ++++++++++++++++++ .../ElasticSearchMetaDataParserTest.java | 66 ++ .../nativeclient/ElasticSearchUtilsTest.java | 63 ++ .../utils/EmbeddedElasticsearchServer.java | 71 +++ elasticsearch/pom.xml | 39 +- elasticsearch/rest/pom.xml | 93 +++ .../rest/ElasticSearchRestDataContext.java | 375 +++++++++++ .../elasticsearch/rest/JestClientExecutor.java | 51 ++ .../elasticsearch/rest/JestDeleteScroll.java | 57 ++ .../JestElasticSearchCreateTableBuilder.java | 50 ++ .../rest/JestElasticSearchDataSet.java | 129 ++++ 
.../rest/JestElasticSearchDeleteBuilder.java | 76 +++ .../rest/JestElasticSearchDropTableBuilder.java | 61 ++ .../rest/JestElasticSearchInsertBuilder.java | 77 +++ .../rest/JestElasticSearchMetaDataParser.java | 75 +++ .../rest/JestElasticSearchUpdateCallback.java | 84 +++ .../rest/JestElasticSearchUtils.java | 76 +++ .../rest/JestElasticSearchDataContextTest.java | 614 +++++++++++++++++++ .../JestElasticSearchMetaDataParserTest.java | 70 +++ .../rest/JestElasticSearchUtilsTest.java | 68 ++ .../rest/utils/EmbeddedElasticsearchServer.java | 72 +++ .../ElasticSearchCreateTableBuilder.java | 142 ----- .../elasticsearch/ElasticSearchDataContext.java | 471 -------------- .../elasticsearch/ElasticSearchDataSet.java | 130 ---- .../ElasticSearchDateConverter.java | 41 -- .../ElasticSearchDeleteBuilder.java | 81 --- .../ElasticSearchDropTableBuilder.java | 103 ---- .../ElasticSearchInsertBuilder.java | 77 --- .../elasticsearch/ElasticSearchMetaData.java | 52 -- .../ElasticSearchMetaDataParser.java | 95 --- .../ElasticSearchUpdateCallback.java | 87 --- .../elasticsearch/ElasticSearchUtils.java | 66 -- .../ElasticSearchDataContextTest.java | 605 ------------------ .../ElasticSearchDateConverterTest.java | 34 - .../ElasticSearchMetaDataParserTest.java | 65 -- .../elasticsearch/ElasticSearchUtilsTest.java | 63 -- .../utils/EmbeddedElasticsearchServer.java | 71 --- full/pom.xml | 15 +- .../apache/metamodel/DataContextFactory.java | 20 +- pom.xml | 2 +- 58 files changed, 4457 insertions(+), 2230 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index f4edda6..e1af7b0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,9 @@ -/.project -/.settings -/target -/.idea +.project +.settings/ +.classpath/ +.metadata/ +target/ +/.idea/ *.iml *.ipr *.iws 
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/CHANGES.md ---------------------------------------------------------------------- diff --git a/CHANGES.md b/CHANGES.md index cdf244a..47018ba 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,6 @@ ### Apache MetaModel (work-in-progress) + * [METAMODEL-212] - New module for ElasticSearch via REST client. * [METAMODEL-207] - Ensured the serializability of the SingleLineCsvRow class. * [METAMODEL-211] - Fixed a bug related to lookup by primary key (_id) on MongoDB. * [METAMODEL-15] - Query parser support for table names with space. Delimitters can be double quote or square brackets. http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/core/src/main/java/org/apache/metamodel/query/SelectItem.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/metamodel/query/SelectItem.java b/core/src/main/java/org/apache/metamodel/query/SelectItem.java index a1e33de..cbbe919 100644 --- a/core/src/main/java/org/apache/metamodel/query/SelectItem.java +++ b/core/src/main/java/org/apache/metamodel/query/SelectItem.java @@ -215,13 +215,19 @@ public class SelectItem extends BaseObject implements QueryItem, Cloneable { * * @return * @deprecated use {@link #getAggregateFunction()} or - * {@link #getScalarFunction()} instead + * {@link #getScalarFunction()} instead, + * or {@link #hasFunction()} to check if a + * function is set at all. 
*/ @Deprecated public FunctionType getFunction() { return _function; } + public boolean hasFunction(){ + return _function != null; + } + public AggregateFunction getAggregateFunction() { if (_function instanceof AggregateFunction) { return (AggregateFunction) _function; http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/common/pom.xml b/elasticsearch/common/pom.xml new file mode 100644 index 0000000..98e723a --- /dev/null +++ b/elasticsearch/common/pom.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>MetaModel-elasticsearch</artifactId> + <groupId>org.apache.metamodel</groupId> + <version>4.4.2-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>MetaModel-elasticsearch-common</artifactId> + <name>MetaModel module for ElasticSearch commons</name> + + <dependencies> + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-core</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- elasticsearch --> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java new file mode 100644 index 0000000..a6ce656 --- /dev/null +++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.common; + +import org.apache.metamodel.util.TimeComparator; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Util class to convert date strings from ElasticSearch to + * proper java Dates. + */ +public final class ElasticSearchDateConverter { + + public static Date tryToConvert(String dateAsString) { + try { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"); + return dateFormat.parse(dateAsString); + } catch (ParseException e) { + return TimeComparator.toDate(dateAsString); + } + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaData.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaData.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaData.java new file mode 100644 index 0000000..7c32eb0 --- /dev/null +++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaData.java @@ -0,0 +1,50 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.common; + +import org.apache.metamodel.schema.ColumnType; + +/** + * MetaData representation of an ElasticSearch index type. + * + * We will map the elasticsearch fields to columns and their + * types to {@link ColumnType}s. + */ +public class ElasticSearchMetaData { + + private final String[] columnNames; + private final ColumnType[] columnTypes; + + /** + * Constructs a {@link ElasticSearchMetaData}. 
+ * + */ + public ElasticSearchMetaData(String[] names, ColumnType[] types) { + this.columnNames = names; + this.columnTypes = types; + } + + public String[] getColumnNames() { + return columnNames; + } + + public ColumnType[] getColumnTypes() { + return columnTypes; + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java new file mode 100644 index 0000000..11d35bd --- /dev/null +++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.elasticsearch.common; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.metamodel.query.FilterItem; +import org.apache.metamodel.query.LogicalOperator; +import org.apache.metamodel.query.OperatorType; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.ColumnType; +import org.apache.metamodel.schema.MutableColumn; +import org.apache.metamodel.schema.MutableTable; +import org.apache.metamodel.util.CollectionUtils; +import org.elasticsearch.common.base.Strings; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.FilterBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticSearchUtils { + public static final String FIELD_ID = "_id"; + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class); + + /** + * Gets a "filter" query which is both 1.x and 2.x compatible. 
+ */ + private static QueryBuilder getFilteredQuery(String prefix, String fieldName) { + // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(fieldName)); + // 2.x: itemQueryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName)); + try { + try { + Method method = QueryBuilders.class.getDeclaredMethod(prefix + "Query", String.class); + method.setAccessible(true); + return QueryBuilders.boolQuery().must((QueryBuilder) method.invoke(null, fieldName)); + } catch (NoSuchMethodException e) { + Class<?> clazz = ElasticSearchUtils.class.getClassLoader() + .loadClass("org.elasticsearch.index.query.FilterBuilders"); + Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + "Filter", String.class); + filterBuilderMethod.setAccessible(true); + Method queryBuildersFilteredQueryMethod = + QueryBuilders.class.getDeclaredMethod("filteredQuery", QueryBuilder.class, FilterBuilder.class); + return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null, + filterBuilderMethod.invoke(null, fieldName)); + } + } catch (Exception e) { + logger.error("Failed to resolve/invoke filtering method", e); + throw new IllegalStateException("Failed to resolve filtering method", e); + } + } + + public static QueryBuilder getMissingQuery(String fieldName) { + return getFilteredQuery("missing", fieldName); + } + + public static QueryBuilder getExistsQuery(String fieldName) { + return getFilteredQuery("exists", fieldName); + } + + public static List<Object> getSourceProperties(final MutableTable table) { + if (table.getColumnByName(FIELD_ID) == null) { + final MutableColumn idColumn = new MutableColumn(FIELD_ID, ColumnType.STRING) + .setTable(table).setPrimaryKey(true); + table.addColumn(0, idColumn); + } + + final List<Object> sourceProperties = new ArrayList<>(); + + for (Column column : table.getColumns()) { + // each column is defined as a property pair of the form: ("field1", + // "type=string,store=true") + final String 
columnName = column.getName(); + if (FIELD_ID.equals(columnName)) { + // do nothing - the ID is a client-side construct + continue; + } + sourceProperties.add(columnName); + + String type = getType(column); + if (type == null) { + sourceProperties.add("store=true"); + } else { + sourceProperties.add("type=" + type + ",store=true"); + } + } + + return sourceProperties; + } + + /** + * Determines the best fitting type. For reference of ElasticSearch types, + * see + * + * <pre> + * http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-core-types.html + * </pre> + * + * + * @param column + * @return + */ + private static String getType(Column column) { + String nativeType = column.getNativeType(); + if (!Strings.isNullOrEmpty(nativeType)) { + return nativeType; + } + + final ColumnType type = column.getType(); + if (type == null) { + throw new IllegalStateException("No column type specified for '" + column.getName() + + "' - cannot build ElasticSearch mapping without type."); + } + + if (type.isLiteral()) { + return "string"; + } else if (type == ColumnType.FLOAT) { + return "float"; + } else if (type == ColumnType.DOUBLE || type == ColumnType.NUMERIC || type == ColumnType.NUMBER) { + return "double"; + } else if (type == ColumnType.SMALLINT) { + return "short"; + } else if (type == ColumnType.TINYINT) { + return "byte"; + } else if (type == ColumnType.INTEGER) { + return "integer"; + } else if (type == ColumnType.DATE || type == ColumnType.TIMESTAMP) { + return "date"; + } else if (type == ColumnType.BINARY || type == ColumnType.VARBINARY) { + return "binary"; + } else if (type == ColumnType.BOOLEAN || type == ColumnType.BIT) { + return "boolean"; + } else if (type == ColumnType.MAP) { + return "object"; + } + + throw new UnsupportedOperationException("Unsupported column type '" + type.getName() + "' of column '" + + column.getName() + "' - cannot translate to an ElasticSearch type."); + } + + /** + * Creates, if possible, a {@link 
QueryBuilder} object which can be used to + * push down one or more {@link FilterItem}s to ElasticSearch's backend. + * + * @return a {@link QueryBuilder} if one was produced, or null if the items + * could not be pushed down to an ElasticSearch query + */ + public static QueryBuilder createQueryBuilderForSimpleWhere(List<FilterItem> whereItems, + LogicalOperator logicalOperator) { + if (whereItems.isEmpty()) { + return QueryBuilders.matchAllQuery(); + } + + List<QueryBuilder> children = new ArrayList<>(whereItems.size()); + for (FilterItem item : whereItems) { + final QueryBuilder itemQueryBuilder; + + if (item.isCompoundFilter()) { + final List<FilterItem> childItems = Arrays.asList(item.getChildItems()); + itemQueryBuilder = createQueryBuilderForSimpleWhere(childItems, item.getLogicalOperator()); + if (itemQueryBuilder == null) { + // something was not supported, so we have to forfeit here + // too. + return null; + } + } else { + final Column column = item.getSelectItem().getColumn(); + if (column == null) { + // unsupported type of where item - must have a column + // reference + return null; + } + final String fieldName = column.getName(); + final Object operand = item.getOperand(); + final OperatorType operator = item.getOperator(); + + if (OperatorType.EQUALS_TO.equals(operator)) { + if (operand == null) { + itemQueryBuilder = getMissingQuery(fieldName); + } else { + itemQueryBuilder = QueryBuilders.termQuery(fieldName, operand); + } + } else if (OperatorType.DIFFERENT_FROM.equals(operator)) { + if (operand == null) { + itemQueryBuilder = getExistsQuery(fieldName); + } else { + itemQueryBuilder = QueryBuilders.boolQuery().mustNot( + QueryBuilders.termQuery(fieldName, operand)); + } + } else if (OperatorType.IN.equals(operator)) { + final List<?> operands = CollectionUtils.toList(operand); + itemQueryBuilder = QueryBuilders.termsQuery(fieldName, operands); + } else { + // not (yet) support operator types + return null; + } + } + + 
children.add(itemQueryBuilder); + } + + // just one where item - just return the child query builder + if (children.size() == 1) { + return children.get(0); + } + + // build a bool query + final BoolQueryBuilder result = QueryBuilders.boolQuery(); + for (QueryBuilder child : children) { + switch (logicalOperator) { + case AND: + result.must(child); + case OR: + result.should(child); + } + } + + return result; + } + + public static ColumnType getColumnTypeFromElasticSearchType(final String metaDataFieldType) { + final ColumnType columnType; + if (metaDataFieldType.startsWith("date")) { + columnType = ColumnType.DATE; + } else if (metaDataFieldType.equals("long")) { + columnType = ColumnType.BIGINT; + } else if (metaDataFieldType.equals("string")) { + columnType = ColumnType.STRING; + } else if (metaDataFieldType.equals("float")) { + columnType = ColumnType.FLOAT; + } else if (metaDataFieldType.equals("boolean")) { + columnType = ColumnType.BOOLEAN; + } else if (metaDataFieldType.equals("double")) { + columnType = ColumnType.DOUBLE; + } else { + columnType = ColumnType.STRING; + } + return columnType; + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverterTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverterTest.java b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverterTest.java new file mode 100644 index 0000000..49ef1bb --- /dev/null +++ b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverterTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.common; + +import junit.framework.TestCase; + +import java.util.Date; + +import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter; + +public class ElasticSearchDateConverterTest extends TestCase { + + public void testConvertDateOptionalTime() throws Exception { + String dateToConvert = "2013-01-04T15:55:51.217+01:00"; + Date date = ElasticSearchDateConverter.tryToConvert(dateToConvert); + + assertNotNull(date); + assertTrue(date.toString().startsWith("Fri Jan 04")); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/native/pom.xml b/elasticsearch/native/pom.xml new file mode 100644 index 0000000..7b117aa --- /dev/null +++ b/elasticsearch/native/pom.xml @@ -0,0 +1,62 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. 
The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>MetaModel-elasticsearch</artifactId> + <groupId>org.apache.metamodel</groupId> + <version>4.4.2-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>MetaModel-elasticsearch-native</artifactId> + <name>MetaModel module for ElasticSearch via native (Node or Transport) client</name> + <dependencies> + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-elasticsearch-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <!-- elasticsearch --> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + + <!-- test --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + 
</dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java new file mode 100644 index 0000000..5d6701c --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.elasticsearch.nativeclient; + +import java.util.List; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.create.AbstractTableCreationBuilder; +import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils; +import org.apache.metamodel.schema.MutableSchema; +import org.apache.metamodel.schema.MutableTable; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; +import org.elasticsearch.client.IndicesAdminClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<ElasticSearchUpdateCallback> { + + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchCreateTableBuilder.class); + + public ElasticSearchCreateTableBuilder(ElasticSearchUpdateCallback updateCallback, Schema schema, String name) { + super(updateCallback, schema, name); + } + + @Override + public Table execute() throws MetaModelException { + final MutableTable table = getTable(); + final List<Object> sourceProperties = ElasticSearchUtils.getSourceProperties(table); + + final ElasticSearchDataContext dataContext = getUpdateCallback().getDataContext(); + final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices(); + final String indexName = dataContext.getIndexName(); + + final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin) + .setIndices(indexName).setType(table.getName()); + requestBuilder.setSource(sourceProperties.toArray()); + final PutMappingResponse result = requestBuilder.execute().actionGet(); + + logger.debug("PutMapping response: acknowledged={}", result.isAcknowledged()); + + final MutableSchema schema = (MutableSchema) getSchema(); + 
schema.addTable(table); + return table; + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java new file mode 100644 index 0000000..8232394 --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.metamodel.elasticsearch.nativeclient;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.UpdateScript;
import org.apache.metamodel.UpdateableDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetHeader;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.data.SimpleDataSetHeader;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.MutableColumn;
import org.apache.metamodel.schema.MutableSchema;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.ObjectLookupContainer;
import org.elasticsearch.common.hppc.cursors.ObjectCursor;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * DataContext implementation for ElasticSearch analytics engine.
 *
 * ElasticSearch has a data storage structure hierarchy that briefly goes like
 * this:
 * <ul>
 * <li>Index</li>
 * <li>Document type (short: Type) (within an index)</li>
 * <li>Documents (of a particular type)</li>
 * </ul>
 *
 * When instantiating this DataContext, an index name is provided. Within this
 * index, each document type is represented as a table.
 *
 * This implementation supports either automatic discovery of a schema or manual
 * specification of a schema, through the {@link SimpleTableDef} class.
 */
public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);

    /**
     * Scroll keep-alive applied to search requests that have no row limit.
     */
    public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);

    private final Client elasticSearchClient;
    private final String indexName;

    // Table definitions that are set from the beginning, not supposed to be changed.
    private final List<SimpleTableDef> staticTableDefinitions;

    // Table definitions that are discovered, these can change
    private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();

    /**
     * Constructs a {@link ElasticSearchDataContext}. This constructor accepts a
     * custom array of {@link SimpleTableDef}s which allows the user to define
     * his own view on the indexes in the engine. Tables detected from the index
     * mappings are added in addition to the supplied definitions.
     *
     * @param client
     *            the ElasticSearch client
     * @param indexName
     *            the name of the ElasticSearch index to represent
     * @param tableDefinitions
     *            an array of {@link SimpleTableDef}s, which define the table
     *            and column model of the ElasticSearch index.
     * @throws IllegalArgumentException
     *             if the client is null or the index name is null or blank
     */
    public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) {
        if (client == null) {
            throw new IllegalArgumentException("ElasticSearch Client cannot be null");
        }
        if (indexName == null || indexName.trim().length() == 0) {
            throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
        }
        this.elasticSearchClient = client;
        this.indexName = indexName;
        this.staticTableDefinitions = Arrays.asList(tableDefinitions);
        this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
    }

    /**
     * Constructs a {@link ElasticSearchDataContext} and automatically detects
     * the schema structure/view of the index (see {@link #detectSchema()}).
     *
     * @param client
     *            the ElasticSearch client
     * @param indexName
     *            the name of the ElasticSearch index to represent
     */
    public ElasticSearchDataContext(Client client, String indexName) {
        this(client, indexName, new SimpleTableDef[0]);
    }

    /**
     * Reads the cluster state for the configured index and builds one table
     * definition per document type found in the index mappings. Document types
     * for which {@link #detectTable(ClusterState, String, String)} fails are
     * logged and left out.
     *
     * @see #detectTable(ClusterState, String, String)
     * @return the detected table definitions, sorted by table name; empty if
     *         the index has no metadata
     * @throws MetaModelException
     *             if the index name cannot be set on the cluster state request
     */
    private SimpleTableDef[] detectSchema() {
        logger.info("Detecting schema for index '{}'", indexName);

        final ClusterState cs;
        final ClusterStateRequestBuilder clusterStateRequestBuilder =
                getElasticSearchClient().admin().cluster().prepareState();

        // different methods here to set the index name, so we have to use
        // reflection :-/
        try {
            final byte majorVersion = Version.CURRENT.major;
            // typed as Object so the array is passed as ONE reflective argument
            final Object methodArgument = new String[] { indexName };
            final String methodName = (majorVersion == 0) ? "setFilterIndices" : "setIndices";
            final Method method = ClusterStateRequestBuilder.class.getMethod(methodName, String[].class);
            method.invoke(clusterStateRequestBuilder, methodArgument);
        } catch (Exception e) {
            logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e);
            throw new MetaModelException("Failed to create request for index information needed to detect schema", e);
        }
        cs = clusterStateRequestBuilder.execute().actionGet().getState();

        final List<SimpleTableDef> result = new ArrayList<>();

        final IndexMetaData imd = cs.getMetaData().index(indexName);
        if (imd == null) {
            // index does not exist
            logger.warn("No metadata returned for index name '{}' - no tables will be detected.", indexName);
        } else {
            final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
            final ObjectLookupContainer<String> documentTypes = mappings.keys();

            for (final Object documentTypeCursor : documentTypes) {
                final String documentType = ((ObjectCursor<?>) documentTypeCursor).value.toString();
                try {
                    final SimpleTableDef table = detectTable(cs, indexName, documentType);
                    result.add(table);
                } catch (Exception e) {
                    // best effort: one broken mapping must not hide the other tables
                    logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
                }
            }
        }
        final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
        Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
            @Override
            public int compare(SimpleTableDef o1, SimpleTableDef o2) {
                return o1.getName().compareTo(o2.getName());
            }
        });

        return tableDefArray;
    }

    /**
     * Performs an analysis of an available index type in an ElasticSearch
     * {@link Client} client and tries to detect the index structure based on
     * the metadata provided by the java client.
     *
     * @param cs
     *            the ElasticSearch cluster
     * @param indexName
     *            the name of the index
     * @param documentType
     *            the name of the index type
     * @return a table definition for ElasticSearch.
     * @throws IllegalArgumentException
     *             if the index or document type does not exist in the cluster
     *             state, or if the mapping has no "properties" map
     */
    public static SimpleTableDef detectTable(ClusterState cs, String indexName, String documentType) throws Exception {
        logger.debug("Detecting table for document type '{}' in index '{}'", documentType, indexName);
        final IndexMetaData imd = cs.getMetaData().index(indexName);
        if (imd == null) {
            // index does not exist
            throw new IllegalArgumentException("No such index: " + indexName);
        }
        final MappingMetaData mappingMetaData = imd.mapping(documentType);
        if (mappingMetaData == null) {
            throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType);
        }
        final Map<String, Object> mp = mappingMetaData.getSourceAsMap();
        final Object metadataProperties = mp.get("properties");
        if (metadataProperties instanceof Map) {
            @SuppressWarnings("unchecked")
            final Map<String, ?> metadataPropertiesMap = (Map<String, ?>) metadataProperties;
            final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataPropertiesMap);
            return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes());
        }
        throw new IllegalArgumentException("No mapping properties defined for document type '" + documentType
                + "' in index: " + indexName);
    }

    /**
     * Builds the schema from the static table definitions plus a fresh schema
     * detection. A detected table is only added when no table of that name is
     * already present, so static definitions take precedence.
     */
    @Override
    protected Schema getMainSchema() throws MetaModelException {
        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
        for (final SimpleTableDef tableDef : staticTableDefinitions) {
            addTable(theSchema, tableDef);
        }

        final SimpleTableDef[] tables = detectSchema();
        synchronized (this) {
            dynamicTableDefinitions.clear();
            dynamicTableDefinitions.addAll(Arrays.asList(tables));
            for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
                final List<String> tableNames = Arrays.asList(theSchema.getTableNames());

                if (!tableNames.contains(tableDef.getName())) {
                    addTable(theSchema, tableDef);
                }
            }
        }

        return theSchema;
    }

    /**
     * Converts the table definition to a table, marks its document ID column
     * (if present and mutable) as primary key, and adds it to the schema.
     */
    private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
        final MutableTable table = tableDef.toTable().setSchema(theSchema);
        final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
        if (idColumn instanceof MutableColumn) {
            final MutableColumn mutableColumn = (MutableColumn) idColumn;
            mutableColumn.setPrimaryKey(true);
        }
        theSchema.addTable(table);
    }

    /**
     * @return the index name, which doubles as the schema name
     */
    @Override
    protected String getMainSchemaName() throws MetaModelException {
        return indexName;
    }

    /**
     * Executes the query in ElasticSearch when the where items can be turned
     * into a query builder; otherwise delegates to the superclass
     * implementation.
     */
    @Override
    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
            List<FilterItem> whereItems, int firstRow, int maxRows) {
        final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
                LogicalOperator.AND);
        if (queryBuilder != null) {
            // where clause can be pushed down to an ElasticSearch query
            final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder);
            final SearchResponse response = searchRequest.execute().actionGet();
            return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false);
        }
        return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
    }

    /**
     * Fetches the documents of the table's document type from the first row
     * on, without any query restriction.
     */
    @Override
    protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) {
        final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null);
        final SearchResponse response = searchRequest.execute().actionGet();
        return new ElasticSearchDataSet(elasticSearchClient, response, columns, false);
    }

    /**
     * Prepares a search on the index restricted to the table's document type.
     *
     * @param table
     *            the table whose name is used as document type
     * @param firstRow
     *            1-based first row; values above 1 are converted to a
     *            zero-based "from" offset
     * @param maxRows
     *            maximum number of rows, or -1 for no limit (in which case a
     *            scroll with {@link #TIMEOUT_SCROLL} is requested instead)
     * @param queryBuilder
     *            the query to apply, or null for none
     * @return the prepared, not yet executed, search request
     */
    private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) {
        final String documentType = table.getName();
        final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
        if (firstRow > 1) {
            final int zeroBasedFrom = firstRow - 1;
            searchRequest.setFrom(zeroBasedFrom);
        }
        if (limitMaxRowsIsSet(maxRows)) {
            searchRequest.setSize(maxRows);
        } else {
            searchRequest.setScroll(TIMEOUT_SCROLL);
        }

        if (queryBuilder != null) {
            searchRequest.setQuery(queryBuilder);
        }

        return searchRequest;
    }

    /**
     * Looks up a single document by ID using a GET request.
     *
     * @return the row for the document, or null if the key value is null or
     *         no such document exists
     */
    @Override
    protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
            Object keyValue) {
        if (keyValue == null) {
            return null;
        }

        final String documentType = table.getName();
        final String id = keyValue.toString();

        final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet();

        if (!response.isExists()) {
            return null;
        }

        final Map<String, Object> source = response.getSource();
        final String documentId = response.getId();

        final DataSetHeader header = new SimpleDataSetHeader(selectItems);

        return NativeElasticSearchUtils.createRow(source, documentId, header);
    }

    /**
     * Counts the documents of the table's document type.
     *
     * @return the count, or null when where items are present (in which case
     *         the count is not pushed down to ElasticSearch)
     */
    @Override
    protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
        if (!whereItems.isEmpty()) {
            // not supported - will have to be done by counting client-side
            return null;
        }
        final String documentType = table.getName();
        final CountResponse response = elasticSearchClient.prepareCount(indexName)
                .setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet();
        return response.getCount();
    }

    /**
     * @return true unless maxRows is -1, the value treated as "no limit"
     */
    private boolean limitMaxRowsIsSet(int maxRows) {
        return (maxRows != -1);
    }

    /**
     * Runs the update script against a new callback and then notifies the
     * callback that the update has finished.
     */
    @Override
    public void executeUpdate(UpdateScript update) {
        final ElasticSearchUpdateCallback callback = new ElasticSearchUpdateCallback(this);
        update.run(callback);
        callback.onExecuteUpdateFinished();
    }

    /**
     * Gets the {@link Client} that this {@link DataContext} is wrapping.
     *
     * @return the ElasticSearch client passed to the constructor
     */
    public Client getElasticSearchClient() {
        return elasticSearchClient;
    }

    /**
     * Gets the name of the index that this {@link DataContext} is working on.
     *
     * @return the index name passed to the constructor
     */
    public String getIndexName() {
        return indexName;
    }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java new file mode 100644 index 0000000..7f4048a --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.metamodel.elasticsearch.nativeclient;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.metamodel.data.AbstractDataSet;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.elasticsearch.action.search.ClearScrollRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DataSet} implementation for ElasticSearch. Iterates over the hits of
 * a {@link SearchResponse} and, when the response carries a scroll id, keeps
 * fetching further pages until an empty page is returned.
 */
final class ElasticSearchDataSet extends AbstractDataSet {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class);

    private final Client _client;

    // false until close() has run once; guards the clear-scroll request
    private final AtomicBoolean _closed;

    private SearchResponse _searchResponse;
    private SearchHit _currentHit;
    private int _hitIndex = 0;

    /**
     * @param client
     *            the client used for scrolling and for clearing the scroll
     * @param searchResponse
     *            the initial search response
     * @param selectItems
     *            the select items that make up the data set header
     * @param queryPostProcessed
     *            currently not used by this class
     */
    public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems,
            boolean queryPostProcessed) {
        super(selectItems);
        _client = client;
        _searchResponse = searchResponse;
        _closed = new AtomicBoolean(false);
    }

    /**
     * @param client
     *            the client used for scrolling and for clearing the scroll
     * @param searchResponse
     *            the initial search response
     * @param columns
     *            the columns that make up the data set header
     * @param queryPostProcessed
     *            currently not used by this class
     */
    public ElasticSearchDataSet(Client client, SearchResponse searchResponse, Column[] columns,
            boolean queryPostProcessed) {
        super(columns);
        _client = client;
        _searchResponse = searchResponse;
        _closed = new AtomicBoolean(false);
    }

    /**
     * Closes the data set and, on the first invocation only, asks
     * ElasticSearch to clear the scroll of the current search response. The
     * clear-scroll request is fired without waiting for its result.
     */
    @Override
    public void close() {
        super.close();
        // Flip false -> true exactly once. (The arguments used to be reversed,
        // so the flag never changed and the scroll was never cleared.)
        final boolean closeNow = _closed.compareAndSet(false, true);
        if (closeNow) {
            final String scrollId = _searchResponse.getScrollId();
            // responses of non-scrolling searches carry no scroll id (see next())
            if (scrollId != null) {
                final ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client)
                        .addScrollId(scrollId);
                scrollRequestBuilder.execute();
            }
        }
    }

    /**
     * Safety net: closes the data set if the caller forgot to do so.
     */
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        if (!_closed.get()) {
            logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
            close();
        }
    }

    /**
     * Advances to the next hit, scrolling to the next page of results when the
     * current page is exhausted and the response has a scroll id.
     *
     * @return true if a hit is available through {@link #getRow()}, false when
     *         the results are exhausted
     */
    @Override
    public boolean next() {
        while (true) {
            final SearchHit[] hits = _searchResponse.getHits().hits();
            if (hits.length == 0) {
                // break condition for the scroll
                _currentHit = null;
                return false;
            }

            if (_hitIndex < hits.length) {
                // pick the next hit within this search response
                _currentHit = hits[_hitIndex];
                _hitIndex++;
                return true;
            }

            final String scrollId = _searchResponse.getScrollId();
            if (scrollId == null) {
                // this search response is not scrollable - then it's the end.
                _currentHit = null;
                return false;
            }

            // try to scroll to the next set of hits, then start over on it
            _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
                    .execute().actionGet();
            _hitIndex = 0;
        }
    }

    /**
     * @return a row built from the current hit's source and document id, or
     *         null if there is no current hit
     */
    @Override
    public Row getRow() {
        if (_currentHit == null) {
            return null;
        }

        final Map<String, Object> source = _currentHit.getSource();
        final String documentId = _currentHit.getId();
        return NativeElasticSearchUtils.createRow(source, documentId, getHeader());
    }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java new file mode 100644 index 0000000..0de2a71 --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.metamodel.elasticsearch.nativeclient;

import java.util.List;

import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
import org.apache.metamodel.delete.RowDeletionBuilder;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.schema.Table;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RowDeletionBuilder} for {@link ElasticSearchDataContext}. Deletes
 * documents of a document type through a delete-by-query request.
 */
final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDeleteBuilder.class);

    private final ElasticSearchUpdateCallback _updateCallback;

    public ElasticSearchDeleteBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
        super(table);
        _updateCallback = updateCallback;
    }

    /**
     * Translates the where items into an ElasticSearch query and executes a
     * delete-by-query restricted to the index and the table's document type.
     *
     * @throws UnsupportedOperationException
     *             if the where items cannot be expressed as a query
     */
    @Override
    public void execute() throws MetaModelException {
        final String typeName = getTable().getName();

        final ElasticSearchDataContext context = _updateCallback.getDataContext();
        final Client esClient = context.getElasticSearchClient();
        final String index = context.getIndexName();

        final DeleteByQueryRequestBuilder request = new DeleteByQueryRequestBuilder(esClient);
        request.setIndices(index);
        request.setTypes(typeName);

        final List<FilterItem> filters = getWhereItems();

        // Note: createQueryBuilderForSimpleWhere may hand back a
        // matchAllQuery() when there are no where items at all.
        final QueryBuilder query = ElasticSearchUtils.createQueryBuilderForSimpleWhere(filters, LogicalOperator.AND);
        if (query == null) {
            // TODO: The filters cannot be pushed down. An alternative would be
            // to query first, collect the document IDs and delete by ID.
            throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
                    + filters);
        }

        request.setQuery(query);
        request.execute().actionGet();

        logger.debug("Deleted documents by query.");
    }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java new file mode 100644 index 0000000..d66b240 --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.metamodel.elasticsearch.nativeclient;

import java.lang.reflect.Method;

import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.drop.AbstractTableDropBuilder;
import org.apache.metamodel.drop.TableDropBuilder;
import org.apache.metamodel.schema.MutableSchema;
import org.apache.metamodel.schema.Table;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TableDropBuilder} that drops a table by deleting the mapping of the
 * corresponding document type from an ElasticSearch index.
 */
final class ElasticSearchDropTableBuilder extends AbstractTableDropBuilder {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDropTableBuilder.class);

    private final ElasticSearchUpdateCallback _updateCallback;

    public ElasticSearchDropTableBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
        super(table);
        _updateCallback = updateCallback;
    }

    /**
     * Issues a delete-mapping request for the table's document type and then
     * removes the table from its schema.
     */
    @Override
    public void execute() throws MetaModelException {
        final ElasticSearchDataContext context = _updateCallback.getDataContext();
        final Table droppedTable = getTable();
        final String typeName = droppedTable.getName();
        logger.info("Deleting mapping / document type: {}", typeName);

        final Client esClient = context.getElasticSearchClient();
        final IndicesAdminClient indicesAdmin = esClient.admin().indices();
        final String index = context.getIndexName();

        final DeleteMappingRequestBuilder request = new DeleteMappingRequestBuilder(indicesAdmin).setIndices(index);
        setType(request, typeName);

        final DeleteMappingResponse response = request.execute().actionGet();
        logger.debug("Delete mapping response: acknowledged={}", response.isAcknowledged());

        // the cast requires the table's schema to be a MutableSchema
        ((MutableSchema) droppedTable.getSchema()).removeTable(droppedTable);
    }

    /**
     * Calls {@code setType} on the request builder via reflection, because the
     * method's signature differs between ElasticSearch versions: the varargs
     * ({@code String[]}) variant is tried first, then the single
     * {@code String} variant.
     *
     * @param requestBuilder
     *            the builder to set the document type on
     * @param documentType
     *            the document type to set
     * @throws IllegalStateException
     *             if neither variant can be resolved, or the invocation fails
     */
    private void setType(DeleteMappingRequestBuilder requestBuilder, String documentType) {
        final Class<?> builderClass = requestBuilder.getClass();
        Method setter;
        Object setterArgument;
        try {
            try {
                setter = builderClass.getDeclaredMethod("setType", String[].class);
                setterArgument = new String[] {documentType};
            } catch (NoSuchMethodException e) {
                logger.debug("No setType(String[]) method found, trying with a single String instead", e);
                setter = builderClass.getDeclaredMethod("setType", String.class);
                setterArgument = documentType;
            }
        } catch (Exception e) {
            logger.error("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
            throw new IllegalStateException("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
        }

        try {
            setter.setAccessible(true);
            setter.invoke(requestBuilder, setterArgument);
        } catch (Exception e) {
            logger.error("Failed to invoke {}", setter, e);
            throw new IllegalStateException("Failed to invoke " + setter, e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java new file mode 100644 index 0000000..70d31b4 --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.metamodel.elasticsearch.nativeclient;

import java.util.HashMap;
import java.util.Map;

import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.Table;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Row insertion builder for {@link ElasticSearchDataContext}. Each inserted
 * row is indexed as one document of the table's document type.
 */
final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<ElasticSearchUpdateCallback> {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchInsertBuilder.class);

    public ElasticSearchInsertBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
        super(updateCallback, table);
    }

    /**
     * Builds the document source from all columns that have been set and
     * indexes it with the "create" flag enabled. A non-null value for the
     * document ID column is used as the document's ID instead of being put
     * into the source.
     */
    @Override
    public void execute() throws MetaModelException {
        final ElasticSearchDataContext context = getUpdateCallback().getDataContext();
        final Client esClient = context.getElasticSearchClient();
        final String index = context.getIndexName();
        final String typeName = getTable().getName();
        final IndexRequestBuilder request = new IndexRequestBuilder(esClient, index).setType(typeName);

        final Map<String, Object> document = new HashMap<>();
        final Column[] columns = getColumns();
        final Object[] values = getValues();
        for (int index0 = 0; index0 < columns.length; index0++) {
            final Column column = columns[index0];
            if (!isSet(column)) {
                continue;
            }
            final String fieldName = column.getName();
            final Object fieldValue = values[index0];
            if (!ElasticSearchUtils.FIELD_ID.equals(fieldName)) {
                document.put(fieldName, fieldValue);
            } else if (fieldValue != null) {
                // the ID is part of the request, not of the document source
                request.setId(fieldValue.toString());
            }
        }

        assert !document.isEmpty();

        request.setSource(document);
        request.setCreate(true);

        final IndexResponse response = request.execute().actionGet();

        logger.debug("Inserted document: id={}", response.getId());
    }

}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java new file mode 100644 index 0000000..c0a1232 --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.metamodel.elasticsearch.nativeclient;

import java.util.Map;
import java.util.Map.Entry;

import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.schema.ColumnType;

/**
 * Converts the "properties" map of an ElasticSearch mapping into an
 * {@link ElasticSearchMetaData} object holding column names and types.
 */
public class ElasticSearchMetaDataParser {

    /**
     * Builds the column names and column types for a mapping. The first
     * column is always the document ID field (of type STRING), followed by
     * one column per entry of the given map, in the map's iteration order.
     *
     * @param metaDataInfo
     *            ElasticSearch mapping metadata in Map format; every value is
     *            cast to a Map describing one field
     * @return An ElasticSearchMetaData object
     */
    public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) {
        final int columnCount = metaDataInfo.size() + 1;
        final String[] names = new String[columnCount];
        final ColumnType[] types = new ColumnType[columnCount];

        // slot 0 is reserved for the (fixed) document ID field
        names[0] = ElasticSearchUtils.FIELD_ID;
        types[0] = ColumnType.STRING;

        int slot = 0;
        for (Entry<String, ?> field : metaDataInfo.entrySet()) {
            slot++;
            @SuppressWarnings("unchecked")
            final Map<String, ?> fieldAttributes = (Map<String, ?>) field.getValue();

            names[slot] = field.getKey();
            types[slot] = getColumnTypeFromMetadataField(fieldAttributes);
        }
        return new ElasticSearchMetaData(names, types);
    }

    /**
     * Maps a field's "type" attribute to a column type, falling back to STRING
     * when no type is declared.
     */
    private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) {
        final Object declaredType = fieldMetadata.get("type");
        final String typeName = (declaredType == null) ? null : declaredType.toString();
        if (typeName != null) {
            return ElasticSearchUtils.getColumnTypeFromElasticSearchType(typeName);
        }
        return ColumnType.STRING;
    }

}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java ---------------------------------------------------------------------- diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java new file mode 100644 index 0000000..b81c9c7 --- /dev/null +++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.nativeclient; + +import org.apache.metamodel.AbstractUpdateCallback; +import org.apache.metamodel.UpdateCallback; +import org.apache.metamodel.create.TableCreationBuilder; +import org.apache.metamodel.delete.RowDeletionBuilder; +import org.apache.metamodel.drop.TableDropBuilder; +import org.apache.metamodel.insert.RowInsertionBuilder; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.elasticsearch.client.Client; + +/** + * {@link UpdateCallback} implementation for {@link ElasticSearchDataContext}. 
+ */ +final class ElasticSearchUpdateCallback extends AbstractUpdateCallback { + + public ElasticSearchUpdateCallback(ElasticSearchDataContext dataContext) { + super(dataContext); + } + + @Override + public ElasticSearchDataContext getDataContext() { + return (ElasticSearchDataContext) super.getDataContext(); + } + + @Override + public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException, + IllegalStateException { + return new ElasticSearchCreateTableBuilder(this, schema, name); + } + + @Override + public boolean isDropTableSupported() { + return true; + } + + @Override + public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + return new ElasticSearchDropTableBuilder(this, table); + } + + @Override + public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + return new ElasticSearchInsertBuilder(this, table); + } + + @Override + public boolean isDeleteSupported() { + return true; + } + + @Override + public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + return new ElasticSearchDeleteBuilder(this, table); + } + + public void onExecuteUpdateFinished() { + // force refresh of the index + final ElasticSearchDataContext dataContext = getDataContext(); + final Client client = dataContext.getElasticSearchClient(); + final String indexName = dataContext.getIndexName(); + client.admin().indices().prepareRefresh(indexName).execute().actionGet(); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java ---------------------------------------------------------------------- diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java new file mode 100644 index 0000000..1efb0c8 --- /dev/null +++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.nativeclient; + +import java.util.Date; +import java.util.Map; + +import org.apache.metamodel.data.DataSetHeader; +import org.apache.metamodel.data.DefaultRow; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.ColumnType; + +/** + * Shared/common util functions for the ElasticSearch MetaModel module. 
+ */ +final class NativeElasticSearchUtils { + + public static Row createRow(Map<String, Object> sourceMap, String documentId, DataSetHeader header) { + final Object[] values = new Object[header.size()]; + for (int i = 0; i < values.length; i++) { + final SelectItem selectItem = header.getSelectItem(i); + final Column column = selectItem.getColumn(); + + assert column != null; + assert selectItem.getFunction() == null; + + if (column.isPrimaryKey()) { + values[i] = documentId; + } else { + Object value = sourceMap.get(column.getName()); + + if (column.getType() == ColumnType.DATE) { + Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value); + if (valueToDate == null) { + values[i] = value; + } else { + values[i] = valueToDate; + } + } else { + values[i] = value; + } + } + } + + return new DefaultRow(header, values); + } +}