METAMODEL-212: Fixed

Fixes #67

Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/137caf0d
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/137caf0d
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/137caf0d

Branch: refs/heads/master
Commit: 137caf0d2faab1bc0ad42e972aef01f0282423a2
Parents: f7989b8
Author: Dennis Du Krøger <d...@hp23c.dk>
Authored: Fri Nov 27 11:13:57 2015 +0100
Committer: Kasper Sørensen <i.am.kasper.soren...@gmail.com>
Committed: Fri Nov 27 11:13:57 2015 +0100

----------------------------------------------------------------------
 .gitignore                                      |  10 +-
 CHANGES.md                                      |   1 +
 .../org/apache/metamodel/query/SelectItem.java  |   8 +-
 elasticsearch/common/pom.xml                    |  50 ++
 .../common/ElasticSearchDateConverter.java      |  41 ++
 .../common/ElasticSearchMetaData.java           |  50 ++
 .../common/ElasticSearchUtils.java              | 260 ++++++++
 .../common/ElasticSearchDateConverterTest.java  |  36 ++
 elasticsearch/native/pom.xml                    |  62 ++
 .../ElasticSearchCreateTableBuilder.java        |  65 ++
 .../nativeclient/ElasticSearchDataContext.java  | 384 ++++++++++++
 .../nativeclient/ElasticSearchDataSet.java      | 130 ++++
 .../ElasticSearchDeleteBuilder.java             |  82 +++
 .../ElasticSearchDropTableBuilder.java          | 103 ++++
 .../ElasticSearchInsertBuilder.java             |  78 +++
 .../ElasticSearchMetaDataParser.java            |  81 +++
 .../ElasticSearchUpdateCallback.java            |  87 +++
 .../nativeclient/NativeElasticSearchUtils.java  |  66 ++
 .../ElasticSearchDataContextTest.java           | 606 ++++++++++++++++++
 .../ElasticSearchMetaDataParserTest.java        |  66 ++
 .../nativeclient/ElasticSearchUtilsTest.java    |  63 ++
 .../utils/EmbeddedElasticsearchServer.java      |  71 +++
 elasticsearch/pom.xml                           |  39 +-
 elasticsearch/rest/pom.xml                      |  93 +++
 .../rest/ElasticSearchRestDataContext.java      | 375 +++++++++++
 .../elasticsearch/rest/JestClientExecutor.java  |  51 ++
 .../elasticsearch/rest/JestDeleteScroll.java    |  57 ++
 .../JestElasticSearchCreateTableBuilder.java    |  50 ++
 .../rest/JestElasticSearchDataSet.java          | 129 ++++
 .../rest/JestElasticSearchDeleteBuilder.java    |  76 +++
 .../rest/JestElasticSearchDropTableBuilder.java |  61 ++
 .../rest/JestElasticSearchInsertBuilder.java    |  77 +++
 .../rest/JestElasticSearchMetaDataParser.java   |  75 +++
 .../rest/JestElasticSearchUpdateCallback.java   |  84 +++
 .../rest/JestElasticSearchUtils.java            |  76 +++
 .../rest/JestElasticSearchDataContextTest.java  | 614 +++++++++++++++++++
 .../JestElasticSearchMetaDataParserTest.java    |  70 +++
 .../rest/JestElasticSearchUtilsTest.java        |  68 ++
 .../rest/utils/EmbeddedElasticsearchServer.java |  72 +++
 .../ElasticSearchCreateTableBuilder.java        | 142 -----
 .../elasticsearch/ElasticSearchDataContext.java | 471 --------------
 .../elasticsearch/ElasticSearchDataSet.java     | 130 ----
 .../ElasticSearchDateConverter.java             |  41 --
 .../ElasticSearchDeleteBuilder.java             |  81 ---
 .../ElasticSearchDropTableBuilder.java          | 103 ----
 .../ElasticSearchInsertBuilder.java             |  77 ---
 .../elasticsearch/ElasticSearchMetaData.java    |  52 --
 .../ElasticSearchMetaDataParser.java            |  95 ---
 .../ElasticSearchUpdateCallback.java            |  87 ---
 .../elasticsearch/ElasticSearchUtils.java       |  66 --
 .../ElasticSearchDataContextTest.java           | 605 ------------------
 .../ElasticSearchDateConverterTest.java         |  34 -
 .../ElasticSearchMetaDataParserTest.java        |  65 --
 .../elasticsearch/ElasticSearchUtilsTest.java   |  63 --
 .../utils/EmbeddedElasticsearchServer.java      |  71 ---
 full/pom.xml                                    |  15 +-
 .../apache/metamodel/DataContextFactory.java    |  20 +-
 pom.xml                                         |   2 +-
 58 files changed, 4457 insertions(+), 2230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index f4edda6..e1af7b0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,9 @@
-/.project
-/.settings
-/target
-/.idea
+.project
+.settings/
+.classpath/
+.metadata/
+target/
+/.idea/
 *.iml
 *.ipr
 *.iws

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/CHANGES.md
----------------------------------------------------------------------
diff --git a/CHANGES.md b/CHANGES.md
index cdf244a..47018ba 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,6 @@
 ### Apache MetaModel (work-in-progress)
 
+ * [METAMODEL-212] - New module for ElasticSearch via REST client.
  * [METAMODEL-207] - Ensured the serializability of the SingleLineCsvRow class.
  * [METAMODEL-211] - Fixed a bug related to lookup by primary key (_id) on 
MongoDB.
  * [METAMODEL-15] - Query parser support for table names with space. 
