http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
new file mode 100644
index 0000000..ca2ed13
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import io.searchbox.indices.Refresh;
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+
+/**
+ * {@link UpdateCallback} implementation for {@link ElasticSearchRestDataContext}.
+ *
+ * Table creation, table dropping, row insertion and row deletion are each handed to the
+ * corresponding {@code JestElasticSearch*Builder}; {@link #onExecuteUpdateFinished()} issues a
+ * refresh of the data context's index.
+ */
+final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
+
+    /**
+     * @param dataContext the REST data context this callback operates on
+     */
+    public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext) {
+        super(dataContext);
+    }
+
+    /**
+     * @return the data context, narrowed to {@link ElasticSearchRestDataContext}
+     */
+    @Override
+    public ElasticSearchRestDataContext getDataContext() {
+        return (ElasticSearchRestDataContext) super.getDataContext();
+    }
+
+    /** Returns a builder that creates the table {@code name} in {@code schema}. */
+    @Override
+    public TableCreationBuilder createTable(Schema schema, String name)
+            throws IllegalArgumentException, IllegalStateException {
+        return new JestElasticSearchCreateTableBuilder(this, schema, name);
+    }
+
+    /** Dropping tables is supported by this callback. */
+    @Override
+    public boolean isDropTableSupported() {
+        return true;
+    }
+
+    /** Returns a builder that drops {@code table}. */
+    @Override
+    public TableDropBuilder dropTable(Table table)
+            throws IllegalArgumentException, IllegalStateException, UnsupportedOperationException {
+        return new JestElasticSearchDropTableBuilder(this, table);
+    }
+
+    /** Returns a builder that inserts a row into {@code table}. */
+    @Override
+    public RowInsertionBuilder insertInto(Table table)
+            throws IllegalArgumentException, IllegalStateException, UnsupportedOperationException {
+        return new JestElasticSearchInsertBuilder(this, table);
+    }
+
+    /** Deleting rows is supported by this callback. */
+    @Override
+    public boolean isDeleteSupported() {
+        return true;
+    }
+
+    /** Returns a builder that deletes rows from {@code table}. */
+    @Override
+    public RowDeletionBuilder deleteFrom(Table table)
+            throws IllegalArgumentException, IllegalStateException, UnsupportedOperationException {
+        return new JestElasticSearchDeleteBuilder(this, table);
+    }
+
+    /**
+     * Executes a {@link Refresh} action against the index of the data context.
+     */
+    public void onExecuteUpdateFinished() {
+        final ElasticSearchRestDataContext context = getDataContext();
+        final Refresh refreshAction = new Refresh.Builder().addIndex(context.getIndexName()).build();
+
+        // NOTE(review): the meaning of the trailing 'false' flag is defined by
+        // JestClientExecutor, which is not visible here.
+        JestClientExecutor.execute(context.getElasticSearchClient(), refreshAction, false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
new file mode 100644
index 0000000..c37ff80
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.util.NumberComparator;
+
+import java.util.Date;
+
+/**
+ * Shared/common util functions for the ElasticSearch MetaModel module.
+ */
+final class JestElasticSearchUtils {
+
+    /**
+     * Builds a {@link Row} from the JSON source of a single document.
+     *
+     * For every select item of {@code header} the value is either the document id (when the
+     * item's column is a primary key) or the member of {@code source} named like the column,
+     * converted according to the column type. A member that is absent or JSON {@code null}
+     * yields a {@code null} value.
+     *
+     * @param source     the JSON source of the document
+     * @param documentId the id of the document, used for primary key columns
+     * @param header     the header describing the select items of the row
+     * @return a row holding one value per select item of {@code header}
+     */
+    public static Row createRow(JsonObject source, String documentId, DataSetHeader header) {
+        final Object[] values = new Object[header.size()];
+        for (int i = 0; i < values.length; i++) {
+            final SelectItem selectItem = header.getSelectItem(i);
+            final Column column = selectItem.getColumn();
+
+            assert column != null;
+            assert !selectItem.hasFunction();
+
+            if (column.isPrimaryKey()) {
+                values[i] = documentId;
+            } else {
+                values[i] = getDataFromColumnType(source.get(column.getName()), column.getType());
+            }
+        }
+
+        return new DefaultRow(header, values);
+    }
+
+    /**
+     * Converts a JSON member to the Java value matching the given column type.
+     *
+     * @param field the JSON member; may be {@code null} (member absent) or JSON {@code null}
+     * @param type  the type of the column the value belongs to
+     * @return the converted value, or {@code null} when the member is absent or JSON {@code null}
+     */
+    private static Object getDataFromColumnType(JsonElement field, ColumnType type) {
+        // JsonObject.get() returns null for a missing member, and JsonNull does not support
+        // getAsString()/getAsBoolean(); both cases simply mean "no value".
+        if (field == null || field.isJsonNull()) {
+            return null;
+        }
+
+        if (type.isNumber()) {
+            // Pretty terrible workaround to avoid LazilyParsedNumber
+            // (which is happily output, but not recognized by Jest/GSON).
+            return NumberComparator.toNumber(field.getAsString());
+        } else if (type.isTimeBased()) {
+            final String text = field.getAsString();
+            final Date valueToDate = ElasticSearchDateConverter.tryToConvert(text);
+            // fall back to the raw string when the value cannot be converted to a date
+            return valueToDate == null ? text : valueToDate;
+        } else if (type.isBoolean()) {
+            return field.getAsBoolean();
+        } else {
+            return field.getAsString();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java
new file mode 100644
index 0000000..aac963a
--- /dev/null
+++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestClientFactory;
+import io.searchbox.client.config.HttpClientConfig;
+import org.apache.metamodel.MetaModelHelper;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.create.CreateTable;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.DataSetTableModel;
+import org.apache.metamodel.data.InMemoryDataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.delete.DeleteFrom;
+import org.apache.metamodel.drop.DropTable;
+import 
org.apache.metamodel.elasticsearch.rest.utils.EmbeddedElasticsearchServer;
+import org.apache.metamodel.query.FunctionType;
+import org.apache.metamodel.query.Query;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.Update;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import 
org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import 
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.swing.table.TableModel;
+import java.io.IOException;
+import java.util.*;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.*;
+
+public class JestElasticSearchDataContextTest {
+
+    private static final String indexName = "twitter"; // index behind the shared dataContext
+    private static final String indexType1 = "tweet1"; // document type, exposed as table "tweet1"
+    private static final String indexType2 = "tweet2"; // document type, exposed as table "tweet2"
+    private static final String indexName2 = "twitter2"; // second index, created by createIndex()
+    private static final String indexType3 = "tweet3"; // type mapped explicitly by createIndex()
+    private static final String bulkIndexType = "bulktype"; // type filled via indexBulkDocuments()
+    private static final String peopleIndexType = "peopletype"; // type filled via insertPeopleDocuments()
+    private static final String mapping = // mapping source put on indexType3 by createIndex()
+            
"{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}";
+    private static EmbeddedElasticsearchServer embeddedElasticsearchServer; // started in beforeTests()
+    private static JestClient client; // REST client built in beforeTests()
+    private static UpdateableDataContext dataContext; // shared by all tests; re-created by testDropTable()
+
+    /**
+     * Starts the embedded ElasticSearch server, builds a Jest client for its HTTP port, indexes
+     * the fixture documents and creates the shared data context.
+     */
+    @BeforeClass
+    public static void beforeTests() throws Exception {
+        embeddedElasticsearchServer = new EmbeddedElasticsearchServer();
+        final int port = Integer.parseInt(embeddedElasticsearchServer.getClient().settings().get("http.port"));
+        final JestClientFactory factory = new JestClientFactory();
+        factory.setHttpClientConfig(new HttpClientConfig
+                .Builder("http://localhost:" + port)
+                .multiThreaded(true)
+                .build());
+        client = factory.getObject();
+
+        indexTweeterDocument(indexType1, 1);
+        indexTweeterDocument(indexType2, 1);
+        indexTweeterDocument(indexType2, 2, null);
+        insertPeopleDocuments();
+        indexTweeterDocument(indexType2, 1);
+        indexBulkDocuments(indexName, bulkIndexType, 10);
+
+        // The refresh API allows to explicitly refresh one or more index,
+        // making all operations performed since the last refresh available for
+        // search. Refreshing explicitly (instead of sleeping and hoping the
+        // periodic refresh has happened) keeps the fixture deterministic.
+        embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet();
+        dataContext = new ElasticSearchRestDataContext(client, indexName);
+        System.out.println("Embedded ElasticSearch server created!");
+    }
+
+    /**
+     * Indexes the nine "people" fixture documents: five female, then four male.
+     */
+    private static void insertPeopleDocuments() throws IOException {
+        // each entry is { age, id }, listed in indexing order
+        final int[][] females = { { 20, 5 }, { 17, 8 }, { 18, 9 }, { 19, 10 }, { 20, 11 } };
+        final int[][] males = { { 19, 1 }, { 17, 2 }, { 18, 3 }, { 18, 4 } };
+
+        for (final int[] person : females) {
+            indexOnePeopleDocument("female", person[0], person[1]);
+        }
+        for (final int[] person : males) {
+            indexOnePeopleDocument("male", person[0], person[1]);
+        }
+    }
+
+    /**
+     * Shuts the embedded ElasticSearch server down once all tests have run.
+     */
+    @AfterClass
+    public static void afterTests() {
+        embeddedElasticsearchServer.shutdown();
+
+        final String notice = "Embedded ElasticSearch server shut down!";
+        System.out.println(notice);
+    }
+
+    /**
+     * Checks the table names of the default schema, the column names and types of "tweet1", and
+     * the first row of a two-column select.
+     */
+    @Test
+    public void testSimpleQuery() throws Exception {
+        final String tableNames = Arrays.toString(dataContext.getDefaultSchema().getTableNames());
+        assertEquals("[bulktype, peopletype, tweet1, tweet2]", tableNames);
+
+        final Table tweets = dataContext.getDefaultSchema().getTableByName("tweet1");
+
+        final String columnNames = Arrays.toString(tweets.getColumnNames());
+        assertEquals("[_id, message, postDate, user]", columnNames);
+
+        assertEquals(ColumnType.STRING, tweets.getColumnByName("user").getType());
+        assertEquals(ColumnType.DATE, tweets.getColumnByName("postDate").getType());
+        assertEquals(ColumnType.BIGINT, tweets.getColumnByName("message").getType());
+
+        try (DataSet rows = dataContext.query().from(indexType1).select("user").and("message").execute()) {
+            assertEquals(JestElasticSearchDataSet.class, rows.getClass());
+
+            assertTrue(rows.next());
+            final String firstRow = rows.getRow().toString();
+            assertEquals("Row[values=[user1, 1]]", firstRow);
+        }
+    }
+
+    /**
+     * Checks that "_id" is the single primary key of "tweet2" and that selecting it returns the
+     * document id.
+     */
+    @Test
+    public void testDocumentIdAsPrimaryKey() throws Exception {
+        final Table tweets = dataContext.getDefaultSchema().getTableByName("tweet2");
+        final Column[] primaryKeys = tweets.getPrimaryKeys();
+        assertEquals(1, primaryKeys.length);
+        assertEquals("_id", primaryKeys[0].getName());
+
+        try (DataSet rows = dataContext.query().from(tweets).select("user", "_id").orderBy("_id").asc()
+                .execute()) {
+            assertTrue(rows.next());
+            final String firstRow = rows.getRow().toString();
+            assertEquals("Row[values=[user1, tweet_tweet2_1]]", firstRow);
+        }
+    }
+
+    /**
+     * Looks a "tweet2" document up by its primary key and expects exactly one row, delivered as
+     * an {@link InMemoryDataSet}.
+     */
+    @Test
+    public void testExecutePrimaryKeyLookupQuery() throws Exception {
+        final Table tweets = dataContext.getDefaultSchema().getTableByName("tweet2");
+        final Column idColumn = tweets.getPrimaryKeys()[0];
+
+        try (DataSet rows = dataContext.query().from(tweets).selectAll().where(idColumn).eq("tweet_tweet2_1")
+                .execute()) {
+            assertTrue(rows.next());
+
+            // the post date is whatever was indexed; only its position in the row is checked
+            final Row row = rows.getRow();
+            final Object dateValue = row.getValue(2);
+            assertEquals("Row[values=[tweet_tweet2_1, 1, " + dateValue + ", user1]]", row.toString());
+
+            assertFalse(rows.next());
+
+            assertEquals(InMemoryDataSet.class, rows.getClass());
+        }
+    }
+
+    /**
+     * Checks that "postDate" of "tweet1" is typed as DATE and that every selected value is a
+     * {@link Date}.
+     */
+    @Test
+    public void testDateIsHandledAsDate() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+        Column column = table.getColumnByName("postDate");
+        ColumnType type = column.getType();
+        assertEquals(ColumnType.DATE, type);
+
+        // try-with-resources: the data set is released even when an assertion fails
+        try (DataSet dataSet = dataContext.query().from(table).select(column).execute()) {
+            while (dataSet.next()) {
+                Object value = dataSet.getRow().getValue(column);
+                assertTrue("Got class: " + value.getClass() + ", expected Date (or subclass)",
+                        value instanceof Date);
+            }
+        }
+    }
+
+    /**
+     * Checks that "age" of the people table is typed as BIGINT and that every selected value is
+     * a {@link Number}.
+     */
+    @Test
+    public void testNumberIsHandledAsNumber() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+        Column column = table.getColumnByName("age");
+        ColumnType type = column.getType();
+        assertEquals(ColumnType.BIGINT, type);
+
+        // try-with-resources: the data set is released even when an assertion fails
+        try (DataSet dataSet = dataContext.query().from(table).select(column).execute()) {
+            while (dataSet.next()) {
+                Object value = dataSet.getRow().getValue(column);
+                assertTrue("Got class: " + value.getClass() + ", expected Number (or subclass)",
+                        value instanceof Number);
+            }
+        }
+    }
+
+    /**
+     * Creates the table "testCreateTable", inserts two rows, queries them back ordered by "bar"
+     * and finally drops the table again.
+     */
+    @Test
+    public void testCreateTableInsertQueryAndDrop() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+        assertNotNull(table);
+
+        try {
+            assertEquals("[" + ElasticSearchRestDataContext.FIELD_ID + ", foo, bar]",
+                    Arrays.toString(table.getColumnNames()));
+
+            final Column fooColumn = table.getColumnByName("foo");
+            final Column idColumn = table.getPrimaryKeys()[0];
+            assertEquals(
+                    "Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]",
+                    idColumn.toString());
+
+            dataContext.executeUpdate(new UpdateScript() {
+                @Override
+                public void run(UpdateCallback callback) {
+                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+                }
+            });
+
+            dataContext.refreshSchemas();
+
+            try (DataSet ds = dataContext.query().from(table).selectAll().orderBy("bar").execute()) {
+                assertTrue(ds.next());
+                assertEquals("hello", ds.getRow().getValue(fooColumn).toString());
+                assertNotNull(ds.getRow().getValue(idColumn));
+                assertTrue(ds.next());
+                assertEquals("world", ds.getRow().getValue(fooColumn).toString());
+                assertNotNull(ds.getRow().getValue(idColumn));
+                assertFalse(ds.next());
+            }
+        } finally {
+            // Drop the table even when an assertion above fails; other tests create a table
+            // with the same name and would otherwise start from a dirty state.
+            dataContext.executeUpdate(new DropTable(table));
+        }
+
+        dataContext.refreshSchemas();
+
+        assertNull(dataContext.getTableByQualifiedLabel(table.getName()));
+    }
+
+    /**
+     * Adds and removes a mapping directly through the ElasticSearch admin client and checks that
+     * the data context sees each change after {@code refreshSchemas()}.
+     */
+    @Test
+    public void testDetectOutsideChanges() throws Exception {
+        final String tableType = "outsideTable";
+        final IndicesAdminClient indicesAdmin = embeddedElasticsearchServer.getClient().admin().indices();
+
+        // Create the type in ES
+        final Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" };
+        new PutMappingRequestBuilder(indicesAdmin)
+                .setIndices(indexName)
+                .setType(tableType)
+                .setSource(sourceProperties)
+                .execute()
+                .actionGet();
+
+        dataContext.refreshSchemas();
+        assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType));
+
+        // Remove the type again, outside of the data context
+        new DeleteMappingRequestBuilder(indicesAdmin)
+                .setIndices(indexName)
+                .setType(tableType)
+                .execute()
+                .actionGet();
+
+        dataContext.refreshSchemas();
+        assertNull(dataContext.getTableByQualifiedLabel(tableType));
+    }
+
+    /**
+     * Inserts two rows into a freshly created table, deletes all rows and expects a count of 0.
+     */
+    @Test
+    public void testDeleteAll() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+        try {
+
+            dataContext.executeUpdate(new UpdateScript() {
+                @Override
+                public void run(UpdateCallback callback) {
+                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+                }
+            });
+
+            dataContext.executeUpdate(new DeleteFrom(table));
+
+            Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
+                    dataContext.query().from(table).selectCount().toQuery());
+            assertEquals("Count is wrong", 0, ((Number) row.getValue(0)).intValue());
+
+        } finally {
+            // drop in 'finally' (as the sibling tests do) so a failure does not leak the table
+            dataContext.executeUpdate(new DropTable(table));
+        }
+    }
+
+    /**
+     * Inserts two rows, deletes the one matching foo = "hello" and bar = 42, and expects the
+     * other row to remain.
+     */
+    @Test
+    public void testDeleteByQuery() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+        try {
+
+            dataContext.executeUpdate(new UpdateScript() {
+                @Override
+                public void run(UpdateCallback callback) {
+                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+                }
+            });
+
+            dataContext.executeUpdate(new DeleteFrom(table).where("foo").eq("hello").where("bar").eq(42));
+
+            Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
+                    dataContext.query().from(table).select("foo", "bar").toQuery());
+            assertEquals("Row[values=[world, 43]]", row.toString());
+
+        } finally {
+            // drop in 'finally' (as the sibling tests do) so a failure does not leak the table
+            dataContext.executeUpdate(new DropTable(table));
+        }
+    }
+
+    /**
+     * Expects an {@link UnsupportedOperationException} with a fixed message when deleting with a
+     * "greater than" condition.
+     */
+    @Test
+    public void testDeleteUnsupportedQueryType() throws Exception {
+        final Schema defaultSchema = dataContext.getDefaultSchema();
+        final CreateTable creation = new CreateTable(defaultSchema, "testCreateTable");
+        creation.withColumn("foo").ofType(ColumnType.STRING);
+        creation.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(creation);
+
+        final Table table = defaultSchema.getTableByName("testCreateTable");
+        try {
+            final UpdateScript insertTwoRows = new UpdateScript() {
+                @Override
+                public void run(UpdateCallback callback) {
+                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+                }
+            };
+            dataContext.executeUpdate(insertTwoRows);
+
+            // greater than is not yet supported
+            try {
+                dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
+                fail("Exception expected");
+            } catch (UnsupportedOperationException expected) {
+                final String message = expected.getMessage();
+                assertEquals(
+                        "Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
+                        message);
+            }
+        } finally {
+            dataContext.executeUpdate(new DropTable(table));
+        }
+    }
+
+    /**
+     * Inserts two rows, updates "foo" of the row with bar = 42 and checks both rows afterwards.
+     */
+    @Test
+    public void testUpdateRow() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+        try {
+
+            dataContext.executeUpdate(new UpdateScript() {
+                @Override
+                public void run(UpdateCallback callback) {
+                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+                }
+            });
+
+            dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
+
+            // try-with-resources: the data set is closed even when an assertion fails
+            try (DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar")
+                    .execute()) {
+                assertTrue(dataSet.next());
+                assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
+                assertTrue(dataSet.next());
+                assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
+                assertFalse(dataSet.next());
+            }
+        } finally {
+            dataContext.executeUpdate(new DropTable(table));
+        }
+    }
+
+    /**
+     * Drops the people table, expects a count of 0 afterwards, and restores the people
+     * documents and the data context for the tests that follow.
+     */
+    @Test
+    public void testDropTable() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+
+        // assert that the table was there to begin with
+        try (DataSet ds = dataContext.query().from(table).selectCount().execute()) {
+            // fail with a clear assertion (not a NullPointerException) when no row is returned
+            assertTrue(ds.next());
+            assertEquals("Count is wrong", 9, ((Number) ds.getRow().getValue(0)).intValue());
+        }
+
+        dataContext.executeUpdate(new DropTable(table));
+        try (DataSet ds = dataContext.query().from(table).selectCount().execute()) {
+            assertTrue(ds.next());
+            assertEquals("Count is wrong", 0, ((Number) ds.getRow().getValue(0)).intValue());
+        } finally {
+            // restore the people documents for the next tests
+            insertPeopleDocuments();
+            embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet();
+            dataContext = new ElasticSearchRestDataContext(client, indexName);
+        }
+    }
+
+    /**
+     * Filters the bulk table on user = "user4" and expects exactly that one row.
+     */
+    @Test
+    public void testWhereColumnEqualsValues() throws Exception {
+        try (DataSet rows = dataContext.query().from(bulkIndexType)
+                .select("user").and("message")
+                .where("user").isEquals("user4")
+                .execute()) {
+            assertEquals(JestElasticSearchDataSet.class, rows.getClass());
+
+            assertTrue(rows.next());
+            final String onlyRow = rows.getRow().toString();
+            assertEquals("Row[values=[user4, 4]]", onlyRow);
+            assertFalse(rows.next());
+        }
+    }
+
+    /**
+     * Filters "tweet2" on postDate IS NULL and expects only the document with message 2.
+     */
+    @Test
+    public void testWhereColumnIsNullValues() throws Exception {
+        try (DataSet rows = dataContext.query().from(indexType2)
+                .select("message")
+                .where("postDate").isNull()
+                .execute()) {
+            assertEquals(JestElasticSearchDataSet.class, rows.getClass());
+
+            assertTrue(rows.next());
+            final String onlyRow = rows.getRow().toString();
+            assertEquals("Row[values=[2]]", onlyRow);
+            assertFalse(rows.next());
+        }
+    }
+
+    /**
+     * Filters "tweet2" on postDate IS NOT NULL and expects only the document with message 1.
+     */
+    @Test
+    public void testWhereColumnIsNotNullValues() throws Exception {
+        try (DataSet rows = dataContext.query().from(indexType2)
+                .select("message")
+                .where("postDate").isNotNull()
+                .execute()) {
+            assertEquals(JestElasticSearchDataSet.class, rows.getClass());
+
+            assertTrue(rows.next());
+            final String onlyRow = rows.getRow().toString();
+            assertEquals("Row[values=[1]]", onlyRow);
+            assertFalse(rows.next());
+        }
+    }
+
+    /**
+     * Combines user = "user4" with message != 5 on the bulk table and expects one row.
+     */
+    @Test
+    public void testWhereMultiColumnsEqualValues() throws Exception {
+        try (DataSet rows = dataContext.query().from(bulkIndexType)
+                .select("user").and("message")
+                .where("user").isEquals("user4")
+                .and("message").ne(5)
+                .execute()) {
+            assertEquals(JestElasticSearchDataSet.class, rows.getClass());
+
+            assertTrue(rows.next());
+            final String onlyRow = rows.getRow().toString();
+            assertEquals("Row[values=[user4, 4]]", onlyRow);
+            assertFalse(rows.next());
+        }
+    }
+
+    /**
+     * Filters the bulk table on user IN ("user4", "user5"), ordered by message, and expects the
+     * two matching rows in that order.
+     */
+    @Test
+    public void testWhereColumnInValues() throws Exception {
+        try (DataSet rows = dataContext.query().from(bulkIndexType)
+                .select("user").and("message")
+                .where("user").in("user4", "user5")
+                .orderBy("message")
+                .execute()) {
+            assertTrue(rows.next());
+            assertEquals("Row[values=[user4, 4]]", rows.getRow().toString());
+
+            assertTrue(rows.next());
+            assertEquals("Row[values=[user5, 5]]", rows.getRow().toString());
+
+            assertFalse(rows.next());
+        }
+    }
+
+    /**
+     * Groups the people table by gender and checks the MAX/MIN age, the COUNT(*) and the MIN id
+     * of each group.
+     */
+    @Test
+    public void testGroupByQuery() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+
+        Query q = new Query();
+        q.from(table);
+        q.groupBy(table.getColumnByName("gender"));
+        q.select(new SelectItem(table.getColumnByName("gender")),
+                new SelectItem(FunctionType.MAX, table.getColumnByName("age")),
+                new SelectItem(FunctionType.MIN, table.getColumnByName("age")),
+                new SelectItem(FunctionType.COUNT, "*", "total"),
+                new SelectItem(FunctionType.MIN, table.getColumnByName("id")).setAlias("firstId"));
+        q.orderBy("gender");
+
+        // try-with-resources: the data set is released even when an assertion fails
+        try (DataSet data = dataContext.executeQuery(q)) {
+            assertEquals(
+                    "[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), COUNT(*) AS total, MIN(peopletype.id) AS firstId]",
+                    Arrays.toString(data.getSelectItems()));
+
+            assertTrue(data.next());
+            assertEquals("Row[values=[female, 20, 17, 5, 5]]", data.getRow().toString());
+            assertTrue(data.next());
+            assertEquals("Row[values=[male, 19, 17, 4, 1]]", data.getRow().toString());
+            assertFalse(data.next());
+        }
+    }
+
+    /**
+     * Filters the bulk table on message > 7 and expects exactly the rows of user8 and user9, in
+     * any order.
+     */
+    @Test
+    public void testFilterOnNumberColumn() {
+        Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType);
+        Query q = dataContext.query().from(table).select("user").where("message").greaterThan(7).toQuery();
+        final List<String> expectations = Arrays.asList("Row[values=[user8]]", "Row[values=[user9]]");
+
+        // try-with-resources: the data set is released even when an assertion fails
+        try (DataSet data = dataContext.executeQuery(q)) {
+            assertTrue(data.next());
+            assertTrue(expectations.contains(data.getRow().toString()));
+            assertTrue(data.next());
+            assertTrue(expectations.contains(data.getRow().toString()));
+            assertFalse(data.next());
+        }
+    }
+
+    /**
+     * Limits a select on the people table to 5 rows and checks the resulting row count.
+     */
+    @Test
+    public void testMaxRows() throws Exception {
+        final Table people = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+
+        final Query limited = new Query().from(people).select(people.getColumns()).setMaxRows(5);
+        final DataSet rows = dataContext.executeQuery(limited);
+
+        final TableModel model = new DataSetTableModel(rows);
+        final int rowCount = model.getRowCount();
+        assertEquals(5, rowCount);
+    }
+
+    /**
+     * Runs a COUNT(*) on the bulk table and expects a single one-column row holding 10.
+     */
+    @Test
+    public void testCountQuery() throws Exception {
+        final Table bulk = dataContext.getDefaultSchema().getTableByName(bulkIndexType);
+        final Query countQuery = new Query().selectCount().from(bulk);
+
+        final List<Object[]> rows = dataContext.executeQuery(countQuery).toObjectArrays();
+        assertEquals(1, rows.size());
+
+        final Object[] onlyRow = rows.get(0);
+        assertEquals(1, onlyRow.length);
+
+        final int count = ((Number) onlyRow[0]).intValue();
+        assertEquals(10, count);
+    }
+
+    /**
+     * Querying a table that does not exist is expected to raise an
+     * {@link IllegalArgumentException}.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testQueryForANonExistingTable() throws Exception {
+        dataContext.query()
+                .from("nonExistingTable")
+                .select("user").and("message")
+                .execute();
+    }
+
+    /**
+     * Selecting a field that does not exist on an existing table is expected to raise an
+     * {@link IllegalArgumentException}.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testQueryForAnExistingTableAndNonExistingField() throws Exception {
+        indexTweeterDocument(indexType1, 1);
+
+        dataContext.query()
+                .from(indexType1)
+                .select("nonExistingField")
+                .execute();
+    }
+
+    /**
+     * Creates the second index with an explicit mapping and checks that a data context on it
+     * exposes exactly the table "tweet3".
+     */
+    @Test
+    public void testNonDynamicMapingTableNames() throws Exception {
+        createIndex();
+
+        final ElasticSearchRestDataContext secondContext = new ElasticSearchRestDataContext(client, indexName2);
+
+        final String tableNames = Arrays.toString(secondContext.getDefaultSchema().getTableNames());
+        assertEquals("[tweet3]", tableNames);
+    }
+
+    /**
+     * Creates the index {@code indexName2} and puts the {@code mapping} source on type
+     * {@code indexType3}, printing whether each request was acknowledged.
+     */
+    private static void createIndex() {
+        final CreateIndexRequest createRequest = new CreateIndexRequest(indexName2);
+        final CreateIndexResponse createResponse =
+                embeddedElasticsearchServer.getClient().admin().indices().create(createRequest).actionGet();
+
+        System.out.println("create index: " + createResponse.isAcknowledged());
+
+        final PutMappingRequest mappingRequest = new PutMappingRequest(indexName2).type(indexType3).source(mapping);
+        final PutMappingResponse mappingResponse =
+                embeddedElasticsearchServer.getClient().admin().indices().putMapping(mappingRequest).actionGet();
+
+        System.out.println("put mapping: " + mappingResponse.isAcknowledged());
+    }
+
+    /**
+     * Indexes {@code numberOfDocuments} tweet documents with ids "0" .. "n-1" in one bulk
+     * request.
+     *
+     * @param indexName         the index to write to
+     * @param indexType         the document type to write to
+     * @param numberOfDocuments how many documents to index
+     */
+    private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
+        final BulkRequestBuilder bulk = embeddedElasticsearchServer.getClient().prepareBulk();
+
+        int documentNumber = 0;
+        while (documentNumber < numberOfDocuments) {
+            final String documentId = Integer.toString(documentNumber);
+            bulk.add(embeddedElasticsearchServer.getClient()
+                    .prepareIndex(indexName, indexType, documentId)
+                    .setSource(buildTweeterJson(documentNumber)));
+            documentNumber++;
+        }
+
+        bulk.execute().actionGet();
+    }
+
+    private static void indexTweeterDocument(String indexType, int id, Date 
date) {
+        embeddedElasticsearchServer.getClient().prepareIndex(indexName, 
indexType).setSource(buildTweeterJson(id, date))
+                .setId("tweet_" + indexType + "_" + id).execute().actionGet();
+    }
+
+    /**
+     * Indexes a single tweet document whose post date is the current time. The document id is
+     * {@code "tweet_" + indexType + "_" + id}.
+     */
+    private static void indexTweeterDocument(String indexType, int id) {
+        // Same request as the dated overload; only the date defaults to "now".
+        indexTweeterDocument(indexType, id, new Date());
+    }
+
+    /**
+     * Indexes one "people" document, built from the given gender, age and id, under
+     * {@code peopleIndexType}. No document id is set on the request.
+     */
+    private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
+        embeddedElasticsearchServer.getClient()
+                .prepareIndex(indexName, peopleIndexType)
+                .setSource(buildPeopleJson(gender, age, id))
+                .execute()
+                .actionGet();
+    }
+
+    private static Map<String, Object> buildTweeterJson(int elementId) {
+        return buildTweeterJson(elementId, new Date());
+    }
+
+    private static Map<String, Object> buildTweeterJson(int elementId, Date 
date) {
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("user", "user" + elementId);
+        map.put("postDate", date);
+        map.put("message", elementId);
+        return map;
+    }
+
+    /**
+     * Builds the source of a "people" document: one JSON object with the fields
+     * {@code gender}, {@code age} and {@code id}.
+     */
+    private static XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws IOException {
+        return jsonBuilder()
+                .startObject()
+                .field("gender", gender)
+                .field("age", age)
+                .field("id", elementId)
+                .endObject();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java
 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java
new file mode 100644
index 0000000..6eeac6a
--- /dev/null
+++ 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.schema.ColumnType;
+import org.elasticsearch.common.collect.MapBuilder;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+/**
+ * Checks that {@link JestElasticSearchMetaDataParser} turns the JSON form of a mapping's
+ * properties into the expected column names and column types.
+ */
+public class JestElasticSearchMetaDataParserTest extends TestCase {
+
+    /**
+     * Parses a mapping with seven properties and expects eight columns: a leading {@code _id}
+     * column followed by the properties in their original order. A property without a
+     * {@code type} entry is expected to come out as {@link ColumnType#STRING}.
+     */
+    public void testParseMetadataInfo() throws Exception {
+        // LinkedHashMap keeps insertion order, which the index-based assertions below rely on.
+        Map<String, Object> metadata = new LinkedHashMap<>();
+        metadata.put("message", MapBuilder.newMapBuilder().put("type", "long").immutableMap());
+        metadata.put("postDate",
+                MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap());
+        metadata.put("anotherDate",
+                MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap());
+        metadata.put("user", MapBuilder.newMapBuilder().put("type", "string").immutableMap());
+        metadata.put("critical", MapBuilder.newMapBuilder().put("type", "boolean").immutableMap());
+        metadata.put("income", MapBuilder.newMapBuilder().put("type", "double").immutableMap());
+        metadata.put("untypedthingie", MapBuilder.newMapBuilder().put("foo", "bar").immutableMap());
+        final Gson gson = new Gson();
+        ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser
+                .parse((JsonObject) gson.toJsonTree(metadata));
+        String[] columnNames = metaData.getColumnNames();
+        ColumnType[] columnTypes = metaData.getColumnTypes();
+
+        // Expected value first, actual value second, so failure messages read correctly.
+        assertEquals(8, columnNames.length);
+        assertEquals("_id", columnNames[0]);
+        assertEquals("message", columnNames[1]);
+        assertEquals("postDate", columnNames[2]);
+        assertEquals("anotherDate", columnNames[3]);
+        assertEquals("user", columnNames[4]);
+        assertEquals("critical", columnNames[5]);
+        assertEquals("income", columnNames[6]);
+        assertEquals("untypedthingie", columnNames[7]);
+
+        assertEquals(8, columnTypes.length);
+        assertEquals(ColumnType.STRING, columnTypes[0]);
+        assertEquals(ColumnType.BIGINT, columnTypes[1]);
+        assertEquals(ColumnType.DATE, columnTypes[2]);
+        assertEquals(ColumnType.DATE, columnTypes[3]);
+        assertEquals(ColumnType.STRING, columnTypes[4]);
+        assertEquals(ColumnType.BOOLEAN, columnTypes[5]);
+        assertEquals(ColumnType.DOUBLE, columnTypes[6]);
+        assertEquals(ColumnType.STRING, columnTypes[7]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java
 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java
new file mode 100644
index 0000000..0d78d8e
--- /dev/null
+++ 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import com.google.gson.JsonObject;
+import junit.framework.TestCase;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.MutableColumn;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Tests for {@link JestElasticSearchUtils#createRow}, which builds a row from a JSON document,
+ * its document id and a data set header.
+ */
+public class JestElasticSearchUtilsTest extends TestCase {
+
+    /**
+     * A select item backed by a primary key column must receive the document id, not the value
+     * stored in the document under the column's name.
+     */
+    public void testAssignDocumentIdForPrimaryKeys() throws Exception {
+        MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true);
+        SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn);
+        List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem);
+        String documentId = "doc1";
+        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
+        JsonObject values = new JsonObject();
+
+        values.addProperty("value1", "theValue");
+        Row row = JestElasticSearchUtils.createRow(values, documentId, header);
+        String primaryKeyValue = (String) row.getValue(primaryKeyItem);
+
+        // Expected value first, actual value second, so a failure message reads correctly.
+        assertEquals(documentId, primaryKeyValue);
+    }
+
+    /**
+     * A {@link ColumnType#DATE} column whose JSON value is an ISO-8601 timestamp string must be
+     * returned as a {@link Date}, while a STRING column stays a {@link String}.
+     */
+    public void testCreateRowWithParseableDates() throws Exception {
+        SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING));
+        SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE));
+        List<SelectItem> selectItems1 = Arrays.asList(item1, item2);
+        String documentId = "doc1";
+        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
+        JsonObject values = new JsonObject();
+        values.addProperty("value1", "theValue");
+        values.addProperty("value2", "2013-01-04T15:55:51.217+01:00");
+        Row row = JestElasticSearchUtils.createRow(values, documentId, header);
+        Object stringValue = row.getValue(item1);
+        Object dateValue = row.getValue(item2);
+
+        assertTrue(stringValue instanceof String);
+        assertTrue(dateValue instanceof Date);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java
 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java
