http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java new file mode 100644 index 0000000..ca2ed13 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.metamodel.elasticsearch.rest;

import io.searchbox.indices.Refresh;
import org.apache.metamodel.AbstractUpdateCallback;
import org.apache.metamodel.UpdateCallback;
import org.apache.metamodel.create.TableCreationBuilder;
import org.apache.metamodel.delete.RowDeletionBuilder;
import org.apache.metamodel.drop.TableDropBuilder;
import org.apache.metamodel.insert.RowInsertionBuilder;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;

/**
 * {@link UpdateCallback} used by {@link ElasticSearchRestDataContext}: every
 * update operation is handed to the matching Jest-based builder of this
 * package, and the index is refreshed once an update has finished.
 */
final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {

    /**
     * Creates a callback bound to the given data context.
     *
     * @param dataContext the REST data context the updates are executed against
     */
    public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext) {
        super(dataContext);
    }

    /**
     * Narrows the inherited accessor to the concrete REST data context type
     * that was passed to the constructor.
     *
     * @return the data context, cast to {@link ElasticSearchRestDataContext}
     */
    @Override
    public ElasticSearchRestDataContext getDataContext() {
        final ElasticSearchRestDataContext restDataContext = (ElasticSearchRestDataContext) super.getDataContext();
        return restDataContext;
    }

    // ---- builder factories -------------------------------------------------

    /**
     * @param schema the schema the new table is created in
     * @param name the name of the new table
     * @return a {@link JestElasticSearchCreateTableBuilder} bound to this callback
     */
    @Override
    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
            IllegalStateException {
        return new JestElasticSearchCreateTableBuilder(this, schema, name);
    }

    /**
     * @param table the table to drop
     * @return a {@link JestElasticSearchDropTableBuilder} bound to this callback
     */
    @Override
    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
            UnsupportedOperationException {
        return new JestElasticSearchDropTableBuilder(this, table);
    }

    /**
     * @param table the table rows are inserted into
     * @return a {@link JestElasticSearchInsertBuilder} bound to this callback
     */
    @Override
    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
            UnsupportedOperationException {
        return new JestElasticSearchInsertBuilder(this, table);
    }

    /**
     * @param table the table rows are deleted from
     * @return a {@link JestElasticSearchDeleteBuilder} bound to this callback
     */
    @Override
    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
            UnsupportedOperationException {
        return new JestElasticSearchDeleteBuilder(this, table);
    }

    // ---- capability flags --------------------------------------------------

    /**
     * @return always {@code true}; see {@link #dropTable(Table)}
     */
    @Override
    public boolean isDropTableSupported() {
        return true;
    }

    /**
     * @return always {@code true}; see {@link #deleteFrom(Table)}
     */
    @Override
    public boolean isDeleteSupported() {
        return true;
    }

    /**
     * Issues a Jest {@link Refresh} action for the data context's index through
     * {@link JestClientExecutor}. The trailing {@code false} argument is passed
     * on unchanged; its meaning is defined by {@code JestClientExecutor.execute}.
     */
    public void onExecuteUpdateFinished() {
        final Refresh refreshAction = new Refresh.Builder()
                .addIndex(getDataContext().getIndexName())
                .build();

        JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refreshAction, false);
    }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java new file mode 100644 index 0000000..c37ff80 --- /dev/null +++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.elasticsearch.rest; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.apache.metamodel.data.DataSetHeader; +import org.apache.metamodel.data.DefaultRow; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.ColumnType; +import org.apache.metamodel.util.NumberComparator; + +import java.util.Date; + +/** + * Shared/common util functions for the ElasticSearch MetaModel module. + */ +final class JestElasticSearchUtils { + public static Row createRow(JsonObject source, String documentId, DataSetHeader header) { + final Object[] values = new Object[header.size()]; + for (int i = 0; i < values.length; i++) { + final SelectItem selectItem = header.getSelectItem(i); + final Column column = selectItem.getColumn(); + + assert column != null; + assert !selectItem.hasFunction(); + + if (column.isPrimaryKey()) { + values[i] = documentId; + } else { + values[i] = getDataFromColumnType(source.get(column.getName()), column.getType()); + } + } + + return new DefaultRow(header, values); + } + + private static Object getDataFromColumnType(JsonElement field, ColumnType type) { + if (type.isNumber()) { + // Pretty terrible workaround to avoid LazilyParsedNumber + // (which is happily output, but not recognized by Jest/GSON). 
+ return NumberComparator.toNumber(field.getAsString()); + } else if (type.isTimeBased()) { + Date valueToDate = ElasticSearchDateConverter.tryToConvert(field.getAsString()); + if (valueToDate == null) { + return field.getAsString(); + } else { + return valueToDate; + } + } else if (type.isBoolean()) { + return field.getAsBoolean(); + } else { + return field.getAsString(); + } + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java new file mode 100644 index 0000000..aac963a --- /dev/null +++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java @@ -0,0 +1,614 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.elasticsearch.rest; + +import io.searchbox.client.JestClient; +import io.searchbox.client.JestClientFactory; +import io.searchbox.client.config.HttpClientConfig; +import org.apache.metamodel.MetaModelHelper; +import org.apache.metamodel.UpdateCallback; +import org.apache.metamodel.UpdateScript; +import org.apache.metamodel.UpdateableDataContext; +import org.apache.metamodel.create.CreateTable; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.DataSetTableModel; +import org.apache.metamodel.data.InMemoryDataSet; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.delete.DeleteFrom; +import org.apache.metamodel.drop.DropTable; +import org.apache.metamodel.elasticsearch.rest.utils.EmbeddedElasticsearchServer; +import org.apache.metamodel.query.FunctionType; +import org.apache.metamodel.query.Query; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.ColumnType; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.apache.metamodel.update.Update; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.swing.table.TableModel; +import java.io.IOException; +import java.util.*; + +import static 
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.*; + +public class JestElasticSearchDataContextTest { + + private static final String indexName = "twitter"; + private static final String indexType1 = "tweet1"; + private static final String indexType2 = "tweet2"; + private static final String indexName2 = "twitter2"; + private static final String indexType3 = "tweet3"; + private static final String bulkIndexType = "bulktype"; + private static final String peopleIndexType = "peopletype"; + private static final String mapping = + "{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}"; + private static EmbeddedElasticsearchServer embeddedElasticsearchServer; + private static JestClient client; + private static UpdateableDataContext dataContext; + + @BeforeClass + public static void beforeTests() throws Exception { + embeddedElasticsearchServer = new EmbeddedElasticsearchServer(); + final int port = Integer.parseInt(embeddedElasticsearchServer.getClient().settings().get("http.port")); + JestClientFactory factory = new JestClientFactory(); + factory.setHttpClientConfig(new HttpClientConfig + .Builder("http://localhost:" + port) + .multiThreaded(true) + .build()); + client = factory.getObject(); + + indexTweeterDocument(indexType1, 1); + indexTweeterDocument(indexType2, 1); + indexTweeterDocument(indexType2, 2, null); + insertPeopleDocuments(); + indexTweeterDocument(indexType2, 1); + indexBulkDocuments(indexName, bulkIndexType, 10); + + // The refresh API allows to explicitly refresh one or more index, + // making all operations performed since the last refresh available for + // search + dataContext = new ElasticSearchRestDataContext(client, indexName); + Thread.sleep(1000); + System.out.println("Embedded ElasticSearch server created!"); + } + + private static void insertPeopleDocuments() throws IOException { + indexOnePeopleDocument("female", 
20, 5); + indexOnePeopleDocument("female", 17, 8); + indexOnePeopleDocument("female", 18, 9); + indexOnePeopleDocument("female", 19, 10); + indexOnePeopleDocument("female", 20, 11); + indexOnePeopleDocument("male", 19, 1); + indexOnePeopleDocument("male", 17, 2); + indexOnePeopleDocument("male", 18, 3); + indexOnePeopleDocument("male", 18, 4); + } + + @AfterClass + public static void afterTests() { + embeddedElasticsearchServer.shutdown(); + System.out.println("Embedded ElasticSearch server shut down!"); + } + + @Test + public void testSimpleQuery() throws Exception { + assertEquals("[bulktype, peopletype, tweet1, tweet2]", + Arrays.toString(dataContext.getDefaultSchema().getTableNames())); + + Table table = dataContext.getDefaultSchema().getTableByName("tweet1"); + + assertEquals("[_id, message, postDate, user]", Arrays.toString(table.getColumnNames())); + + assertEquals(ColumnType.STRING, table.getColumnByName("user").getType()); + assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType()); + assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType()); + + try (DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute()) { + assertEquals(JestElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[user1, 1]]", ds.getRow().toString()); + } + } + + @Test + public void testDocumentIdAsPrimaryKey() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName("tweet2"); + Column[] pks = table.getPrimaryKeys(); + assertEquals(1, pks.length); + assertEquals("_id", pks[0].getName()); + + try (DataSet ds = dataContext.query().from(table).select("user", "_id").orderBy("_id").asc().execute()) { + assertTrue(ds.next()); + assertEquals("Row[values=[user1, tweet_tweet2_1]]", ds.getRow().toString()); + } + } + + @Test + public void testExecutePrimaryKeyLookupQuery() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName("tweet2"); 
+ Column[] pks = table.getPrimaryKeys(); + + try (DataSet ds = dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute()) { + assertTrue(ds.next()); + Object dateValue = ds.getRow().getValue(2); + assertEquals("Row[values=[tweet_tweet2_1, 1, " + dateValue + ", user1]]", ds.getRow().toString()); + + assertFalse(ds.next()); + + assertEquals(InMemoryDataSet.class, ds.getClass()); + } + } + + @Test + public void testDateIsHandledAsDate() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName("tweet1"); + Column column = table.getColumnByName("postDate"); + ColumnType type = column.getType(); + assertEquals(ColumnType.DATE, type); + + DataSet dataSet = dataContext.query().from(table).select(column).execute(); + while (dataSet.next()) { + Object value = dataSet.getRow().getValue(column); + assertTrue("Got class: " + value.getClass() + ", expected Date (or subclass)", value instanceof Date); + } + } + + @Test + public void testNumberIsHandledAsNumber() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + Column column = table.getColumnByName("age"); + ColumnType type = column.getType(); + assertEquals(ColumnType.BIGINT, type); + + DataSet dataSet = dataContext.query().from(table).select(column).execute(); + while (dataSet.next()) { + Object value = dataSet.getRow().getValue(column); + assertTrue("Got class: " + value.getClass() + ", expected Number (or subclass)", value instanceof Number); + } + } + + @Test + public void testCreateTableInsertQueryAndDrop() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + assertNotNull(table); + 
assertEquals("[" + ElasticSearchRestDataContext.FIELD_ID + ", foo, bar]", Arrays.toString(table.getColumnNames())); + + final Column fooColumn = table.getColumnByName("foo"); + final Column idColumn = table.getPrimaryKeys()[0]; + assertEquals("Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]", + idColumn.toString()); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.refreshSchemas(); + + + try (DataSet ds = dataContext.query().from(table).selectAll().orderBy("bar").execute()) { + assertTrue(ds.next()); + assertEquals("hello", ds.getRow().getValue(fooColumn).toString()); + assertNotNull(ds.getRow().getValue(idColumn)); + assertTrue(ds.next()); + assertEquals("world", ds.getRow().getValue(fooColumn).toString()); + assertNotNull(ds.getRow().getValue(idColumn)); + assertFalse(ds.next()); + } + + dataContext.executeUpdate(new DropTable(table)); + + dataContext.refreshSchemas(); + + assertNull(dataContext.getTableByQualifiedLabel(table.getName())); + } + + @Test + public void testDetectOutsideChanges() throws Exception { + // Create the type in ES + final IndicesAdminClient indicesAdmin = embeddedElasticsearchServer.getClient().admin().indices(); + final String tableType = "outsideTable"; + + Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" }; + + new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties) + .execute().actionGet(); + + dataContext.refreshSchemas(); + + assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType)); + + new DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet(); + dataContext.refreshSchemas(); + 
assertNull(dataContext.getTableByQualifiedLabel(tableType)); + } + + @Test + public void testDeleteAll() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new DeleteFrom(table)); + + Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount() + .toQuery()); + assertEquals("Count is wrong", 0, ((Number) row.getValue(0)).intValue()); + + dataContext.executeUpdate(new DropTable(table)); + } + + @Test + public void testDeleteByQuery() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new DeleteFrom(table).where("foo").eq("hello").where("bar").eq(42)); + + Row row = MetaModelHelper.executeSingleRowQuery(dataContext, + dataContext.query().from(table).select("foo", 
"bar").toQuery()); + assertEquals("Row[values=[world, 43]]", row.toString()); + + dataContext.executeUpdate(new DropTable(table)); + } + + @Test + public void testDeleteUnsupportedQueryType() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + try { + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + // greater than is not yet supported + try { + dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40)); + fail("Exception expected"); + } catch (UnsupportedOperationException e) { + assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]", + e.getMessage()); + } + + } finally { + dataContext.executeUpdate(new DropTable(table)); + } + } + + @Test + public void testUpdateRow() throws Exception { + final Schema schema = dataContext.getDefaultSchema(); + final CreateTable createTable = new CreateTable(schema, "testCreateTable"); + createTable.withColumn("foo").ofType(ColumnType.STRING); + createTable.withColumn("bar").ofType(ColumnType.NUMBER); + dataContext.executeUpdate(createTable); + + final Table table = schema.getTableByName("testCreateTable"); + try { + + dataContext.executeUpdate(new UpdateScript() { + @Override + public void run(UpdateCallback callback) { + callback.insertInto(table).value("foo", "hello").value("bar", 42).execute(); + callback.insertInto(table).value("foo", "world").value("bar", 43).execute(); + } + }); + + dataContext.executeUpdate(new 
Update(table).value("foo", "howdy").where("bar").eq(42)); + + DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute(); + assertTrue(dataSet.next()); + assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString()); + assertTrue(dataSet.next()); + assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString()); + assertFalse(dataSet.next()); + dataSet.close(); + } finally { + dataContext.executeUpdate(new DropTable(table)); + } + } + + @Test + public void testDropTable() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + + // assert that the table was there to begin with + { + DataSet ds = dataContext.query().from(table).selectCount().execute(); + ds.next(); + assertEquals("Count is wrong", 9, ((Number) ds.getRow().getValue(0)).intValue()); + ds.close(); + } + + dataContext.executeUpdate(new DropTable(table)); + try { + DataSet ds = dataContext.query().from(table).selectCount().execute(); + ds.next(); + assertEquals("Count is wrong", 0, ((Number) ds.getRow().getValue(0)).intValue()); + ds.close(); + } finally { + // restore the people documents for the next tests + insertPeopleDocuments(); + embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet(); + dataContext = new ElasticSearchRestDataContext(client, indexName); + } + } + + @Test + public void testWhereColumnEqualsValues() throws Exception { + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .isEquals("user4").execute()) { + assertEquals(JestElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[user4, 4]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnIsNullValues() throws Exception { + try (DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate") + .isNull().execute()) { + 
assertEquals(JestElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[2]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnIsNotNullValues() throws Exception { + try (DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate") + .isNotNull().execute()) { + assertEquals(JestElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[1]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereMultiColumnsEqualValues() throws Exception { + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .isEquals("user4").and("message").ne(5).execute()) { + assertEquals(JestElasticSearchDataSet.class, ds.getClass()); + + assertTrue(ds.next()); + assertEquals("Row[values=[user4, 4]]", ds.getRow().toString()); + assertFalse(ds.next()); + } + } + + @Test + public void testWhereColumnInValues() throws Exception { + try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user") + .in("user4", "user5").orderBy("message").execute()) { + assertTrue(ds.next()); + + String row1 = ds.getRow().toString(); + assertEquals("Row[values=[user4, 4]]", row1); + assertTrue(ds.next()); + + String row2 = ds.getRow().toString(); + assertEquals("Row[values=[user5, 5]]", row2); + + assertFalse(ds.next()); + } + } + + @Test + public void testGroupByQuery() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + + Query q = new Query(); + q.from(table); + q.groupBy(table.getColumnByName("gender")); + q.select(new SelectItem(table.getColumnByName("gender")), + new SelectItem(FunctionType.MAX, table.getColumnByName("age")), + new SelectItem(FunctionType.MIN, table.getColumnByName("age")), new SelectItem(FunctionType.COUNT, "*", + "total"), new SelectItem(FunctionType.MIN, 
table.getColumnByName("id")).setAlias("firstId")); + q.orderBy("gender"); + DataSet data = dataContext.executeQuery(q); + assertEquals( + "[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), COUNT(*) AS total, MIN(peopletype.id) AS firstId]", + Arrays.toString(data.getSelectItems())); + + assertTrue(data.next()); + assertEquals("Row[values=[female, 20, 17, 5, 5]]", data.getRow().toString()); + assertTrue(data.next()); + assertEquals("Row[values=[male, 19, 17, 4, 1]]", data.getRow().toString()); + assertFalse(data.next()); + } + + @Test + public void testFilterOnNumberColumn() { + Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType); + Query q = dataContext.query().from(table).select("user").where("message").greaterThan(7).toQuery(); + DataSet data = dataContext.executeQuery(q); + String[] expectations = new String[] { "Row[values=[user8]]", "Row[values=[user9]]" }; + + assertTrue(data.next()); + assertTrue(Arrays.asList(expectations).contains(data.getRow().toString())); + assertTrue(data.next()); + assertTrue(Arrays.asList(expectations).contains(data.getRow().toString())); + assertFalse(data.next()); + } + + @Test + public void testMaxRows() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType); + Query query = new Query().from(table).select(table.getColumns()).setMaxRows(5); + DataSet dataSet = dataContext.executeQuery(query); + + TableModel tableModel = new DataSetTableModel(dataSet); + assertEquals(5, tableModel.getRowCount()); + } + + @Test + public void testCountQuery() throws Exception { + Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType); + Query q = new Query().selectCount().from(table); + + List<Object[]> data = dataContext.executeQuery(q).toObjectArrays(); + assertEquals(1, data.size()); + Object[] row = data.get(0); + assertEquals(1, row.length); + assertEquals(10, ((Number) row[0]).intValue()); + } + + @Test(expected = IllegalArgumentException.class) + 
public void testQueryForANonExistingTable() throws Exception { + dataContext.query().from("nonExistingTable").select("user").and("message").execute(); + } + + @Test(expected = IllegalArgumentException.class) + public void testQueryForAnExistingTableAndNonExistingField() throws Exception { + indexTweeterDocument(indexType1, 1); + dataContext.query().from(indexType1).select("nonExistingField").execute(); + } + + @Test + public void testNonDynamicMapingTableNames() throws Exception { + createIndex(); + + ElasticSearchRestDataContext dataContext2 = new ElasticSearchRestDataContext(client, indexName2); + + assertEquals("[tweet3]", Arrays.toString(dataContext2.getDefaultSchema().getTableNames())); + } + + private static void createIndex() { + CreateIndexRequest cir = new CreateIndexRequest(indexName2); + CreateIndexResponse response = + embeddedElasticsearchServer.getClient().admin().indices().create(cir).actionGet(); + + System.out.println("create index: " + response.isAcknowledged()); + + PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping); + + PutMappingResponse response2 = + embeddedElasticsearchServer.getClient().admin().indices().putMapping(pmr).actionGet(); + System.out.println("put mapping: " + response2.isAcknowledged()); + } + + private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) { + BulkRequestBuilder bulkRequest = embeddedElasticsearchServer.getClient().prepareBulk(); + + for (int i = 0; i < numberOfDocuments; i++) { + bulkRequest.add(embeddedElasticsearchServer.getClient().prepareIndex(indexName, indexType, + Integer.toString(i)).setSource( + buildTweeterJson(i))); + } + bulkRequest.execute().actionGet(); + } + + private static void indexTweeterDocument(String indexType, int id, Date date) { + embeddedElasticsearchServer.getClient().prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date)) + .setId("tweet_" + indexType + "_" + id).execute().actionGet(); + } + 
+ private static void indexTweeterDocument(String indexType, int id) { + embeddedElasticsearchServer.getClient().prepareIndex(indexName, indexType).setSource(buildTweeterJson(id)) + .setId("tweet_" + indexType + "_" + id).execute().actionGet(); + } + + private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException { + embeddedElasticsearchServer.getClient().prepareIndex(indexName, peopleIndexType) + .setSource(buildPeopleJson(gender, age, id)).execute() + .actionGet(); + } + + private static Map<String, Object> buildTweeterJson(int elementId) { + return buildTweeterJson(elementId, new Date()); + } + + private static Map<String, Object> buildTweeterJson(int elementId, Date date) { + Map<String, Object> map = new LinkedHashMap<>(); + map.put("user", "user" + elementId); + map.put("postDate", date); + map.put("message", elementId); + return map; + } + + private static XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws IOException { + return jsonBuilder().startObject().field("gender", gender).field("age", age).field("id", elementId).endObject(); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java new file mode 100644 index 0000000..6eeac6a --- /dev/null +++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import java.util.LinkedHashMap; +import java.util.Map; + +import junit.framework.TestCase; + +import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData; +import org.apache.metamodel.schema.ColumnType; +import org.elasticsearch.common.collect.MapBuilder; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +public class JestElasticSearchMetaDataParserTest extends TestCase { + + public void testParseMetadataInfo() throws Exception { + Map<String, Object> metadata = new LinkedHashMap<>(); + metadata.put("message", MapBuilder.newMapBuilder().put("type", "long").immutableMap()); + metadata.put("postDate", MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap()); + metadata.put("anotherDate", MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap()); + metadata.put("user", MapBuilder.newMapBuilder().put("type", "string").immutableMap()); + metadata.put("critical", MapBuilder.newMapBuilder().put("type", "boolean").immutableMap()); + metadata.put("income", MapBuilder.newMapBuilder().put("type", "double").immutableMap()); + metadata.put("untypedthingie", MapBuilder.newMapBuilder().put("foo", "bar").immutableMap()); + 
final Gson gson = new Gson(); + ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser + .parse((JsonObject) gson.toJsonTree(metadata)); + String[] columnNames = metaData.getColumnNames(); + ColumnType[] columnTypes = metaData.getColumnTypes(); + + assertTrue(columnNames.length == 8); + assertEquals(columnNames[0], "_id"); + assertEquals(columnNames[1], "message"); + assertEquals(columnNames[2], "postDate"); + assertEquals(columnNames[3], "anotherDate"); + assertEquals(columnNames[4], "user"); + assertEquals(columnNames[5], "critical"); + assertEquals(columnNames[6], "income"); + assertEquals(columnNames[7], "untypedthingie"); + + assertTrue(columnTypes.length == 8); + assertEquals(columnTypes[0], ColumnType.STRING); + assertEquals(columnTypes[1], ColumnType.BIGINT); + assertEquals(columnTypes[2], ColumnType.DATE); + assertEquals(columnTypes[3], ColumnType.DATE); + assertEquals(columnTypes[4], ColumnType.STRING); + assertEquals(columnTypes[5], ColumnType.BOOLEAN); + assertEquals(columnTypes[6], ColumnType.DOUBLE); + assertEquals(columnTypes[7], ColumnType.STRING); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java new file mode 100644 index 0000000..0d78d8e --- /dev/null +++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest; + +import com.google.gson.JsonObject; +import junit.framework.TestCase; +import org.apache.metamodel.data.DataSetHeader; +import org.apache.metamodel.data.Row; +import org.apache.metamodel.data.SimpleDataSetHeader; +import org.apache.metamodel.query.SelectItem; +import org.apache.metamodel.schema.ColumnType; +import org.apache.metamodel.schema.MutableColumn; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +public class JestElasticSearchUtilsTest extends TestCase { + + public void testAssignDocumentIdForPrimaryKeys() throws Exception { + MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true); + SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn); + List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem); + String documentId = "doc1"; + DataSetHeader header = new SimpleDataSetHeader(selectItems1); + JsonObject values = new JsonObject(); + + values.addProperty("value1", "theValue"); + Row row = JestElasticSearchUtils.createRow(values, documentId, header); + String primaryKeyValue = (String) row.getValue(primaryKeyItem); + + assertEquals(primaryKeyValue, documentId); + } + + public void testCreateRowWithParseableDates() throws Exception { + SelectItem item1 = new SelectItem(new MutableColumn("value1", 
ColumnType.STRING)); + SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE)); + List<SelectItem> selectItems1 = Arrays.asList(item1, item2); + String documentId = "doc1"; + DataSetHeader header = new SimpleDataSetHeader(selectItems1); + JsonObject values = new JsonObject(); + values.addProperty("value1", "theValue"); + values.addProperty("value2", "2013-01-04T15:55:51.217+01:00"); + Row row = JestElasticSearchUtils.createRow(values, documentId, header); + Object stringValue = row.getValue(item1); + Object dateValue = row.getValue(item2); + + assertTrue(stringValue instanceof String); + assertTrue(dateValue instanceof Date); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java ---------------------------------------------------------------------- diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java new file mode 100644 index 0000000..11e7eb5 --- /dev/null +++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.elasticsearch.rest.utils; + +import org.apache.commons.io.FileUtils; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.node.Node; + +import java.io.File; +import java.io.IOException; + +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +public class EmbeddedElasticsearchServer { + + private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data"; + + private final Node node; + private final String dataDirectory; + + public EmbeddedElasticsearchServer() { + this(DEFAULT_DATA_DIRECTORY); + } + + public EmbeddedElasticsearchServer(String dataDirectory) { + this.dataDirectory = dataDirectory; + + ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder() + .put("http.enabled", "true") + .put("path.data", dataDirectory) + .put("http.port", 9292); + + node = nodeBuilder() + .local(true) + .settings(elasticsearchSettings.build()) + .node(); + } + + public Client getClient() { + return node.client(); + } + + public void shutdown() { + node.close(); + deleteDataDirectory(); + } + + private void deleteDataDirectory() { + try { + FileUtils.deleteDirectory(new File(dataDirectory)); + } catch (IOException e) { + throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchCreateTableBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchCreateTableBuilder.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchCreateTableBuilder.java deleted file mode 100644 index 0a1750d..0000000 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchCreateTableBuilder.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.metamodel.elasticsearch; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.create.AbstractTableCreationBuilder; -import org.apache.metamodel.schema.Column; -import org.apache.metamodel.schema.ColumnType; -import org.apache.metamodel.schema.MutableColumn; -import org.apache.metamodel.schema.MutableSchema; -import org.apache.metamodel.schema.MutableTable; -import org.apache.metamodel.schema.Schema; -import org.apache.metamodel.schema.Table; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; -import org.elasticsearch.client.IndicesAdminClient; -import org.elasticsearch.common.base.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<ElasticSearchUpdateCallback> { - - private static final Logger logger = LoggerFactory.getLogger(ElasticSearchCreateTableBuilder.class); - - public ElasticSearchCreateTableBuilder(ElasticSearchUpdateCallback updateCallback, Schema schema, String name) { - super(updateCallback, schema, name); - } - - @Override - public Table execute() throws MetaModelException { - final MutableTable table = getTable(); - - if (table.getColumnByName(ElasticSearchDataContext.FIELD_ID) == null) { - final MutableColumn idColumn = new MutableColumn(ElasticSearchDataContext.FIELD_ID, ColumnType.STRING) - .setTable(table).setPrimaryKey(true); - table.addColumn(0, idColumn); - } - - final ElasticSearchDataContext dataContext = getUpdateCallback().getDataContext(); - final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices(); - final String indexName = dataContext.getIndexName(); - - final List<Object> sourceProperties = new ArrayList<>(); - for (Column column : table.getColumns()) { - // each column is defined as a 
property pair of the form: ("field1", - // "type=string,store=true") - final String columnName = column.getName(); - if (ElasticSearchDataContext.FIELD_ID.equals(columnName)) { - // do nothing - the ID is a client-side construct - continue; - } - sourceProperties.add(columnName); - - String type = getType(column); - if (type == null) { - sourceProperties.add("store=true"); - } else { - sourceProperties.add("type=" + type + ",store=true"); - } - } - - final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin) - .setIndices(indexName).setType(table.getName()); - requestBuilder.setSource(sourceProperties.toArray()); - final PutMappingResponse result = requestBuilder.execute().actionGet(); - - logger.debug("PutMapping response: acknowledged={}", result.isAcknowledged()); - - final MutableSchema schema = (MutableSchema) getSchema(); - schema.addTable(table); - return table; - } - - /** - * Determines the best fitting type. For reference of ElasticSearch types, - * see - * - * <pre> - * http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-core-types.html - * </pre> - * - * - * @param column - * @return - */ - private String getType(Column column) { - String nativeType = column.getNativeType(); - if (!Strings.isNullOrEmpty(nativeType)) { - return nativeType; - } - - final ColumnType type = column.getType(); - if (type == null) { - throw new IllegalStateException("No column type specified for '" + column.getName() - + "' - cannot build ElasticSearch mapping without type."); - } - - if (type.isLiteral()) { - return "string"; - } else if (type == ColumnType.FLOAT) { - return "float"; - } else if (type == ColumnType.DOUBLE || type == ColumnType.NUMERIC || type == ColumnType.NUMBER) { - return "double"; - } else if (type == ColumnType.SMALLINT) { - return "short"; - } else if (type == ColumnType.TINYINT) { - return "byte"; - } else if (type == ColumnType.INTEGER) { - return "integer"; - } else if (type == ColumnType.DATE 
|| type == ColumnType.TIMESTAMP) { - return "date"; - } else if (type == ColumnType.BINARY || type == ColumnType.VARBINARY) { - return "binary"; - } else if (type == ColumnType.BOOLEAN || type == ColumnType.BIT) { - return "boolean"; - } else if (type == ColumnType.MAP) { - return "object"; - } - - throw new UnsupportedOperationException("Unsupported column type '" + type.getName() + "' of column '" - + column.getName() + "' - cannot translate to an ElasticSearch type."); - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java deleted file mode 100644 index 34bb983..0000000 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java +++ /dev/null @@ -1,471 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.metamodel.elasticsearch; - -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Map; - -import org.apache.metamodel.DataContext; -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.QueryPostprocessDataContext; -import org.apache.metamodel.UpdateScript; -import org.apache.metamodel.UpdateableDataContext; -import org.apache.metamodel.data.DataSet; -import org.apache.metamodel.data.DataSetHeader; -import org.apache.metamodel.data.Row; -import org.apache.metamodel.data.SimpleDataSetHeader; -import org.apache.metamodel.query.FilterItem; -import org.apache.metamodel.query.LogicalOperator; -import org.apache.metamodel.query.OperatorType; -import org.apache.metamodel.query.SelectItem; -import org.apache.metamodel.schema.Column; -import org.apache.metamodel.schema.MutableColumn; -import org.apache.metamodel.schema.MutableSchema; -import org.apache.metamodel.schema.MutableTable; -import org.apache.metamodel.schema.Schema; -import org.apache.metamodel.schema.Table; -import org.apache.metamodel.util.CollectionUtils; -import org.apache.metamodel.util.SimpleTableDef; -import org.elasticsearch.Version; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; -import org.elasticsearch.action.count.CountResponse; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.hppc.ObjectLookupContainer; -import org.elasticsearch.common.hppc.cursors.ObjectCursor; -import 
org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.FilterBuilders; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * DataContext implementation for ElasticSearch analytics engine. - * - * ElasticSearch has a data storage structure hierarchy that briefly goes like - * this: - * <ul> - * <li>Index</li> - * <li>Document type (short: Type) (within an index)</li> - * <li>Documents (of a particular type)</li> - * </ul> - * - * When instantiating this DataContext, an index name is provided. Within this - * index, each document type is represented as a table. - * - * This implementation supports either automatic discovery of a schema or manual - * specification of a schema, through the {@link SimpleTableDef} class. - */ -public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext { - - private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class); - - public static final String FIELD_ID = "_id"; - - public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60); - - private final Client elasticSearchClient; - private final String indexName; - // Table definitions that are set from the beginning, not supposed to be changed. - private final List<SimpleTableDef> staticTableDefinitions; - - // Table definitions that are discovered, these can change - private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>(); - - /** - * Constructs a {@link ElasticSearchDataContext}. This constructor accepts a - * custom array of {@link SimpleTableDef}s which allows the user to define - * his own view on the indexes in the engine. 
- * - * @param client - * the ElasticSearch client - * @param indexName - * the name of the ElasticSearch index to represent - * @param tableDefinitions - * an array of {@link SimpleTableDef}s, which define the table - * and column model of the ElasticSearch index. - */ - public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) { - if (client == null) { - throw new IllegalArgumentException("ElasticSearch Client cannot be null"); - } - if (indexName == null || indexName.trim().length() == 0) { - throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName); - } - this.elasticSearchClient = client; - this.indexName = indexName; - this.staticTableDefinitions = Arrays.asList(tableDefinitions); - this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema())); - } - - /** - * Constructs a {@link ElasticSearchDataContext} and automatically detects - * the schema structure/view on all indexes (see - * {@link this.detectSchema(Client, String)}). - * - * @param client - * the ElasticSearch client - * @param indexName - * the name of the ElasticSearch index to represent - */ - public ElasticSearchDataContext(Client client, String indexName) { - this(client, indexName, new SimpleTableDef[0]); - } - - /** - * Performs an analysis of the available indexes in an ElasticSearch cluster - * {@link Client} instance and detects the elasticsearch types structure - * based on the metadata provided by the ElasticSearch java client. - * - * @see #detectTable(ClusterState, String, String) - * @return a mutable schema instance, useful for further fine tuning by the - * user. 
- */ - private SimpleTableDef[] detectSchema() { - logger.info("Detecting schema for index '{}'", indexName); - - final ClusterState cs; - final ClusterStateRequestBuilder clusterStateRequestBuilder = - getElasticSearchClient().admin().cluster().prepareState(); - - // different methods here to set the index name, so we have to use - // reflection :-/ - try { - final byte majorVersion = Version.CURRENT.major; - final Object methodArgument = new String[] { indexName }; - if (majorVersion == 0) { - final Method method = ClusterStateRequestBuilder.class.getMethod("setFilterIndices", String[].class); - method.invoke(clusterStateRequestBuilder, methodArgument); - } else { - final Method method = ClusterStateRequestBuilder.class.getMethod("setIndices", String[].class); - method.invoke(clusterStateRequestBuilder, methodArgument); - } - } catch (Exception e) { - logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e); - throw new MetaModelException("Failed to create request for index information needed to detect schema", e); - } - cs = clusterStateRequestBuilder.execute().actionGet().getState(); - - final List<SimpleTableDef> result = new ArrayList<>(); - - final IndexMetaData imd = cs.getMetaData().index(indexName); - if (imd == null) { - // index does not exist - logger.warn("No metadata returned for index name '{}' - no tables will be detected."); - } else { - final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings(); - final ObjectLookupContainer<String> documentTypes = mappings.keys(); - - for (final Object documentTypeCursor : documentTypes) { - final String documentType = ((ObjectCursor<?>) documentTypeCursor).value.toString(); - try { - final SimpleTableDef table = detectTable(cs, indexName, documentType); - result.add(table); - } catch (Exception e) { - logger.error("Unexpected error during detectTable for document type '{}'", documentType, e); - } - } - } - final SimpleTableDef[] tableDefArray = 
result.toArray(new SimpleTableDef[result.size()]); - Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() { - @Override - public int compare(SimpleTableDef o1, SimpleTableDef o2) { - return o1.getName().compareTo(o2.getName()); - } - }); - - return tableDefArray; - } - - /** - * Performs an analysis of an available index type in an ElasticSearch - * {@link Client} client and tries to detect the index structure based on - * the metadata provided by the java client. - * - * @param cs - * the ElasticSearch cluster - * @param indexName - * the name of the index - * @param documentType - * the name of the index type - * @return a table definition for ElasticSearch. - */ - public static SimpleTableDef detectTable(ClusterState cs, String indexName, String documentType) throws Exception { - logger.debug("Detecting table for document type '{}' in index '{}'", documentType, indexName); - final IndexMetaData imd = cs.getMetaData().index(indexName); - if (imd == null) { - // index does not exist - throw new IllegalArgumentException("No such index: " + indexName); - } - final MappingMetaData mappingMetaData = imd.mapping(documentType); - if (mappingMetaData == null) { - throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType); - } - final Map<String, Object> mp = mappingMetaData.getSourceAsMap(); - final Object metadataProperties = mp.get("properties"); - if (metadataProperties != null && metadataProperties instanceof Map) { - @SuppressWarnings("unchecked") - final Map<String, ?> metadataPropertiesMap = (Map<String, ?>) metadataProperties; - final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataPropertiesMap); - final SimpleTableDef std = new SimpleTableDef(documentType, metaData.getColumnNames(), - metaData.getColumnTypes()); - return std; - } - throw new IllegalArgumentException("No mapping properties defined for document type '" + documentType - + "' in index: " + indexName); - } - - @Override 
- protected Schema getMainSchema() throws MetaModelException { - final MutableSchema theSchema = new MutableSchema(getMainSchemaName()); - for (final SimpleTableDef tableDef : staticTableDefinitions) { - addTable(theSchema, tableDef); - } - - final SimpleTableDef[] tables = detectSchema(); - synchronized (this) { - dynamicTableDefinitions.clear(); - dynamicTableDefinitions.addAll(Arrays.asList(tables)); - for (final SimpleTableDef tableDef : dynamicTableDefinitions) { - final List<String> tableNames = Arrays.asList(theSchema.getTableNames()); - - if (!tableNames.contains(tableDef.getName())) { - addTable(theSchema, tableDef); - } - } - } - - return theSchema; - } - - private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) { - final MutableTable table = tableDef.toTable().setSchema(theSchema); - final Column idColumn = table.getColumnByName(FIELD_ID); - if (idColumn != null && idColumn instanceof MutableColumn) { - final MutableColumn mutableColumn = (MutableColumn) idColumn; - mutableColumn.setPrimaryKey(true); - } - theSchema.addTable(table); - } - - @Override - protected String getMainSchemaName() throws MetaModelException { - return indexName; - } - - @Override - protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, - List<FilterItem> whereItems, int firstRow, int maxRows) { - final QueryBuilder queryBuilder = createQueryBuilderForSimpleWhere(table, whereItems, LogicalOperator.AND); - if (queryBuilder != null) { - // where clause can be pushed down to an ElasticSearch query - final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder); - final SearchResponse response = searchRequest.execute().actionGet(); - return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false); - } - return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows); - } - - @Override - protected DataSet materializeMainSchemaTable(Table 
table, Column[] columns, int maxRows) { - final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null); - final SearchResponse response = searchRequest.execute().actionGet(); - return new ElasticSearchDataSet(elasticSearchClient, response, columns, false); - } - - private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) { - final String documentType = table.getName(); - final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType); - if (firstRow > 1) { - final int zeroBasedFrom = firstRow - 1; - searchRequest.setFrom(zeroBasedFrom); - } - if (limitMaxRowsIsSet(maxRows)) { - searchRequest.setSize(maxRows); - } else { - searchRequest.setScroll(TIMEOUT_SCROLL); - } - - if (queryBuilder != null) { - searchRequest.setQuery(queryBuilder); - } - - return searchRequest; - } - - /** - * Creates, if possible, a {@link QueryBuilder} object which can be used to - * push down one or more {@link FilterItem}s to ElasticSearch's backend. - * - * @param table - * @param whereItems - * @param logicalOperator - * @return a {@link QueryBuilder} if one was produced, or null if the items - * could not be pushed down to an ElasticSearch query - */ - protected QueryBuilder createQueryBuilderForSimpleWhere(Table table, List<FilterItem> whereItems, - LogicalOperator logicalOperator) { - if (whereItems.isEmpty()) { - return QueryBuilders.matchAllQuery(); - } - - List<QueryBuilder> children = new ArrayList<>(whereItems.size()); - for (FilterItem item : whereItems) { - final QueryBuilder itemQueryBuilder; - - if (item.isCompoundFilter()) { - final List<FilterItem> childItems = Arrays.asList(item.getChildItems()); - itemQueryBuilder = createQueryBuilderForSimpleWhere(table, childItems, item.getLogicalOperator()); - if (itemQueryBuilder == null) { - // something was not supported, so we have to forfeit here - // too. 
- return null; - } - } else { - final Column column = item.getSelectItem().getColumn(); - if (column == null) { - // unsupport type of where item - must have a column - // reference - return null; - } - final String fieldName = column.getName(); - final Object operand = item.getOperand(); - final OperatorType operator = item.getOperator(); - - if (OperatorType.EQUALS_TO.equals(operator)) { - if (operand == null) { - itemQueryBuilder = QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(fieldName)); - } else { - itemQueryBuilder = QueryBuilders.termQuery(fieldName, operand); - } - } else if (OperatorType.DIFFERENT_FROM.equals(operator)) { - if (operand == null) { - itemQueryBuilder = QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(fieldName)); - } else { - itemQueryBuilder = QueryBuilders.boolQuery().mustNot( - QueryBuilders.termQuery(fieldName, operand)); - } - } else if (OperatorType.IN.equals(operator)) { - final List<?> operands = CollectionUtils.toList(operand); - itemQueryBuilder = QueryBuilders.termsQuery(fieldName, operands); - } else { - // not (yet) support operator types - return null; - } - } - - children.add(itemQueryBuilder); - } - - // just one where item - just return the child query builder - if (children.size() == 1) { - return children.get(0); - } - - // build a bool query - final BoolQueryBuilder result = QueryBuilders.boolQuery(); - for (QueryBuilder child : children) { - switch (logicalOperator) { - case AND: - result.must(child); - case OR: - result.should(child); - } - } - - return result; - } - - @Override - protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn, - Object keyValue) { - if (keyValue == null) { - return null; - } - - final String documentType = table.getName(); - final String id = keyValue.toString(); - - final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet(); - - if (!response.isExists()) { - 
return null; - } - - final Map<String, Object> source = response.getSource(); - final String documentId = response.getId(); - - final DataSetHeader header = new SimpleDataSetHeader(selectItems); - - return ElasticSearchUtils.createRow(source, documentId, header); - } - - @Override - protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) { - if (!whereItems.isEmpty()) { - // not supported - will have to be done by counting client-side - return null; - } - final String documentType = table.getName(); - final CountResponse response = elasticSearchClient.prepareCount(indexName) - .setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet(); - return response.getCount(); - } - - private boolean limitMaxRowsIsSet(int maxRows) { - return (maxRows != -1); - } - - @Override - public void executeUpdate(UpdateScript update) { - final ElasticSearchUpdateCallback callback = new ElasticSearchUpdateCallback(this); - update.run(callback); - callback.onExecuteUpdateFinished(); - } - - /** - * Gets the {@link Client} that this {@link DataContext} is wrapping. - * - * @return - */ - public Client getElasticSearchClient() { - return elasticSearchClient; - } - - /** - * Gets the name of the index that this {@link DataContext} is working on. 
- * - * @return - */ - public String getIndexName() { - return indexName; - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java deleted file mode 100644 index 8c41524..0000000 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.metamodel.elasticsearch; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.metamodel.data.AbstractDataSet; -import org.apache.metamodel.data.DataSet; -import org.apache.metamodel.data.Row; -import org.apache.metamodel.query.SelectItem; -import org.apache.metamodel.schema.Column; -import org.elasticsearch.action.search.ClearScrollRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.search.SearchHit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link DataSet} implementation for ElasticSearch - */ -final class ElasticSearchDataSet extends AbstractDataSet { - - private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class); - - private final Client _client; - private final AtomicBoolean _closed; - - private SearchResponse _searchResponse; - private SearchHit _currentHit; - private int _hitIndex = 0; - - public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems, - boolean queryPostProcessed) { - super(selectItems); - _client = client; - _searchResponse = searchResponse; - _closed = new AtomicBoolean(false); - } - - public ElasticSearchDataSet(Client client, SearchResponse searchResponse, Column[] columns, - boolean queryPostProcessed) { - super(columns); - _client = client; - _searchResponse = searchResponse; - _closed = new AtomicBoolean(false); - } - - @Override - public void close() { - super.close(); - boolean closeNow = _closed.compareAndSet(true, false); - if (closeNow) { - ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client) - .addScrollId(_searchResponse.getScrollId()); - scrollRequestBuilder.execute(); - } - } - - @Override - protected void finalize() throws Throwable { - super.finalize(); - if (!_closed.get()) { - logger.warn("finalize() invoked, but 
DataSet is not closed. Invoking close() on {}", this); - close(); - } - } - - @Override - public boolean next() { - final SearchHit[] hits = _searchResponse.getHits().hits(); - if (hits.length == 0) { - // break condition for the scroll - _currentHit = null; - return false; - } - - if (_hitIndex < hits.length) { - // pick the next hit within this search response - _currentHit = hits[_hitIndex]; - _hitIndex++; - return true; - } - - final String scrollId = _searchResponse.getScrollId(); - if (scrollId == null) { - // this search response is not scrolleable - then it's the end. - _currentHit = null; - return false; - } - - // try to scroll to the next set of hits - _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL) - .execute().actionGet(); - - // start over (recursively) - _hitIndex = 0; - return next(); - } - - @Override - public Row getRow() { - if (_currentHit == null) { - return null; - } - - final Map<String, Object> source = _currentHit.getSource(); - final String documentId = _currentHit.getId(); - final Row row = ElasticSearchUtils.createRow(source, documentId, getHeader()); - return row; - } -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDateConverter.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDateConverter.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDateConverter.java deleted file mode 100644 index fcf0b24..0000000 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDateConverter.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
package org.apache.metamodel.elasticsearch;

import org.apache.metamodel.util.TimeComparator;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Helper that turns the date strings found in ElasticSearch documents into
 * {@link Date} instances.
 */
final class ElasticSearchDateConverter {

    /** The pattern that is tried before falling back to {@link TimeComparator}. */
    private static final String PRIMARY_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSX";

    /**
     * Parses the string with the primary pattern; when that fails, the
     * string is handed to {@link TimeComparator#toDate(Object)} instead.
     *
     * @param dateAsString
     *            the textual date
     * @return the result of whichever parser handled the string
     */
    public static Date tryToConvert(String dateAsString) {
        // a new formatter per call, since SimpleDateFormat is not thread-safe
        final SimpleDateFormat primaryFormat = new SimpleDateFormat(PRIMARY_PATTERN);
        try {
            return primaryFormat.parse(dateAsString);
        } catch (ParseException e) {
            return TimeComparator.toDate(dateAsString);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/137caf0d/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java deleted file mode 100644 index b1fdf6e..0000000 --- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDeleteBuilder.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.elasticsearch; - -import java.util.List; - -import org.apache.metamodel.MetaModelException; -import org.apache.metamodel.delete.AbstractRowDeletionBuilder; -import org.apache.metamodel.delete.RowDeletionBuilder; -import org.apache.metamodel.query.FilterItem; -import org.apache.metamodel.query.LogicalOperator; -import org.apache.metamodel.schema.Table; -import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; -import org.elasticsearch.client.Client; -import org.elasticsearch.index.query.QueryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link RowDeletionBuilder} implementation for - * {@link ElasticSearchDataContext}. 
- */ -final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder { - - private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDeleteBuilder.class); - - private final ElasticSearchUpdateCallback _updateCallback; - - public ElasticSearchDeleteBuilder(ElasticSearchUpdateCallback updateCallback, Table table) { - super(table); - _updateCallback = updateCallback; - } - - @Override - public void execute() throws MetaModelException { - final Table table = getTable(); - final String documentType = table.getName(); - - final ElasticSearchDataContext dataContext = _updateCallback.getDataContext(); - final Client client = dataContext.getElasticSearchClient(); - final String indexName = dataContext.getIndexName(); - - final DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client); - deleteByQueryRequestBuilder.setIndices(indexName); - deleteByQueryRequestBuilder.setTypes(documentType); - - final List<FilterItem> whereItems = getWhereItems(); - - // delete by query - note that creteQueryBuilderForSimpleWhere may - // return matchAllQuery() if no where items are present. - final QueryBuilder queryBuilder = dataContext.createQueryBuilderForSimpleWhere(table, whereItems, - LogicalOperator.AND); - if (queryBuilder == null) { - // TODO: The where items could not be pushed down to a query. We - // could solve this by running a query first, gather all - // document IDs and then delete by IDs. - throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: " - + whereItems); - } - deleteByQueryRequestBuilder.setQuery(queryBuilder); - deleteByQueryRequestBuilder.execute().actionGet(); - - logger.debug("Deleted documents by query."); - } -}