Delimitters can be double quote or square brackets. 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/core/src/main/java/org/apache/metamodel/query/SelectItem.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/metamodel/query/SelectItem.java 
b/core/src/main/java/org/apache/metamodel/query/SelectItem.java
index a1e33de..cbbe919 100644
--- a/core/src/main/java/org/apache/metamodel/query/SelectItem.java
+++ b/core/src/main/java/org/apache/metamodel/query/SelectItem.java
@@ -215,13 +215,19 @@ public class SelectItem extends BaseObject implements 
QueryItem, Cloneable {
      * 
      * @return
      * @deprecated use {@link #getAggregateFunction()} or
-     *             {@link #getScalarFunction()} instead
+     *             {@link #getScalarFunction()} instead,
+     *             or {@link #hasFunction()} to check if a
+     *             function is set at all.
      */
     @Deprecated
     public FunctionType getFunction() {
         return _function;
     }
 
+    public boolean hasFunction(){
+        return _function != null;
+    }
+
     public AggregateFunction getAggregateFunction() {
         if (_function instanceof AggregateFunction) {
             return (AggregateFunction) _function;

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/common/pom.xml b/elasticsearch/common/pom.xml
new file mode 100644
index 0000000..98e723a
--- /dev/null
+++ b/elasticsearch/common/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+       license agreements. See the NOTICE file distributed with this work for 
additional
+       information regarding copyright ownership. The ASF licenses this file to
+       you under the Apache License, Version 2.0 (the "License"); you may not 
use
+       this file except in compliance with the License. You may obtain a copy 
of
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required
+       by applicable law or agreed to in writing, software distributed under 
the
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS
+       OF ANY KIND, either express or implied. See the License for the specific
+       language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>MetaModel-elasticsearch</artifactId>
+               <groupId>org.apache.metamodel</groupId>
+               <version>4.4.2-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>MetaModel-elasticsearch-common</artifactId>
+       <name>MetaModel module for ElasticSearch commons</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.metamodel</groupId>
+                       <artifactId>MetaModel-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <!-- elasticsearch -->
+               <dependency>
+                       <groupId>org.elasticsearch</groupId>
+                       <artifactId>elasticsearch</artifactId>
+                       <version>${elasticsearch.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
 
b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
new file mode 100644
index 0000000..a6ce656
--- /dev/null
+++ 
b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import org.apache.metamodel.util.TimeComparator;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Util class to convert date strings from ElasticSearch to
+ * proper java Dates.
+ */
+public final class ElasticSearchDateConverter {
+
+    /**
+     * Candidate patterns, ordered from the most to the least specific zone
+     * designator. A single "X" only parses the hour part of an offset, so
+     * "+05:30" would be read as "+05" and the minutes silently dropped.
+     * Trying "XXX" (+hh:mm) and "XX" (+hhmm) first keeps the minutes, while
+     * the trailing "X" pattern preserves the previous behaviour for every
+     * input that the more specific patterns reject.
+     */
+    private static final String[] DATE_PATTERNS = {
+            "yyyy-MM-dd'T'HH:mm:ss.SSSXXX",
+            "yyyy-MM-dd'T'HH:mm:ss.SSSXX",
+            "yyyy-MM-dd'T'HH:mm:ss.SSSX" };
+
+    /**
+     * Converts a date string to a {@link Date}.
+     *
+     * @param dateAsString
+     *            the date string to convert
+     * @return the parsed date if one of the ISO-8601 patterns matches,
+     *         otherwise whatever {@link TimeComparator#toDate(Object)} makes
+     *         of the string
+     */
+    public static Date tryToConvert(String dateAsString) {
+        for (String pattern : DATE_PATTERNS) {
+            try {
+                // SimpleDateFormat is not thread-safe, hence one instance per attempt
+                return new SimpleDateFormat(pattern).parse(dateAsString);
+            } catch (ParseException e) {
+                // not in this format - fall through to the next candidate
+            }
+        }
+        return TimeComparator.toDate(dateAsString);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaData.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaData.java
 
b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaData.java
new file mode 100644
index 0000000..7c32eb0
--- /dev/null
+++ 
b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaData.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import org.apache.metamodel.schema.ColumnType;
+
+/**
+ * MetaData representation of an ElasticSearch index type.
+ *
+ * We will map the elasticsearch fields to columns and their
+ * types to {@link ColumnType}s.
+ */
+public class ElasticSearchMetaData {
+    
+    private final String[] columnNames;
+    private final ColumnType[] columnTypes;
+
+    /**
+     * Constructs a {@link ElasticSearchMetaData}.
+     *
+     */
+    public ElasticSearchMetaData(String[] names, ColumnType[] types) {
+        this.columnNames = names;
+        this.columnTypes = types;
+    }
+
+    public String[] getColumnNames() {
+        return columnNames;
+    }
+
+    public ColumnType[] getColumnTypes() {
+        return columnTypes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
 
b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
new file mode 100644
index 0000000..11d35bd
--- /dev/null
+++ 
b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.query.OperatorType;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.util.CollectionUtils;
+import org.elasticsearch.common.base.Strings;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchUtils {
+    public static final String FIELD_ID = "_id";
+    private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchUtils.class);
+
+    /**
+     * Gets a "filter" query which is both 1.x and 2.x compatible.
+     */
+    private static QueryBuilder getFilteredQuery(String prefix, String 
fieldName) {
+        // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null, 
FilterBuilders.missingFilter(fieldName));
+        // 2.x: itemQueryBuilder = 
QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
+        try {
+            try {
+                Method method = QueryBuilders.class.getDeclaredMethod(prefix + 
"Query", String.class);
+                method.setAccessible(true);
+                return QueryBuilders.boolQuery().must((QueryBuilder) 
method.invoke(null, fieldName));
+            } catch (NoSuchMethodException e) {
+                Class<?> clazz = ElasticSearchUtils.class.getClassLoader()
+                        
.loadClass("org.elasticsearch.index.query.FilterBuilders");
+                Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + 
"Filter", String.class);
+                filterBuilderMethod.setAccessible(true);
+                Method queryBuildersFilteredQueryMethod =
+                        QueryBuilders.class.getDeclaredMethod("filteredQuery", 
QueryBuilder.class, FilterBuilder.class);
+                return (QueryBuilder) 
queryBuildersFilteredQueryMethod.invoke(null, null,
+                        filterBuilderMethod.invoke(null, fieldName));
+            }
+        } catch (Exception e) {
+            logger.error("Failed to resolve/invoke filtering method", e);
+            throw new IllegalStateException("Failed to resolve filtering 
method", e);
+        }
+    }
+
+    public static QueryBuilder getMissingQuery(String fieldName) {
+        return getFilteredQuery("missing", fieldName);
+    }
+
+    public static QueryBuilder getExistsQuery(String fieldName) {
+        return getFilteredQuery("exists", fieldName);
+    }
+
+    /**
+     * Builds the flat list of (field name, mapping definition) pairs that
+     * describes the columns of a table. If the table has no "_id" column yet,
+     * one is added at position 0 as a primary key; the "_id" column itself is
+     * never part of the returned list.
+     *
+     * @param table
+     *            the table to describe (may be modified, see above)
+     * @return alternating field names and mapping definition strings
+     */
+    public static List<Object> getSourceProperties(final MutableTable table) {
+        final boolean idColumnMissing = table.getColumnByName(FIELD_ID) == null;
+        if (idColumnMissing) {
+            final MutableColumn idColumn = new MutableColumn(FIELD_ID, ColumnType.STRING)
+                    .setTable(table).setPrimaryKey(true);
+            table.addColumn(0, idColumn);
+        }
+
+        final List<Object> properties = new ArrayList<>();
+        for (Column column : table.getColumns()) {
+            final String name = column.getName();
+            // the ID is a client-side construct, so it gets no mapping entry
+            if (!FIELD_ID.equals(name)) {
+                // each column becomes a property pair of the form:
+                // ("field1", "type=string,store=true")
+                properties.add(name);
+
+                final String type = getType(column);
+                properties.add(type == null ? "store=true" : "type=" + type + ",store=true");
+            }
+        }
+
+        return properties;
+    }
+
+    /**
+     * Determines the best fitting type. For reference of ElasticSearch types,
+     * see
+     *
+     * <pre>
+     * http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-core-types.html
+     * </pre>
+     *
+     * An explicitly set native type always wins over the {@link ColumnType}.
+     *
+     * @param column
+     *            the column to find an ElasticSearch type for
+     * @return the name of the ElasticSearch type
+     * @throws IllegalStateException
+     *             if the column has neither a native type nor a column type
+     * @throws UnsupportedOperationException
+     *             if the column type has no ElasticSearch counterpart
+     */
+    private static String getType(Column column) {
+        String nativeType = column.getNativeType();
+        if (!Strings.isNullOrEmpty(nativeType)) {
+            return nativeType;
+        }
+
+        final ColumnType type = column.getType();
+        if (type == null) {
+            throw new IllegalStateException("No column type specified for '" + column.getName()
+                    + "' - cannot build ElasticSearch mapping without type.");
+        }
+
+        if (type.isLiteral()) {
+            return "string";
+        } else if (type == ColumnType.FLOAT) {
+            return "float";
+        } else if (type == ColumnType.DOUBLE || type == ColumnType.NUMERIC || type == ColumnType.NUMBER) {
+            return "double";
+        } else if (type == ColumnType.SMALLINT) {
+            return "short";
+        } else if (type == ColumnType.TINYINT) {
+            return "byte";
+        } else if (type == ColumnType.INTEGER) {
+            return "integer";
+        } else if (type == ColumnType.BIGINT) {
+            // counterpart of the "long" -> BIGINT mapping used when reading
+            // ElasticSearch metadata back into column types
+            return "long";
+        } else if (type == ColumnType.DATE || type == ColumnType.TIMESTAMP) {
+            return "date";
+        } else if (type == ColumnType.BINARY || type == ColumnType.VARBINARY) {
+            return "binary";
+        } else if (type == ColumnType.BOOLEAN || type == ColumnType.BIT) {
+            return "boolean";
+        } else if (type == ColumnType.MAP) {
+            return "object";
+        }
+
+        throw new UnsupportedOperationException("Unsupported column type '" + type.getName() + "' of column '"
+                + column.getName() + "' - cannot translate to an ElasticSearch type.");
+    }
+
+    /**
+     * Creates, if possible, a {@link QueryBuilder} object which can be used to
+     * push down one or more {@link FilterItem}s to ElasticSearch's backend.
+     *
+     * @param whereItems
+     *            the filter items to translate; compound items are translated
+     *            recursively using their own logical operator
+     * @param logicalOperator
+     *            the operator used to combine the items when there is more
+     *            than one of them
+     * @return a {@link QueryBuilder} if one was produced, or null if the items
+     *         could not be pushed down to an ElasticSearch query
+     */
+    public static QueryBuilder createQueryBuilderForSimpleWhere(List<FilterItem> whereItems,
+            LogicalOperator logicalOperator) {
+        if (whereItems.isEmpty()) {
+            return QueryBuilders.matchAllQuery();
+        }
+
+        List<QueryBuilder> children = new ArrayList<>(whereItems.size());
+        for (FilterItem item : whereItems) {
+            final QueryBuilder itemQueryBuilder;
+
+            if (item.isCompoundFilter()) {
+                final List<FilterItem> childItems = Arrays.asList(item.getChildItems());
+                itemQueryBuilder = createQueryBuilderForSimpleWhere(childItems, item.getLogicalOperator());
+                if (itemQueryBuilder == null) {
+                    // something was not supported, so we have to forfeit here
+                    // too.
+                    return null;
+                }
+            } else {
+                final Column column = item.getSelectItem().getColumn();
+                if (column == null) {
+                    // unsupported type of where item - must have a column
+                    // reference
+                    return null;
+                }
+                final String fieldName = column.getName();
+                final Object operand = item.getOperand();
+                final OperatorType operator = item.getOperator();
+
+                if (OperatorType.EQUALS_TO.equals(operator)) {
+                    if (operand == null) {
+                        // "= null" means the field is absent from the document
+                        itemQueryBuilder = getMissingQuery(fieldName);
+                    } else {
+                        itemQueryBuilder = QueryBuilders.termQuery(fieldName, operand);
+                    }
+                } else if (OperatorType.DIFFERENT_FROM.equals(operator)) {
+                    if (operand == null) {
+                        // "<> null" means the field is present in the document
+                        itemQueryBuilder = getExistsQuery(fieldName);
+                    } else {
+                        itemQueryBuilder = QueryBuilders.boolQuery().mustNot(
+                                QueryBuilders.termQuery(fieldName, operand));
+                    }
+                } else if (OperatorType.IN.equals(operator)) {
+                    final List<?> operands = CollectionUtils.toList(operand);
+                    itemQueryBuilder = QueryBuilders.termsQuery(fieldName, operands);
+                } else {
+                    // not (yet) support operator types
+                    return null;
+                }
+            }
+
+            children.add(itemQueryBuilder);
+        }
+
+        // just one where item - just return the child query builder
+        if (children.size() == 1) {
+            return children.get(0);
+        }
+
+        // build a bool query
+        final BoolQueryBuilder result = QueryBuilders.boolQuery();
+        for (QueryBuilder child : children) {
+            switch (logicalOperator) {
+            case AND:
+                result.must(child);
+                // without this break every AND child would also be registered
+                // as a "should" clause
+                break;
+            case OR:
+                result.should(child);
+                break;
+            }
+        }
+
+        return result;
+    }
+
+    public static ColumnType getColumnTypeFromElasticSearchType(final String 
metaDataFieldType) {
+        final ColumnType columnType;
+        if (metaDataFieldType.startsWith("date")) {
+            columnType = ColumnType.DATE;
+        } else if (metaDataFieldType.equals("long")) {
+            columnType = ColumnType.BIGINT;
+        } else if (metaDataFieldType.equals("string")) {
+            columnType = ColumnType.STRING;
+        } else if (metaDataFieldType.equals("float")) {
+            columnType = ColumnType.FLOAT;
+        } else if (metaDataFieldType.equals("boolean")) {
+            columnType = ColumnType.BOOLEAN;
+        } else if (metaDataFieldType.equals("double")) {
+            columnType = ColumnType.DOUBLE;
+        } else {
+            columnType = ColumnType.STRING;
+        }
+        return columnType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverterTest.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverterTest.java
 
b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverterTest.java
new file mode 100644
index 0000000..49ef1bb
--- /dev/null
+++ 
b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverterTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import junit.framework.TestCase;
+
+import java.util.Date;
+
+import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
+
+public class ElasticSearchDateConverterTest extends TestCase {
+
+    /**
+     * Verifies that an ISO-8601 timestamp with a zone offset is converted to
+     * the right instant. The check is done on the epoch milliseconds rather
+     * than on {@link Date#toString()}, because the latter is rendered in the
+     * JVM's default time zone and would report a different day far enough
+     * east of UTC.
+     */
+    public void testConvertDateOptionalTime() throws Exception {
+        String dateToConvert = "2013-01-04T15:55:51.217+01:00";
+        Date date = ElasticSearchDateConverter.tryToConvert(dateToConvert);
+
+        assertNotNull(date);
+        // 2013-01-04T14:55:51.217Z
+        assertEquals(1357311351217L, date.getTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/native/pom.xml b/elasticsearch/native/pom.xml
new file mode 100644
index 0000000..7b117aa
--- /dev/null
+++ b/elasticsearch/native/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+       license agreements. See the NOTICE file distributed with this work for 
additional
+       information regarding copyright ownership. The ASF licenses this file to
+       you under the Apache License, Version 2.0 (the "License"); you may not 
use
+       this file except in compliance with the License. You may obtain a copy 
of
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required
+       by applicable law or agreed to in writing, software distributed under 
the
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS
+       OF ANY KIND, either express or implied. See the License for the specific
+       language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>MetaModel-elasticsearch</artifactId>
+               <groupId>org.apache.metamodel</groupId>
+               <version>4.4.2-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>MetaModel-elasticsearch-native</artifactId>
+       <name>MetaModel module for ElasticSearch via native (Node or Transport) 
client</name>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.metamodel</groupId>
+                       <artifactId>MetaModel-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.metamodel</groupId>
+                       <artifactId>MetaModel-elasticsearch-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>commons-io</groupId>
+                       <artifactId>commons-io</artifactId>
+               </dependency>
+
+               <!-- elasticsearch -->
+               <dependency>
+                       <groupId>org.elasticsearch</groupId>
+                       <artifactId>elasticsearch</artifactId>
+                       <version>${elasticsearch.version}</version>
+               </dependency>
+
+               <!-- test -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-log4j12</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
new file mode 100644
index 0000000..5d6701c
--- /dev/null
+++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.List;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.create.AbstractTableCreationBuilder;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import 
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Table creation builder for {@link ElasticSearchDataContext}. Executing it
+ * puts a mapping for the new table (document type) into the index and then
+ * registers the table in the schema.
+ */
+final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<ElasticSearchUpdateCallback> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchCreateTableBuilder.class);
+
+    public ElasticSearchCreateTableBuilder(ElasticSearchUpdateCallback updateCallback, Schema schema, String name) {
+        super(updateCallback, schema, name);
+    }
+
+    /**
+     * Sends a put-mapping request describing the table that was built and
+     * adds the table to the schema.
+     *
+     * @return the table that was created
+     */
+    @Override
+    public Table execute() throws MetaModelException {
+        final MutableTable newTable = getTable();
+        final Object[] mappingSource = ElasticSearchUtils.getSourceProperties(newTable).toArray();
+
+        final ElasticSearchDataContext context = getUpdateCallback().getDataContext();
+        final IndicesAdminClient adminClient = context.getElasticSearchClient().admin().indices();
+        final String index = context.getIndexName();
+
+        // the table name doubles as the document type of the mapping
+        final PutMappingRequestBuilder putMapping = new PutMappingRequestBuilder(adminClient);
+        putMapping.setIndices(index);
+        putMapping.setType(newTable.getName());
+        putMapping.setSource(mappingSource);
+
+        final PutMappingResponse response = putMapping.execute().actionGet();
+        logger.debug("PutMapping response: acknowledged={}", response.isAcknowledged());
+
+        ((MutableSchema) getSchema()).addTable(newTable);
+        return newTable;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
new file mode 100644
index 0000000..8232394
--- /dev/null
+++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.hppc.ObjectLookupContainer;
+import org.elasticsearch.common.hppc.cursors.ObjectCursor;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataContext implementation for ElasticSearch analytics engine.
+ *
+ * ElasticSearch has a data storage structure hierarchy that briefly goes like
+ * this:
+ * <ul>
+ * <li>Index</li>
+ * <li>Document type (short: Type) (within an index)</li>
+ * <li>Documents (of a particular type)</li>
+ * </ul>
+ *
+ * When instantiating this DataContext, an index name is provided. Within this
+ * index, each document type is represented as a table.
+ *
+ * This implementation supports either automatic discovery of a schema or 
manual
+ * specification of a schema, through the {@link SimpleTableDef} class.
+ */
+public class ElasticSearchDataContext extends QueryPostprocessDataContext 
implements DataContext, UpdateableDataContext {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
+
+    /**
+     * Scroll keep-alive (60 seconds) that is set on search requests which are
+     * not limited to a maximum number of rows.
+     */
+    public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
+
+    /** The ElasticSearch client that all requests are sent through. */
+    private final Client elasticSearchClient;
+
+    /** Name of the index that is represented; also used as schema name. */
+    private final String indexName;
+
+    /**
+     * Table definitions that are set from the beginning, not supposed to be
+     * changed.
+     */
+    private final List<SimpleTableDef> staticTableDefinitions;
+
+    /** Table definitions that are discovered, these can change. */
+    private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
+
+    /**
+     * Constructs a {@link ElasticSearchDataContext}. This constructor accepts 
a
+     * custom array of {@link SimpleTableDef}s which allows the user to define
+     * his own view on the indexes in the engine.
+     *
+     * @param client
+     *            the ElasticSearch client
+     * @param indexName
+     *            the name of the ElasticSearch index to represent
+     * @param tableDefinitions
+     *            an array of {@link SimpleTableDef}s, which define the table
+     *            and column model of the ElasticSearch index.
+     */
+    public ElasticSearchDataContext(Client client, String indexName, 
SimpleTableDef... tableDefinitions) {
+        if (client == null) {
+            throw new IllegalArgumentException("ElasticSearch Client cannot be 
null");
+        }
+        if (indexName == null || indexName.trim().length() == 0) {
+            throw new IllegalArgumentException("Invalid ElasticSearch Index 
name: " + indexName);
+        }
+        this.elasticSearchClient = client;
+        this.indexName = indexName;
+        this.staticTableDefinitions = Arrays.asList(tableDefinitions);
+        this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
+    }
+
+    /**
+     * Constructs a {@link ElasticSearchDataContext} without any predefined
+     * table definitions, so that all tables are automatically detected from
+     * the index (see {@link #detectSchema()}).
+     *
+     * @param client
+     *            the ElasticSearch client
+     * @param indexName
+     *            the name of the ElasticSearch index to represent
+     */
+    public ElasticSearchDataContext(Client client, String indexName) {
+        this(client, indexName, new SimpleTableDef[0]);
+    }
+
+    /**
+     * Reads the cluster state for the configured index through the
+     * {@link Client} and converts every document type found in the index
+     * mappings into a {@link SimpleTableDef}.
+     *
+     * Document types that cannot be converted are logged and skipped. If no
+     * metadata is returned for the index, a warning is logged and an empty
+     * array is returned.
+     *
+     * @see #detectTable(ClusterState, String, String)
+     * @return the detected table definitions, sorted by table name
+     * @throws MetaModelException
+     *             if the index name could not be set on the cluster state
+     *             request
+     */
+    private SimpleTableDef[] detectSchema() {
+        logger.info("Detecting schema for index '{}'", indexName);
+
+        final ClusterStateRequestBuilder clusterStateRequestBuilder =
+                getElasticSearchClient().admin().cluster().prepareState();
+
+        // the method that sets the index name is named differently depending
+        // on the ElasticSearch major version, so we have to use reflection
+        try {
+            final byte majorVersion = Version.CURRENT.major;
+            final String methodName = (majorVersion == 0) ? "setFilterIndices" : "setIndices";
+            final Method method = ClusterStateRequestBuilder.class.getMethod(methodName, String[].class);
+            // typed as Object so that the String[] is passed as one single
+            // argument and not expanded into the varargs of invoke(...)
+            final Object methodArgument = new String[] { indexName };
+            method.invoke(clusterStateRequestBuilder, methodArgument);
+        } catch (Exception e) {
+            logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e);
+            throw new MetaModelException("Failed to create request for index information needed to detect schema", e);
+        }
+        final ClusterState cs = clusterStateRequestBuilder.execute().actionGet().getState();
+
+        final List<SimpleTableDef> result = new ArrayList<>();
+
+        final IndexMetaData imd = cs.getMetaData().index(indexName);
+        if (imd == null) {
+            // index does not exist
+            logger.warn("No metadata returned for index name '{}' - no tables will be detected.", indexName);
+        } else {
+            final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
+            final ObjectLookupContainer<String> documentTypes = mappings.keys();
+
+            for (final Object documentTypeCursor : documentTypes) {
+                final String documentType = ((ObjectCursor<?>) documentTypeCursor).value.toString();
+                try {
+                    final SimpleTableDef table = detectTable(cs, indexName, documentType);
+                    result.add(table);
+                } catch (Exception e) {
+                    // best effort: one broken document type must not prevent
+                    // detection of the remaining ones
+                    logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
+                }
+            }
+        }
+        final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
+        Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
+            @Override
+            public int compare(SimpleTableDef o1, SimpleTableDef o2) {
+                return o1.getName().compareTo(o2.getName());
+            }
+        });
+
+        return tableDefArray;
+    }
+
+    /**
+     * Builds a table definition for a single document type of an index, based
+     * on the "properties" of the mapping metadata found in the given cluster
+     * state.
+     *
+     * @param cs
+     *            the ElasticSearch cluster state to read the metadata from
+     * @param indexName
+     *            the name of the index
+     * @param documentType
+     *            the name of the document type within the index
+     * @return a table definition named after the document type
+     * @throws IllegalArgumentException
+     *             if the index or the document type does not exist, or if the
+     *             mapping does not define any properties
+     */
+    public static SimpleTableDef detectTable(ClusterState cs, String indexName, String documentType)
+            throws Exception {
+        logger.debug("Detecting table for document type '{}' in index '{}'", documentType, indexName);
+
+        final IndexMetaData indexMetaData = cs.getMetaData().index(indexName);
+        if (indexMetaData == null) {
+            // index does not exist
+            throw new IllegalArgumentException("No such index: " + indexName);
+        }
+
+        final MappingMetaData mapping = indexMetaData.mapping(documentType);
+        if (mapping == null) {
+            throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType);
+        }
+
+        // instanceof is false for null, so this also covers a missing entry
+        final Object properties = mapping.getSourceAsMap().get("properties");
+        if (!(properties instanceof Map)) {
+            throw new IllegalArgumentException("No mapping properties defined for document type '" + documentType
+                    + "' in index: " + indexName);
+        }
+
+        @SuppressWarnings("unchecked")
+        final Map<String, ?> propertiesMap = (Map<String, ?>) properties;
+        final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(propertiesMap);
+        return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes());
+    }
+
+    /**
+     * Builds the schema from the static table definitions plus the tables
+     * that are currently detected in the index. A detected table is only
+     * added if the schema does not already contain a table of that name.
+     */
+    @Override
+    protected Schema getMainSchema() throws MetaModelException {
+        final MutableSchema schema = new MutableSchema(getMainSchemaName());
+        for (final SimpleTableDef staticDef : staticTableDefinitions) {
+            addTable(schema, staticDef);
+        }
+
+        // the detection itself is done before entering the synchronized block
+        final List<SimpleTableDef> detected = Arrays.asList(detectSchema());
+        synchronized (this) {
+            dynamicTableDefinitions.clear();
+            dynamicTableDefinitions.addAll(detected);
+            for (final SimpleTableDef dynamicDef : dynamicTableDefinitions) {
+                final boolean alreadyDefined =
+                        Arrays.asList(schema.getTableNames()).contains(dynamicDef.getName());
+                if (alreadyDefined) {
+                    continue;
+                }
+                addTable(schema, dynamicDef);
+            }
+        }
+
+        return schema;
+    }
+
+    /**
+     * Converts the table definition into a table of the given schema, marks
+     * its ID column (if present and mutable) as primary key and adds the
+     * table to the schema.
+     */
+    private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
+        final MutableTable newTable = tableDef.toTable().setSchema(theSchema);
+
+        // instanceof is false for null, so a missing ID column is skipped
+        final Column candidate = newTable.getColumnByName(ElasticSearchUtils.FIELD_ID);
+        if (candidate instanceof MutableColumn) {
+            ((MutableColumn) candidate).setPrimaryKey(true);
+        }
+
+        theSchema.addTable(newTable);
+    }
+
+    /**
+     * @return the name of the index, which is used as the schema name
+     */
+    @Override
+    protected String getMainSchemaName() throws MetaModelException {
+        return this.indexName;
+    }
+
+    /**
+     * Materializes a table with WHERE items. If the WHERE items can be
+     * expressed as an ElasticSearch query they are pushed down to the search
+     * request, otherwise the superclass implementation is used.
+     */
+    @Override
+    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
+            List<FilterItem> whereItems, int firstRow, int maxRows) {
+        final QueryBuilder pushedDownQuery =
+                ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND);
+        if (pushedDownQuery == null) {
+            // where clause cannot be pushed down to an ElasticSearch query
+            return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
+        }
+
+        final SearchResponse searchResponse =
+                createSearchRequest(table, firstRow, maxRows, pushedDownQuery).execute().actionGet();
+        return new ElasticSearchDataSet(elasticSearchClient, searchResponse, selectItems, false);
+    }
+
+    /**
+     * Materializes a table without any filtering: searches the document type
+     * of the table from the first row on, without a query.
+     */
+    @Override
+    protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) {
+        final int firstRow = 1;
+        final QueryBuilder noQuery = null;
+        final SearchResponse searchResponse =
+                createSearchRequest(table, firstRow, maxRows, noQuery).execute().actionGet();
+        return new ElasticSearchDataSet(elasticSearchClient, searchResponse, columns, false);
+    }
+
+    /**
+     * Prepares a search request for the document type of the given table.
+     *
+     * @param table
+     *            the table whose name is used as document type
+     * @param firstRow
+     *            the 1-based first row; values above 1 set an offset
+     * @param maxRows
+     *            the maximum number of rows, or -1 for no limit, in which
+     *            case a scroll is requested instead of a size
+     * @param queryBuilder
+     *            the query to apply, or null for none
+     * @return the prepared (not yet executed) search request
+     */
+    private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows,
+            QueryBuilder queryBuilder) {
+        final String type = table.getName();
+        final SearchRequestBuilder request = elasticSearchClient.prepareSearch(indexName).setTypes(type);
+
+        if (firstRow > 1) {
+            // firstRow is 1-based, whereas "from" is 0-based
+            request.setFrom(firstRow - 1);
+        }
+
+        if (!limitMaxRowsIsSet(maxRows)) {
+            request.setScroll(TIMEOUT_SCROLL);
+        } else {
+            request.setSize(maxRows);
+        }
+
+        if (queryBuilder != null) {
+            request.setQuery(queryBuilder);
+        }
+
+        return request;
+    }
+
+    /**
+     * Looks up a single document by its ID using a get request.
+     *
+     * @return the row of the document, or null if the key value is null or
+     *         no such document exists
+     */
+    @Override
+    protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
+            Object keyValue) {
+        if (keyValue == null) {
+            return null;
+        }
+
+        final String type = table.getName();
+        final String documentKey = keyValue.toString();
+        final GetResponse getResponse =
+                elasticSearchClient.prepareGet(indexName, type, documentKey).execute().actionGet();
+
+        if (getResponse.isExists()) {
+            return NativeElasticSearchUtils.createRow(getResponse.getSource(), getResponse.getId(),
+                    new SimpleDataSetHeader(selectItems));
+        }
+        return null;
+    }
+
+    /**
+     * Counts the documents of the table's document type using a count
+     * request.
+     *
+     * @return the document count, or null if WHERE items are present, since
+     *         those are not supported here and have to be counted
+     *         client-side
+     */
+    @Override
+    protected Number executeCountQuery(Table table, List<FilterItem> whereItems,
+            boolean functionApproximationAllowed) {
+        if (whereItems.isEmpty()) {
+            final QueryBuilder typeQuery = QueryBuilders.termQuery("_type", table.getName());
+            final CountResponse countResponse =
+                    elasticSearchClient.prepareCount(indexName).setQuery(typeQuery).execute().actionGet();
+            return countResponse.getCount();
+        }
+        // not supported - will have to be done by counting client-side
+        return null;
+    }
+
+    private boolean limitMaxRowsIsSet(int maxRows) {
+        return (maxRows != -1);
+    }
+
+    /**
+     * Runs the update script against a new {@link ElasticSearchUpdateCallback}
+     * for this data context and notifies the callback once the script has
+     * returned.
+     */
+    @Override
+    public void executeUpdate(UpdateScript update) {
+        final ElasticSearchUpdateCallback updateCallback = new ElasticSearchUpdateCallback(this);
+        update.run(updateCallback);
+        updateCallback.onExecuteUpdateFinished();
+    }
+
+    /**
+     * Gets the {@link Client} that this {@link DataContext} is wrapping.
+     *
+     * @return the ElasticSearch client that was passed to the constructor
+     */
+    public Client getElasticSearchClient() {
+        return this.elasticSearchClient;
+    }
+
+    /**
+     * Gets the name of the index that this {@link DataContext} is working on.
+     *
+     * @return the index name that was passed to the constructor
+     */
+    public String getIndexName() {
+        return this.indexName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
new file mode 100644
index 0000000..7f4048a
--- /dev/null
+++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.elasticsearch.action.search.ClearScrollRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DataSet} implementation for ElasticSearch. Iterates over the hits of
+ * a {@link SearchResponse} and, if the response carries a scroll ID, fetches
+ * the following sets of hits until an empty set is returned.
+ */
+final class ElasticSearchDataSet extends AbstractDataSet {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class);
+
+    private final Client _client;
+    // set to true by the first call to close()
+    private final AtomicBoolean _closed;
+
+    // the set of hits currently being iterated; replaced when scrolling
+    private SearchResponse _searchResponse;
+    // the hit exposed by getRow(); null when there is no current hit
+    private SearchHit _currentHit;
+    // index of the next hit to read within the current search response
+    private int _hitIndex = 0;
+
+    /**
+     * @param client
+     *            the client used for scrolling and for clearing the scroll
+     * @param searchResponse
+     *            the initial search response to iterate
+     * @param selectItems
+     *            the select items that make up the header of the data set
+     * @param queryPostProcessed
+     *            not used by this implementation
+     */
+    public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems,
+            boolean queryPostProcessed) {
+        super(selectItems);
+        _client = client;
+        _searchResponse = searchResponse;
+        _closed = new AtomicBoolean(false);
+    }
+
+    /**
+     * @param client
+     *            the client used for scrolling and for clearing the scroll
+     * @param searchResponse
+     *            the initial search response to iterate
+     * @param columns
+     *            the columns that make up the header of the data set
+     * @param queryPostProcessed
+     *            not used by this implementation
+     */
+    public ElasticSearchDataSet(Client client, SearchResponse searchResponse, Column[] columns,
+            boolean queryPostProcessed) {
+        super(columns);
+        _client = client;
+        _searchResponse = searchResponse;
+        _closed = new AtomicBoolean(false);
+    }
+
+    /**
+     * Closes the data set and, on the first invocation only, asks
+     * ElasticSearch to clear the scroll of the current search response (if it
+     * has one).
+     */
+    @Override
+    public void close() {
+        super.close();
+        // _closed starts out as false, so the transition to "closed" is
+        // false -> true; it succeeds for exactly one caller
+        final boolean closeNow = _closed.compareAndSet(false, true);
+        if (closeNow) {
+            final String scrollId = _searchResponse.getScrollId();
+            // responses of non-scrolling searches have no scroll ID
+            if (scrollId != null) {
+                final ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client)
+                        .addScrollId(scrollId);
+                // the result of the request is not awaited
+                scrollRequestBuilder.execute();
+            }
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        if (!_closed.get()) {
+            logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
+            close();
+        }
+    }
+
+    /**
+     * Moves to the next hit, scrolling to the next set of hits when the
+     * current one is exhausted.
+     *
+     * @return true if there is a current hit, false at the end of the result
+     */
+    @Override
+    public boolean next() {
+        final SearchHit[] hits = _searchResponse.getHits().hits();
+        if (hits.length == 0) {
+            // break condition for the scroll
+            _currentHit = null;
+            return false;
+        }
+
+        if (_hitIndex < hits.length) {
+            // pick the next hit within this search response
+            _currentHit = hits[_hitIndex];
+            _hitIndex++;
+            return true;
+        }
+
+        final String scrollId = _searchResponse.getScrollId();
+        if (scrollId == null) {
+            // this search response is not scrollable - then it's the end.
+            _currentHit = null;
+            return false;
+        }
+
+        // try to scroll to the next set of hits
+        _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
+                .execute().actionGet();
+
+        // start over (recursively)
+        _hitIndex = 0;
+        return next();
+    }
+
+    /**
+     * @return a row built from the source and ID of the current hit, or null
+     *         if there is no current hit
+     */
+    @Override
+    public Row getRow() {
+        if (_currentHit == null) {
+            return null;
+        }
+
+        final Map<String, Object> source = _currentHit.getSource();
+        final String documentId = _currentHit.getId();
+        final Row row = NativeElasticSearchUtils.createRow(source, documentId, getHeader());
+        return row;
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
new file mode 100644
index 0000000..0de2a71
--- /dev/null
+++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.List;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link RowDeletionBuilder} implementation for
+ * {@link ElasticSearchDataContext}. Rows are deleted by means of a
+ * delete-by-query request.
+ */
+final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDeleteBuilder.class);
+
+    private final ElasticSearchUpdateCallback _updateCallback;
+
+    public ElasticSearchDeleteBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
+        super(table);
+        _updateCallback = updateCallback;
+    }
+
+    /**
+     * Deletes the documents of the table's document type that match the
+     * WHERE items of this builder.
+     *
+     * @throws UnsupportedOperationException
+     *             if the WHERE items cannot be expressed as an ElasticSearch
+     *             query
+     */
+    @Override
+    public void execute() throws MetaModelException {
+        final String type = getTable().getName();
+
+        final ElasticSearchDataContext context = _updateCallback.getDataContext();
+        final Client esClient = context.getElasticSearchClient();
+        final String index = context.getIndexName();
+
+        final DeleteByQueryRequestBuilder request = new DeleteByQueryRequestBuilder(esClient);
+        request.setIndices(index);
+        request.setTypes(type);
+
+        // delete by query - note that createQueryBuilderForSimpleWhere may
+        // return matchAllQuery() if no where items are present.
+        final List<FilterItem> filters = getWhereItems();
+        final QueryBuilder query =
+                ElasticSearchUtils.createQueryBuilderForSimpleWhere(filters, LogicalOperator.AND);
+        if (query == null) {
+            // TODO: The where items could not be pushed down to a query. We
+            // could solve this by running a query first, gather all
+            // document IDs and then delete by IDs.
+            throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
+                    + filters);
+        }
+
+        request.setQuery(query);
+        request.execute().actionGet();
+
+        logger.debug("Deleted documents by query.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
new file mode 100644
index 0000000..d66b240
--- /dev/null
+++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.lang.reflect.Method;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.drop.AbstractTableDropBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.Table;
+import 
org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
+import 
org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TableDropBuilder} for dropping tables (document types) in an
+ * ElasticSearch index.
+ */
+final class ElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchDropTableBuilder.class);
+
+    private final ElasticSearchUpdateCallback _updateCallback;
+
+    public ElasticSearchDropTableBuilder(ElasticSearchUpdateCallback 
updateCallback, Table table) {
+        super(table);
+        _updateCallback = updateCallback;
+    }
+
+    @Override
+    public void execute() throws MetaModelException {
+        final ElasticSearchDataContext dataContext = 
_updateCallback.getDataContext();
+        final Table table = getTable();
+        final String documentType = table.getName();
+        logger.info("Deleting mapping / document type: {}", documentType);
+        final Client client = dataContext.getElasticSearchClient();
+        final IndicesAdminClient indicesAdminClient = client.admin().indices();
+        final String indexName = dataContext.getIndexName();
+
+        final DeleteMappingRequestBuilder requestBuilder = new 
DeleteMappingRequestBuilder(indicesAdminClient)
+                .setIndices(indexName);
+        setType(requestBuilder, documentType);
+
+        final DeleteMappingResponse result = 
requestBuilder.execute().actionGet();
+        logger.debug("Delete mapping response: acknowledged={}", 
result.isAcknowledged());
+
+        final MutableSchema schema = (MutableSchema) table.getSchema();
+        schema.removeTable(table);
+    }
+
+    /**
+     * Invokes the {@link DeleteMappingRequestBuilder#setType(String...)} 
method
+     * using reflection. This is done because the API of ElasticSearch was
+     * changed and the method signature differes between different versions.
+     * 
+     * @param requestBuilder
+     * @param documentType
+     */
+    private void setType(DeleteMappingRequestBuilder requestBuilder, String 
documentType) {
+        Object argument;
+        Method method;
+        try {
+            try {
+                method = 
requestBuilder.getClass().getDeclaredMethod("setType", String[].class);
+                argument = new String[] {documentType};
+            } catch (NoSuchMethodException e) {
+                logger.debug("No setType(String[]) method found, trying with a 
single String instead", e);
+                method = 
requestBuilder.getClass().getDeclaredMethod("setType", String.class);
+                argument = documentType;
+            }
+        } catch (Exception e) {
+            logger.error("Failed to resolve 
DeleteMappingRequestBuilder.setType(...) method", e);
+            throw new IllegalStateException("Failed to resolve 
DeleteMappingRequestBuilder.setType(...) method", e);
+        }
+        try {
+            method.setAccessible(true);
+            method.invoke(requestBuilder, argument);
+        } catch (Exception e) {
+            logger.error("Failed to invoke {}", method, e);
+            throw new IllegalStateException("Failed to invoke " + method, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
new file mode 100644
index 0000000..70d31b4
--- /dev/null
+++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Row insertion builder that indexes a single document in ElasticSearch, using
+ * the name of the target table as the document type.
+ */
+final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<ElasticSearchUpdateCallback> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchInsertBuilder.class);
+
+    public ElasticSearchInsertBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
+        super(updateCallback, table);
+    }
+
+    /**
+     * Builds an index request from the values that have been set on this
+     * builder and executes it. A non-null value of the column named
+     * {@link ElasticSearchUtils#FIELD_ID} is used as the document ID; all other
+     * set values become fields of the document source.
+     */
+    @Override
+    public void execute() throws MetaModelException {
+        final ElasticSearchDataContext context = getUpdateCallback().getDataContext();
+        final Client esClient = context.getElasticSearchClient();
+        final String index = context.getIndexName();
+        final String type = getTable().getName();
+        final IndexRequestBuilder indexRequest = new IndexRequestBuilder(esClient, index).setType(type);
+
+        final Map<String, Object> source = new HashMap<>();
+        final Column[] allColumns = getColumns();
+        final Object[] allValues = getValues();
+        for (int idx = 0; idx < allColumns.length; idx++) {
+            final Column column = allColumns[idx];
+            if (!isSet(column)) {
+                // columns without an explicitly set value are left out
+                continue;
+            }
+
+            final String columnName = column.getName();
+            final Object columnValue = allValues[idx];
+            if (!ElasticSearchUtils.FIELD_ID.equals(columnName)) {
+                source.put(columnName, columnValue);
+            } else if (columnValue != null) {
+                indexRequest.setId(columnValue.toString());
+            }
+        }
+
+        assert !source.isEmpty();
+
+        indexRequest.setSource(source);
+        indexRequest.setCreate(true);
+
+        final IndexResponse response = indexRequest.execute().actionGet();
+
+        logger.debug("Inserted document: id={}", response.getId());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
new file mode 100644
index 0000000..c0a1232
--- /dev/null
+++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.ColumnType;
+
+/**
+ * Parser that turns the ElasticSearch mapping metadata response (a json-like
+ * map structure) into an {@link ElasticSearchMetaData} object.
+ */
+public class ElasticSearchMetaDataParser {
+
+    /**
+     * Parses the ElasticSearch mapping metadata into an
+     * {@link ElasticSearchMetaData} object, which makes it much easier to
+     * create the ElasticSearch schema. The document ID field is always added
+     * as the first field, with type {@link ColumnType#STRING}; the remaining
+     * fields follow in the iteration order of the given map.
+     *
+     * @param metaDataInfo
+     *            ElasticSearch mapping metadata in Map format; every value is
+     *            cast to a {@code Map} holding the metadata of one field
+     * @return An ElasticSearchMetaData object
+     */
+    public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) {
+        final int fieldCount = metaDataInfo.size() + 1;
+        final String[] names = new String[fieldCount];
+        final ColumnType[] types = new ColumnType[fieldCount];
+
+        // slot 0 is reserved for the (fixed) document ID field
+        names[0] = ElasticSearchUtils.FIELD_ID;
+        types[0] = ColumnType.STRING;
+
+        int position = 0;
+        for (Entry<String, ?> field : metaDataInfo.entrySet()) {
+            position++;
+
+            @SuppressWarnings("unchecked")
+            final Map<String, ?> fieldMetadata = (Map<String, ?>) field.getValue();
+
+            names[position] = field.getKey();
+            types[position] = getColumnTypeFromMetadataField(fieldMetadata);
+        }
+        return new ElasticSearchMetaData(names, types);
+    }
+
+    /**
+     * Determines the column type of a single field from its "type" metadata
+     * entry. Falls back to {@link ColumnType#STRING} when no type is given.
+     */
+    private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) {
+        final Object declaredType = fieldMetadata.get("type");
+        final String typeName = (declaredType == null) ? null : declaredType.toString();
+
+        if (typeName != null) {
+            return ElasticSearchUtils.getColumnTypeFromElasticSearchType(typeName);
+        }
+        return ColumnType.STRING;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
new file mode 100644
index 0000000..b81c9c7
--- /dev/null
+++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.client.Client;
+
+/**
+ * {@link UpdateCallback} implementation for {@link ElasticSearchDataContext}.
+ * It hands out the ElasticSearch specific builders for creating and dropping
+ * tables and for inserting and deleting rows.
+ */
+final class ElasticSearchUpdateCallback extends AbstractUpdateCallback {
+
+    public ElasticSearchUpdateCallback(ElasticSearchDataContext dataContext) {
+        super(dataContext);
+    }
+
+    /**
+     * @return the data context of this callback, cast to
+     *         {@link ElasticSearchDataContext}
+     */
+    @Override
+    public ElasticSearchDataContext getDataContext() {
+        return (ElasticSearchDataContext) super.getDataContext();
+    }
+
+    /** Creates an {@link ElasticSearchCreateTableBuilder} for the given schema and table name. */
+    @Override
+    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
+            IllegalStateException {
+        return new ElasticSearchCreateTableBuilder(this, schema, name);
+    }
+
+    /** Dropping tables is supported, see {@link #dropTable(Table)}. */
+    @Override
+    public boolean isDropTableSupported() {
+        return true;
+    }
+
+    /** Creates an {@link ElasticSearchDropTableBuilder} for the given table. */
+    @Override
+    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new ElasticSearchDropTableBuilder(this, table);
+    }
+
+    /** Creates an {@link ElasticSearchInsertBuilder} for the given table. */
+    @Override
+    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new ElasticSearchInsertBuilder(this, table);
+    }
+
+    /** Deleting rows is supported, see {@link #deleteFrom(Table)}. */
+    @Override
+    public boolean isDeleteSupported() {
+        return true;
+    }
+
+    /** Creates an {@link ElasticSearchDeleteBuilder} for the given table. */
+    @Override
+    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new ElasticSearchDeleteBuilder(this, table);
+    }
+
+    /**
+     * Forces a refresh of the data context's index and waits for the refresh
+     * request to return.
+     */
+    public void onExecuteUpdateFinished() {
+        final ElasticSearchDataContext context = getDataContext();
+        final Client esClient = context.getElasticSearchClient();
+        final String index = context.getIndexName();
+        esClient.admin().indices().prepareRefresh(index).execute().actionGet();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
new file mode 100644
index 0000000..1efb0c8
--- /dev/null
+++ 
b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+
+/**
+ * Shared/common util functions for the ElasticSearch MetaModel module.
+ */
+final class NativeElasticSearchUtils {
+
+    /**
+     * Creates a {@link Row} from the source of an ElasticSearch document.
+     * Primary key columns get the document ID as their value; every other
+     * column gets the value stored in the source map under the column's name.
+     * For {@link ColumnType#DATE} columns a String value is converted to a
+     * {@link Date} when {@link ElasticSearchDateConverter} is able to do so.
+     *
+     * @param sourceMap
+     *            the document source, keyed by field name
+     * @param documentId
+     *            the ID of the document
+     * @param header
+     *            the header describing the select items of the row; every
+     *            select item is expected to reference a column and no function
+     * @return a row holding one value per select item of the header
+     */
+    public static Row createRow(Map<String, Object> sourceMap, String documentId, DataSetHeader header) {
+        final Object[] values = new Object[header.size()];
+        for (int i = 0; i < values.length; i++) {
+            final SelectItem selectItem = header.getSelectItem(i);
+            final Column column = selectItem.getColumn();
+
+            assert column != null;
+            assert selectItem.getFunction() == null;
+
+            if (column.isPrimaryKey()) {
+                values[i] = documentId;
+            } else {
+                values[i] = toColumnValue(column, sourceMap.get(column.getName()));
+            }
+        }
+
+        return new DefaultRow(header, values);
+    }
+
+    /**
+     * Converts a raw source value to the value exposed for the given column.
+     * Only String values of DATE columns are candidates for conversion; any
+     * other value (including null and non-String values such as numbers) is
+     * returned unchanged instead of being cast to String, which would fail
+     * with a ClassCastException for non-String values.
+     */
+    private static Object toColumnValue(Column column, Object value) {
+        if (column.getType() == ColumnType.DATE && value instanceof String) {
+            final Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value);
+            if (valueToDate != null) {
+                return valueToDate;
+            }
+            // the converter yielded no date - keep the original String value
+        }
+        return value;
+    }
+}

Reply via email to