new file mode 100644
index 0000000..11e7eb5
--- /dev/null
+++ 
b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.node.Node;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ * Test helper that runs a local Elasticsearch node inside the current JVM, with the HTTP
+ * endpoint enabled so that REST clients can reach it. The node stores its data in a directory
+ * that is deleted again by {@link #shutdown()}.
+ */
+public class EmbeddedElasticsearchServer {
+
+    private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data";
+
+    /** HTTP port the embedded node is configured with. */
+    private static final int HTTP_PORT = 9292;
+
+    private final Node node;
+    private final String dataDirectory;
+
+    /**
+     * Starts an embedded node that stores its data under {@code target/elasticsearch-data}.
+     */
+    public EmbeddedElasticsearchServer() {
+        this(DEFAULT_DATA_DIRECTORY);
+    }
+
+    /**
+     * Starts an embedded node that stores its data under the given directory.
+     *
+     * @param dataDirectory
+     *            path used as the node's {@code path.data} setting; it is deleted on
+     *            {@link #shutdown()}
+     */
+    public EmbeddedElasticsearchServer(String dataDirectory) {
+        this.dataDirectory = dataDirectory;
+
+        ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder()
+                .put("http.enabled", "true")
+                .put("path.data", dataDirectory)
+                .put("http.port", HTTP_PORT);
+
+        node = nodeBuilder()
+                .local(true)
+                .settings(elasticsearchSettings.build())
+                .node();
+    }
+
+    /**
+     * @return a client connected to the embedded node
+     */
+    public Client getClient() {
+        return node.client();
+    }
+
+    /**
+     * Closes the node and removes its data directory. The directory is removed even when
+     * closing the node fails, so a failed shutdown does not leave stale index data behind for
+     * the next test run.
+     *
+     * @throws RuntimeException
+     *             if the data directory cannot be deleted
+     */
+    public void shutdown() {
+        try {
+            node.close();
+        } finally {
+            deleteDataDirectory();
+        }
+    }
+
+    private void deleteDataDirectory() {
+        try {
+            FileUtils.deleteDirectory(new File(dataDirectory));
+        } catch (IOException e) {
+            throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchCreateTableBuilder.java
 
b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchCreateTableBuilder.java
deleted file mode 100644
index 0a1750d..0000000
--- 
a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchCreateTableBuilder.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.create.AbstractTableCreationBuilder;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-import 
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.common.base.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class ElasticSearchCreateTableBuilder extends 
AbstractTableCreationBuilder<ElasticSearchUpdateCallback> {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchCreateTableBuilder.class);
-
-    public ElasticSearchCreateTableBuilder(ElasticSearchUpdateCallback 
updateCallback, Schema schema, String name) {
-        super(updateCallback, schema, name);
-    }
-
-    @Override
-    public Table execute() throws MetaModelException {
-        final MutableTable table = getTable();
-
-        if (table.getColumnByName(ElasticSearchDataContext.FIELD_ID) == null) {
-            final MutableColumn idColumn = new 
MutableColumn(ElasticSearchDataContext.FIELD_ID, ColumnType.STRING)
-                    .setTable(table).setPrimaryKey(true);
-            table.addColumn(0, idColumn);
-        }
-
-        final ElasticSearchDataContext dataContext = 
getUpdateCallback().getDataContext();
-        final IndicesAdminClient indicesAdmin = 
dataContext.getElasticSearchClient().admin().indices();
-        final String indexName = dataContext.getIndexName();
-
-        final List<Object> sourceProperties = new ArrayList<>();
-        for (Column column : table.getColumns()) {
-            // each column is defined as a property pair of the form: 
("field1",
-            // "type=string,store=true")
-            final String columnName = column.getName();
-            if (ElasticSearchDataContext.FIELD_ID.equals(columnName)) {
-                // do nothing - the ID is a client-side construct
-                continue;
-            }
-            sourceProperties.add(columnName);
-
-            String type = getType(column);
-            if (type == null) {
-                sourceProperties.add("store=true");
-            } else {
-                sourceProperties.add("type=" + type + ",store=true");
-            }
-        }
-
-        final PutMappingRequestBuilder requestBuilder = new 
PutMappingRequestBuilder(indicesAdmin)
-                .setIndices(indexName).setType(table.getName());
-        requestBuilder.setSource(sourceProperties.toArray());
-        final PutMappingResponse result = requestBuilder.execute().actionGet();
-
-        logger.debug("PutMapping response: acknowledged={}", 
result.isAcknowledged());
-
-        final MutableSchema schema = (MutableSchema) getSchema();
-        schema.addTable(table);
-        return table;
-    }
-
-    /**
-     * Determines the best fitting type. For reference of ElasticSearch types,
-     * see
-     * 
-     * <pre>
-     * 
http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-core-types.html
-     * </pre>
-     * 
-     * 
-     * @param column
-     * @return
-     */
-    private String getType(Column column) {
-        String nativeType = column.getNativeType();
-        if (!Strings.isNullOrEmpty(nativeType)) {
-            return nativeType;
-        }
-
-        final ColumnType type = column.getType();
-        if (type == null) {
-            throw new IllegalStateException("No column type specified for '" + 
column.getName()
-                    + "' - cannot build ElasticSearch mapping without type.");
-        }
-
-        if (type.isLiteral()) {
-            return "string";
-        } else if (type == ColumnType.FLOAT) {
-            return "float";
-        } else if (type == ColumnType.DOUBLE || type == ColumnType.NUMERIC || 
type == ColumnType.NUMBER) {
-            return "double";
-        } else if (type == ColumnType.SMALLINT) {
-            return "short";
-        } else if (type == ColumnType.TINYINT) {
-            return "byte";
-        } else if (type == ColumnType.INTEGER) {
-            return "integer";
-        } else if (type == ColumnType.DATE || type == ColumnType.TIMESTAMP) {
-            return "date";
-        } else if (type == ColumnType.BINARY || type == ColumnType.VARBINARY) {
-            return "binary";
-        } else if (type == ColumnType.BOOLEAN || type == ColumnType.BIT) {
-            return "boolean";
-        } else if (type == ColumnType.MAP) {
-            return "object";
-        }
-
-        throw new UnsupportedOperationException("Unsupported column type '" + 
type.getName() + "' of column '"
-                + column.getName() + "' - cannot translate to an ElasticSearch 
type.");
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
 
b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
deleted file mode 100644
index 34bb983..0000000
--- 
a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.metamodel.DataContext;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.QueryPostprocessDataContext;
-import org.apache.metamodel.UpdateScript;
-import org.apache.metamodel.UpdateableDataContext;
-import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.data.SimpleDataSetHeader;
-import org.apache.metamodel.query.FilterItem;
-import org.apache.metamodel.query.LogicalOperator;
-import org.apache.metamodel.query.OperatorType;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-import org.apache.metamodel.util.CollectionUtils;
-import org.apache.metamodel.util.SimpleTableDef;
-import org.elasticsearch.Version;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
-import org.elasticsearch.action.count.CountResponse;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.hppc.ObjectLookupContainer;
-import org.elasticsearch.common.hppc.cursors.ObjectCursor;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.FilterBuilders;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * DataContext implementation for ElasticSearch analytics engine.
- *
- * ElasticSearch has a data storage structure hierarchy that briefly goes like
- * this:
- * <ul>
- * <li>Index</li>
- * <li>Document type (short: Type) (within an index)</li>
- * <li>Documents (of a particular type)</li>
- * </ul>
- *
- * When instantiating this DataContext, an index name is provided. Within this
- * index, each document type is represented as a table.
- *
- * This implementation supports either automatic discovery of a schema or 
manual
- * specification of a schema, through the {@link SimpleTableDef} class.
- */
-public class ElasticSearchDataContext extends QueryPostprocessDataContext 
implements DataContext, UpdateableDataContext {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchDataContext.class);
-
-    public static final String FIELD_ID = "_id";
-
-    public static final TimeValue TIMEOUT_SCROLL = 
TimeValue.timeValueSeconds(60);
-
-    private final Client elasticSearchClient;
-    private final String indexName;
-    // Table definitions that are set from the beginning, not supposed to be 
changed.
-    private final List<SimpleTableDef> staticTableDefinitions;
-
-    // Table definitions that are discovered, these can change
-    private final List<SimpleTableDef> dynamicTableDefinitions = new 
ArrayList<>();
-
-    /**
-     * Constructs a {@link ElasticSearchDataContext}. This constructor accepts 
a
-     * custom array of {@link SimpleTableDef}s which allows the user to define
-     * his own view on the indexes in the engine.
-     *
-     * @param client
-     *            the ElasticSearch client
-     * @param indexName
-     *            the name of the ElasticSearch index to represent
-     * @param tableDefinitions
-     *            an array of {@link SimpleTableDef}s, which define the table
-     *            and column model of the ElasticSearch index.
-     */
-    public ElasticSearchDataContext(Client client, String indexName, 
SimpleTableDef... tableDefinitions) {
-        if (client == null) {
-            throw new IllegalArgumentException("ElasticSearch Client cannot be 
null");
-        }
-        if (indexName == null || indexName.trim().length() == 0) {
-            throw new IllegalArgumentException("Invalid ElasticSearch Index 
name: " + indexName);
-        }
-        this.elasticSearchClient = client;
-        this.indexName = indexName;
-        this.staticTableDefinitions = Arrays.asList(tableDefinitions);
-        this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
-    }
-
-    /**
-     * Constructs a {@link ElasticSearchDataContext} and automatically detects
-     * the schema structure/view on all indexes (see
-     * {@link this.detectSchema(Client, String)}).
-     *
-     * @param client
-     *            the ElasticSearch client
-     * @param indexName
-     *            the name of the ElasticSearch index to represent
-     */
-    public ElasticSearchDataContext(Client client, String indexName) {
-        this(client, indexName, new SimpleTableDef[0]);
-    }
-
-    /**
-     * Performs an analysis of the available indexes in an ElasticSearch 
cluster
-     * {@link Client} instance and detects the elasticsearch types structure
-     * based on the metadata provided by the ElasticSearch java client.
-     *
-     * @see #detectTable(ClusterState, String, String)
-     * @return a mutable schema instance, useful for further fine tuning by the
-     *         user.
-     */
-    private SimpleTableDef[] detectSchema() {
-        logger.info("Detecting schema for index '{}'", indexName);
-
-        final ClusterState cs;
-        final ClusterStateRequestBuilder clusterStateRequestBuilder =
-                getElasticSearchClient().admin().cluster().prepareState();
-
-        // different methods here to set the index name, so we have to use
-        // reflection :-/
-        try {
-            final byte majorVersion = Version.CURRENT.major;
-            final Object methodArgument = new String[] { indexName };
-            if (majorVersion == 0) {
-                final Method method = 
ClusterStateRequestBuilder.class.getMethod("setFilterIndices", String[].class);
-                method.invoke(clusterStateRequestBuilder, methodArgument);
-            } else {
-                final Method method = 
ClusterStateRequestBuilder.class.getMethod("setIndices", String[].class);
-                method.invoke(clusterStateRequestBuilder, methodArgument);
-            }
-        } catch (Exception e) {
-            logger.error("Failed to set index name on 
ClusterStateRequestBuilder, version {}", Version.CURRENT, e);
-            throw new MetaModelException("Failed to create request for index 
information needed to detect schema", e);
-        }
-        cs = clusterStateRequestBuilder.execute().actionGet().getState();
-
-        final List<SimpleTableDef> result = new ArrayList<>();
-
-        final IndexMetaData imd = cs.getMetaData().index(indexName);
-        if (imd == null) {
-            // index does not exist
-            logger.warn("No metadata returned for index name '{}' - no tables 
will be detected.");
-        } else {
-            final ImmutableOpenMap<String, MappingMetaData> mappings = 
imd.getMappings();
-            final ObjectLookupContainer<String> documentTypes = 
mappings.keys();
-
-            for (final Object documentTypeCursor : documentTypes) {
-                final String documentType = ((ObjectCursor<?>) 
documentTypeCursor).value.toString();
-                try {
-                    final SimpleTableDef table = detectTable(cs, indexName, 
documentType);
-                    result.add(table);
-                } catch (Exception e) {
-                    logger.error("Unexpected error during detectTable for 
document type '{}'", documentType, e);
-                }
-            }
-        }
-        final SimpleTableDef[] tableDefArray = result.toArray(new 
SimpleTableDef[result.size()]);
-        Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
-            @Override
-            public int compare(SimpleTableDef o1, SimpleTableDef o2) {
-                return o1.getName().compareTo(o2.getName());
-            }
-        });
-
-        return tableDefArray;
-    }
-
-    /**
-     * Performs an analysis of an available index type in an ElasticSearch
-     * {@link Client} client and tries to detect the index structure based on
-     * the metadata provided by the java client.
-     *
-     * @param cs
-     *            the ElasticSearch cluster
-     * @param indexName
-     *            the name of the index
-     * @param documentType
-     *            the name of the index type
-     * @return a table definition for ElasticSearch.
-     */
-    public static SimpleTableDef detectTable(ClusterState cs, String 
indexName, String documentType) throws Exception {
-        logger.debug("Detecting table for document type '{}' in index '{}'", 
documentType, indexName);
-        final IndexMetaData imd = cs.getMetaData().index(indexName);
-        if (imd == null) {
-            // index does not exist
-            throw new IllegalArgumentException("No such index: " + indexName);
-        }
-        final MappingMetaData mappingMetaData = imd.mapping(documentType);
-        if (mappingMetaData == null) {
-            throw new IllegalArgumentException("No such document type in index 
'" + indexName + "': " + documentType);
-        }
-        final Map<String, Object> mp = mappingMetaData.getSourceAsMap();
-        final Object metadataProperties = mp.get("properties");
-        if (metadataProperties != null && metadataProperties instanceof Map) {
-            @SuppressWarnings("unchecked")
-            final Map<String, ?> metadataPropertiesMap = (Map<String, ?>) 
metadataProperties;
-            final ElasticSearchMetaData metaData = 
ElasticSearchMetaDataParser.parse(metadataPropertiesMap);
-            final SimpleTableDef std = new SimpleTableDef(documentType, 
metaData.getColumnNames(),
-                    metaData.getColumnTypes());
-            return std;
-        }
-        throw new IllegalArgumentException("No mapping properties defined for 
document type '" + documentType
-                + "' in index: " + indexName);
-    }
-
-    @Override
-    protected Schema getMainSchema() throws MetaModelException {
-        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
-        for (final SimpleTableDef tableDef : staticTableDefinitions) {
-            addTable(theSchema, tableDef);
-        }
-
-        final SimpleTableDef[] tables = detectSchema();
-        synchronized (this) {
-            dynamicTableDefinitions.clear();
-            dynamicTableDefinitions.addAll(Arrays.asList(tables));
-            for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
-                final List<String> tableNames = Arrays.asList(theSchema.getTableNames());
-
-                if (!tableNames.contains(tableDef.getName())) {
-                    addTable(theSchema, tableDef);
-                }
-            }
-        }
-
-        return theSchema;
-    }
-
-    private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
-        final MutableTable table = tableDef.toTable().setSchema(theSchema);
-        final Column idColumn = table.getColumnByName(FIELD_ID);
-        if (idColumn != null && idColumn instanceof MutableColumn) {
-            final MutableColumn mutableColumn = (MutableColumn) idColumn;
-            mutableColumn.setPrimaryKey(true);
-        }
-        theSchema.addTable(table);
-    }
-
-    @Override
-    protected String getMainSchemaName() throws MetaModelException {
-        return indexName;
-    }
-
-    @Override
-    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
-            List<FilterItem> whereItems, int firstRow, int maxRows) {
-        final QueryBuilder queryBuilder = createQueryBuilderForSimpleWhere(table, whereItems, LogicalOperator.AND);
-        if (queryBuilder != null) {
-            // where clause can be pushed down to an ElasticSearch query
-            final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder);
-            final SearchResponse response = searchRequest.execute().actionGet();
-            return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false);
-        }
-        return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
-    }
-
-    @Override
-    protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) {
-        final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null);
-        final SearchResponse response = searchRequest.execute().actionGet();
-        return new ElasticSearchDataSet(elasticSearchClient, response, columns, false);
-    }
-
-    private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) {
-        final String documentType = table.getName();
-        final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
-        if (firstRow > 1) {
-            final int zeroBasedFrom = firstRow - 1;
-            searchRequest.setFrom(zeroBasedFrom);
-        }
-        if (limitMaxRowsIsSet(maxRows)) {
-            searchRequest.setSize(maxRows);
-        } else {
-            searchRequest.setScroll(TIMEOUT_SCROLL);
-        }
-
-        if (queryBuilder != null) {
-            searchRequest.setQuery(queryBuilder);
-        }
-
-        return searchRequest;
-    }
-
-    /**
-     * Creates, if possible, a {@link QueryBuilder} object which can be used to
-     * push down one or more {@link FilterItem}s to ElasticSearch's backend.
-     *
-     * @param table
-     * @param whereItems
-     * @param logicalOperator
-     * @return a {@link QueryBuilder} if one was produced, or null if the items
-     *         could not be pushed down to an ElasticSearch query
-     */
-    protected QueryBuilder createQueryBuilderForSimpleWhere(Table table, List<FilterItem> whereItems,
-            LogicalOperator logicalOperator) {
-        if (whereItems.isEmpty()) {
-            return QueryBuilders.matchAllQuery();
-        }
-
-        List<QueryBuilder> children = new ArrayList<>(whereItems.size());
-        for (FilterItem item : whereItems) {
-            final QueryBuilder itemQueryBuilder;
-
-            if (item.isCompoundFilter()) {
-                final List<FilterItem> childItems = Arrays.asList(item.getChildItems());
-                itemQueryBuilder = createQueryBuilderForSimpleWhere(table, childItems, item.getLogicalOperator());
-                if (itemQueryBuilder == null) {
-                    // something was not supported, so we have to forfeit here
-                    // too.
-                    return null;
-                }
-            } else {
-                final Column column = item.getSelectItem().getColumn();
-                if (column == null) {
-                    // unsupport type of where item - must have a column
-                    // reference
-                    return null;
-                }
-                final String fieldName = column.getName();
-                final Object operand = item.getOperand();
-                final OperatorType operator = item.getOperator();
-
-                if (OperatorType.EQUALS_TO.equals(operator)) {
-                    if (operand == null) {
-                        itemQueryBuilder = QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(fieldName));
-                    } else {
-                        itemQueryBuilder = QueryBuilders.termQuery(fieldName, operand);
-                    }
-                } else if (OperatorType.DIFFERENT_FROM.equals(operator)) {
-                    if (operand == null) {
-                        itemQueryBuilder = QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(fieldName));
-                    } else {
-                        itemQueryBuilder = QueryBuilders.boolQuery().mustNot(
-                                QueryBuilders.termQuery(fieldName, operand));
-                    }
-                } else if (OperatorType.IN.equals(operator)) {
-                    final List<?> operands = CollectionUtils.toList(operand);
-                    itemQueryBuilder = QueryBuilders.termsQuery(fieldName, operands);
-                } else {
-                    // not (yet) support operator types
-                    return null;
-                }
-            }
-
-            children.add(itemQueryBuilder);
-        }
-
-        // just one where item - just return the child query builder
-        if (children.size() == 1) {
-            return children.get(0);
-        }
-
-        // build a bool query
-        final BoolQueryBuilder result = QueryBuilders.boolQuery();
-        for (QueryBuilder child : children) {
-            switch (logicalOperator) {
-            case AND:
-                result.must(child);
-            case OR:
-                result.should(child);
-            }
-        }
-
-        return result;
-    }
-
-    @Override
-    protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
-            Object keyValue) {
-        if (keyValue == null) {
-            return null;
-        }
-
-        final String documentType = table.getName();
-        final String id = keyValue.toString();
-
-        final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet();
-
-        if (!response.isExists()) {
-            return null;
-        }
-
-        final Map<String, Object> source = response.getSource();
-        final String documentId = response.getId();
-
-        final DataSetHeader header = new SimpleDataSetHeader(selectItems);
-
-        return ElasticSearchUtils.createRow(source, documentId, header);
-    }
-
-    @Override
-    protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
-        if (!whereItems.isEmpty()) {
-            // not supported - will have to be done by counting client-side
-            return null;
-        }
-        final String documentType = table.getName();
-        final CountResponse response = elasticSearchClient.prepareCount(indexName)
-                .setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet();
-        return response.getCount();
-    }
-
-    private boolean limitMaxRowsIsSet(int maxRows) {
-        return (maxRows != -1);
-    }
-
-    @Override
-    public void executeUpdate(UpdateScript update) {
-        final ElasticSearchUpdateCallback callback = new ElasticSearchUpdateCallback(this);
-        update.run(callback);
-        callback.onExecuteUpdateFinished();
-    }
-
-    /**
-     * Gets the {@link Client} that this {@link DataContext} is wrapping.
-     *
-     * @return
-     */
-    public Client getElasticSearchClient() {
-        return elasticSearchClient;
-    }
-
-    /**
-     * Gets the name of the index that this {@link DataContext} is working on.
-     *
-     * @return
-     */
-    public String getIndexName() {
-        return indexName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
deleted file mode 100644
index 8c41524..0000000
--- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.metamodel.data.AbstractDataSet;
-import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.elasticsearch.action.search.ClearScrollRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link DataSet} implementation for ElasticSearch
- */
-final class ElasticSearchDataSet extends AbstractDataSet {
-
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class);
-
-    private final Client _client;
-    private final AtomicBoolean _closed;
-
-    private SearchResponse _searchResponse;
-    private SearchHit _currentHit;
-    private int _hitIndex = 0;
-
-    public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems,
-            boolean queryPostProcessed) {
-        super(selectItems);
-        _client = client;
-        _searchResponse = searchResponse;
-        _closed = new AtomicBoolean(false);
-    }
-    
-    public ElasticSearchDataSet(Client client, SearchResponse searchResponse, Column[] columns,
-            boolean queryPostProcessed) {
-        super(columns);
-        _client = client;
-        _searchResponse = searchResponse;
-        _closed = new AtomicBoolean(false);
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        boolean closeNow = _closed.compareAndSet(true, false);
-        if (closeNow) {
-            ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client)
-                    .addScrollId(_searchResponse.getScrollId());
-            scrollRequestBuilder.execute();
-        }
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        super.finalize();
-        if (!_closed.get()) {
-            logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
-            close();
-        }
-    }
-
-    @Override
-    public boolean next() {
-        final SearchHit[] hits = _searchResponse.getHits().hits();
-        if (hits.length == 0) {
-            // break condition for the scroll
-            _currentHit = null;
-            return false;
-        }
-
-        if (_hitIndex < hits.length) {
-            // pick the next hit within this search response
-            _currentHit = hits[_hitIndex];
-            _hitIndex++;
-            return true;
-        }
-
-        final String scrollId = _searchResponse.getScrollId();
-        if (scrollId == null) {
-            // this search response is not scrolleable - then it's the end.
-            _currentHit = null;
-            return false;
-        }
-
-        // try to scroll to the next set of hits
-        _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
-                .execute().actionGet();
-
-        // start over (recursively)
-        _hitIndex = 0;
-        return next();
-    }
-
-    @Override
-    public Row getRow() {
-        if (_currentHit == null) {
-            return null;
-        }
-
-        final Map<String, Object> source = _currentHit.getSource();
-        final String documentId = _currentHit.getId();
-        final Row row = ElasticSearchUtils.createRow(source, documentId, getHeader());
-        return row;
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDateConverter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDateConverter.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDateConverter.java
deleted file mode 100644
index fcf0b24..0000000
--- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDateConverter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch;
-
-import org.apache.metamodel.util.TimeComparator;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-/**
- * Util class to convert date strings from ElasticSearch to
- * proper java Dates.
- */
-final class ElasticSearchDateConverter {
-
-    public static Date tryToConvert(String dateAsString) {
-        try {
-            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
-            return dateFormat.parse(dateAsString);
-        } catch (ParseException e) {
-            return TimeComparator.toDate(dateAsString);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java
deleted file mode 100644
index b1fdf6e..0000000
--- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch;
-
-import java.util.List;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
-import org.apache.metamodel.delete.RowDeletionBuilder;
-import org.apache.metamodel.query.FilterItem;
-import org.apache.metamodel.query.LogicalOperator;
-import org.apache.metamodel.schema.Table;
-import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link RowDeletionBuilder} implementation for
- * {@link ElasticSearchDataContext}.
- */
-final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDeleteBuilder.class);
-
-    private final ElasticSearchUpdateCallback _updateCallback;
-
-    public ElasticSearchDeleteBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
-        super(table);
-        _updateCallback = updateCallback;
-    }
-
-    @Override
-    public void execute() throws MetaModelException {
-        final Table table = getTable();
-        final String documentType = table.getName();
-
-        final ElasticSearchDataContext dataContext = _updateCallback.getDataContext();
-        final Client client = dataContext.getElasticSearchClient();
-        final String indexName = dataContext.getIndexName();
-
-        final DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client);
-        deleteByQueryRequestBuilder.setIndices(indexName);
-        deleteByQueryRequestBuilder.setTypes(documentType);
-
-        final List<FilterItem> whereItems = getWhereItems();
-
-        // delete by query - note that creteQueryBuilderForSimpleWhere may
-        // return matchAllQuery() if no where items are present.
-        final QueryBuilder queryBuilder = dataContext.createQueryBuilderForSimpleWhere(table, whereItems,
-                LogicalOperator.AND);
-        if (queryBuilder == null) {
-            // TODO: The where items could not be pushed down to a query. We
-            // could solve this by running a query first, gather all
-            // document IDs and then delete by IDs.
-            throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
-                    + whereItems);
-        }
-        deleteByQueryRequestBuilder.setQuery(queryBuilder);
-        deleteByQueryRequestBuilder.execute().actionGet();
-
-        logger.debug("Deleted documents by query.");
-    }
-}

Reply via email to