Thank you, Julian, for the correction. I wasn't sure whether I should remove it, since the original commit (and PR) was created before I became a committer. Going forward, I will not include my name in my own commits.
On Wed, Sep 19, 2018 at 10:34 AM Julian Hyde <jhyde.apa...@gmail.com> wrote:
> Thanks Andrei. One thing: now you’re a committer your commit messages should not (must not) end with “(Andrei Sereda)”.
>
> Julian
>
> > On Sep 18, 2018, at 7:57 PM, ser...@apache.org wrote:
> >
> > [CALCITE-2528] Support Aggregates in ElasticSearch adapter (Andrei Sereda)
> >
> > Aggregate functions (count/sum/min/max/avg) are pushed down to ES.
> >
> > Add ElasticsearchAggregate relational expression to convert SQL into native Elastic aggregations (value_count, min, max etc.).
> > Enhance ElasticsearchTable to prepare correct aggregate ES JSON query.
> >
> > Create special classes to parse recursively elastic aggregation response or buckets (located in ElasticJson). They're inspired from existing Elastic high-level client source.
> >
> > For tests, make Json input more human friendly. Single quotes are accepted and fields can be unquoted (unless
> > they contain special characters). Also field with dots 'a.b.c' are automatically auto-expanded. This reduces JSON noise.
> >
> > Fix single projections which previously returned map (see [CALCITE-2485])
> >
> > Close apache/calcite#801
> > Close apache/calcite#822
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/79af1c9b
> > Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/79af1c9b
> > Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/79af1c9b
> >
> > Branch: refs/heads/master
> > Commit: 79af1c9ba735286653697deed3ff849b7c921fe4
> > Parents: ce05146
> > Author: Andrei Sereda <25229979+asereda...@users.noreply.github.com>
> > Authored: Tue Sep 18 22:53:24 2018 -0400
> > Committer: Andrei Sereda <25229979+asereda...@users.noreply.github.com>
> > Committed: Tue Sep 18 22:53:24 2018 -0400
> >
> > ----------------------------------------------------------------------
> > elasticsearch/pom.xml | 6 +
> > .../AbstractElasticsearchTable.java | 150 -----
> > .../elasticsearch/ElasticsearchAggregate.java | 165 +++++
> > .../elasticsearch/ElasticsearchConstants.java | 9 -
> > .../elasticsearch/ElasticsearchEnumerators.java | 44 +-
> > .../elasticsearch/ElasticsearchFilter.java | 17 +-
> > .../elasticsearch/ElasticsearchJson.java | 614 +++++++++++++++++++
> > .../elasticsearch/ElasticsearchMethod.java | 13 +-
> > .../elasticsearch/ElasticsearchProject.java | 6 +-
> > .../adapter/elasticsearch/ElasticsearchRel.java | 66 +-
> > .../elasticsearch/ElasticsearchRules.java | 38 +-
> > .../elasticsearch/ElasticsearchSchema.java | 30 +-
> > .../elasticsearch/ElasticsearchSort.java | 41 +-
> > .../elasticsearch/ElasticsearchTable.java | 313 +++++++++-
> > .../elasticsearch/ElasticsearchTableScan.java | 6 +-
> > .../ElasticsearchToEnumerableConverter.java | 46 +-
> > .../elasticsearch/PredicateAnalyzer.java | 10 +
> > .../adapter/elasticsearch/QueryBuilders.java | 106 +++-
> > .../adapter/elasticsearch/AggregationTest.java | 235 +++++++
> > .../adapter/elasticsearch/BooleanLogicTest.java | 1 +
.../elasticsearch/ElasticSearchAdapterTest.java | 309 +++++++--- > > .../elasticsearch/ElasticsearchJsonTest.java | 183 ++++++ > > .../EmbeddedElasticsearchPolicy.java | 41 +- > > .../adapter/elasticsearch/Projection2Test.java | 107 ++++ > > .../adapter/elasticsearch/ProjectionTest.java | 37 +- > > .../elasticsearch/QueryBuildersTest.java | 63 ++ > > .../calcite/test/ElasticsearchChecker.java | 90 ++- > > 27 files changed, 2340 insertions(+), 406 deletions(-) > > ---------------------------------------------------------------------- > > > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/pom.xml > > ---------------------------------------------------------------------- > > diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml > > index e3a044d..4700fee 100644 > > --- a/elasticsearch/pom.xml > > +++ b/elasticsearch/pom.xml > > @@ -124,6 +124,12 @@ limitations under the License. > > <scope>test</scope> > > </dependency> > > <dependency> > > + <groupId>org.hamcrest</groupId> > > + <artifactId>hamcrest-core</artifactId> > > + <version>${hamcrest.version}</version> > > + <scope>test</scope> > > + </dependency> > > + <dependency> > > <groupId>org.slf4j</groupId> > > <artifactId>slf4j-api</artifactId> > > </dependency> > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java > > deleted file mode 100644 > > index 1a0f6d0..0000000 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/AbstractElasticsearchTable.java > > +++ /dev/null > > @@ -1,150 +0,0 @@ > > -/* > > - * Licensed to the Apache Software 
Foundation (ASF) under one or more > > - * contributor license agreements. See the NOTICE file distributed with > > - * this work for additional information regarding copyright ownership. > > - * The ASF licenses this file to you under the Apache License, Version > 2.0 > > - * (the "License"); you may not use this file except in compliance with > > - * the License. You may obtain a copy of the License at > > - * > > - * http://www.apache.org/licenses/LICENSE-2.0 > > - * > > - * Unless required by applicable law or agreed to in writing, software > > - * distributed under the License is distributed on an "AS IS" BASIS, > > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > - * See the License for the specific language governing permissions and > > - * limitations under the License. > > - */ > > -package org.apache.calcite.adapter.elasticsearch; > > - > > -import org.apache.calcite.adapter.java.AbstractQueryableTable; > > -import org.apache.calcite.linq4j.Enumerable; > > -import org.apache.calcite.linq4j.Enumerator; > > -import org.apache.calcite.linq4j.QueryProvider; > > -import org.apache.calcite.linq4j.Queryable; > > -import org.apache.calcite.plan.RelOptCluster; > > -import org.apache.calcite.plan.RelOptTable; > > -import org.apache.calcite.rel.RelNode; > > -import org.apache.calcite.rel.type.RelDataType; > > -import org.apache.calcite.rel.type.RelDataTypeFactory; > > -import org.apache.calcite.schema.SchemaPlus; > > -import org.apache.calcite.schema.TranslatableTable; > > -import org.apache.calcite.schema.impl.AbstractTableQueryable; > > -import org.apache.calcite.sql.type.SqlTypeName; > > - > > -import com.fasterxml.jackson.databind.ObjectMapper; > > - > > -import java.util.List; > > -import java.util.Map; > > -import java.util.Objects; > > - > > -/** > > - * Table based on an Elasticsearch type. 
> > - */ > > -abstract class AbstractElasticsearchTable extends AbstractQueryableTable > > - implements TranslatableTable { > > - > > - final String indexName; > > - final String typeName; > > - final ObjectMapper mapper; > > - > > - /** > > - * Creates an ElasticsearchTable. > > - * @param indexName Elastic Search index > > - * @param typeName Elastic Search index type > > - * @param mapper Jackson API to parse (and created) JSON documents > > - */ > > - AbstractElasticsearchTable(String indexName, String typeName, > ObjectMapper mapper) { > > - super(Object[].class); > > - this.indexName = Objects.requireNonNull(indexName, "indexName"); > > - this.typeName = Objects.requireNonNull(typeName, "typeName"); > > - this.mapper = Objects.requireNonNull(mapper, "mapper"); > > - } > > - > > - @Override public String toString() { > > - return "ElasticsearchTable{" + indexName + "/" + typeName + "}"; > > - } > > - > > - public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { > > - final RelDataType mapType = relDataTypeFactory.createMapType( > > - relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR), > > - relDataTypeFactory.createTypeWithNullability( > > - relDataTypeFactory.createSqlType(SqlTypeName.ANY), > > - true)); > > - return relDataTypeFactory.builder().add("_MAP", mapType).build(); > > - } > > - > > - public <T> Queryable<T> asQueryable(QueryProvider queryProvider, > SchemaPlus schema, > > - String tableName) { > > - return new ElasticsearchQueryable<>(queryProvider, schema, this, > tableName); > > - } > > - > > - public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable > relOptTable) { > > - final RelOptCluster cluster = context.getCluster(); > > - return new ElasticsearchTableScan(cluster, > cluster.traitSetOf(ElasticsearchRel.CONVENTION), > > - relOptTable, this, null); > > - } > > - > > - /** > > - * In ES 5.x scripted fields start with {@code params._source.foo} > while in ES2.x > > - * {@code _source.foo}. 
Helper method to build correct query based on > runtime version of elastic. > > - * Used to keep backwards compatibility with ES2. > > - * > > - * @see <a href=" > https://github.com/elastic/elasticsearch/issues/20068">_source > variable</a> > > - * @see <a href=" > https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html">Scripted > Fields</a> > > - * @return string to be used for scripted fields > > - */ > > - protected abstract String scriptedFieldPrefix(); > > - > > - /** Executes a "find" operation on the underlying type. > > - * > > - * <p>For example, > > - * <code>client.prepareSearch(index).setTypes(type) > > - * .setSource("{\"fields\" : [\"state\"]}")</code></p> > > - * > > - * @param index Elasticsearch index > > - * @param ops List of operations represented as Json strings. > > - * @param fields List of fields to project; or null to return map > > - * @return Enumerator of results > > - */ > > - protected abstract Enumerable<Object> find(String index, List<String> > ops, > > - List<Map.Entry<String, Class>> fields); > > - > > - /** > > - * Implementation of {@link Queryable} based on > > - * a {@link AbstractElasticsearchTable}. > > - * > > - * @param <T> element type > > - */ > > - public static class ElasticsearchQueryable<T> extends > AbstractTableQueryable<T> { > > - ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus > schema, > > - AbstractElasticsearchTable table, String tableName) { > > - super(queryProvider, schema, table, tableName); > > - } > > - > > - public Enumerator<T> enumerator() { > > - return null; > > - } > > - > > - private String getIndex() { > > - return schema.unwrap(ElasticsearchSchema.class).getIndex(); > > - } > > - > > - private AbstractElasticsearchTable getTable() { > > - return (AbstractElasticsearchTable) table; > > - } > > - > > - /** Called via code-generation. 
> > - * @param ops list of queries (as strings) > > - * @param fields projection > > - * @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND > > - * @return result as enumerable > > - */ > > - @SuppressWarnings("UnusedDeclaration") > > - public Enumerable<Object> find(List<String> ops, > > - List<Map.Entry<String, Class>> fields) { > > - return getTable().find(getIndex(), ops, fields); > > - } > > - } > > -} > > - > > -// End AbstractElasticsearchTable.java > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java > > new file mode 100644 > > index 0000000..9627aca > > --- /dev/null > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java > > @@ -0,0 +1,165 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one or more > > + * contributor license agreements. See the NOTICE file distributed with > > + * this work for additional information regarding copyright ownership. > > + * The ASF licenses this file to you under the Apache License, Version > 2.0 > > + * (the "License"); you may not use this file except in compliance with > > + * the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > + * See the License for the specific language governing permissions and > > + * limitations under the License. 
> > + */ > > +package org.apache.calcite.adapter.elasticsearch; > > + > > +import org.apache.calcite.plan.RelOptCluster; > > +import org.apache.calcite.plan.RelOptCost; > > +import org.apache.calcite.plan.RelOptPlanner; > > +import org.apache.calcite.plan.RelTraitSet; > > +import org.apache.calcite.rel.InvalidRelException; > > +import org.apache.calcite.rel.RelNode; > > +import org.apache.calcite.rel.core.Aggregate; > > +import org.apache.calcite.rel.core.AggregateCall; > > +import org.apache.calcite.rel.metadata.RelMetadataQuery; > > +import org.apache.calcite.rel.type.RelDataType; > > +import org.apache.calcite.rel.type.RelDataTypeField; > > +import org.apache.calcite.sql.SqlKind; > > +import org.apache.calcite.util.ImmutableBitSet; > > + > > +import java.util.ArrayList; > > +import java.util.EnumSet; > > +import java.util.List; > > +import java.util.Locale; > > +import java.util.Set; > > + > > +/** > > + * Implementation of > > + * {@link org.apache.calcite.rel.core.Aggregate} relational expression > > + * for ElasticSearch. 
> > + */ > > +public class ElasticsearchAggregate extends Aggregate implements > ElasticsearchRel { > > + > > + private static final Set<SqlKind> SUPPORTED_AGGREGATIONS = > > + EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG, > SqlKind.SUM); > > + > > + /** Creates a ElasticsearchAggregate */ > > + ElasticsearchAggregate(RelOptCluster cluster, > > + RelTraitSet traitSet, > > + RelNode input, > > + boolean indicator, > > + ImmutableBitSet groupSet, > > + List<ImmutableBitSet> groupSets, > > + List<AggregateCall> aggCalls) throws InvalidRelException { > > + super(cluster, traitSet, input, indicator, groupSet, groupSets, > aggCalls); > > + > > + if (getConvention() != input.getConvention()) { > > + String message = String.format(Locale.ROOT, "%s != %s", > getConvention(), > > + input.getConvention()); > > + throw new AssertionError(message); > > + } > > + > > + assert getConvention() == input.getConvention(); > > + assert getConvention() == ElasticsearchRel.CONVENTION; > > + assert this.groupSets.size() == 1 : "Grouping sets not supported"; > > + > > + for (AggregateCall aggCall : aggCalls) { > > + if (aggCall.isDistinct()) { > > + throw new InvalidRelException("distinct aggregation not > supported"); > > + } > > + > > + SqlKind kind = aggCall.getAggregation().getKind(); > > + if (!SUPPORTED_AGGREGATIONS.contains(kind)) { > > + final String message = String.format(Locale.ROOT, > > + "Aggregation %s not supported (use one of %s)", kind, > SUPPORTED_AGGREGATIONS); > > + throw new InvalidRelException(message); > > + } > > + } > > + > > + if (getGroupType() != Group.SIMPLE) { > > + final String message = String.format(Locale.ROOT, "Only %s > grouping is supported. 
" > > + + "Yours is %s", Group.SIMPLE, getGroupType()); > > + throw new InvalidRelException(message); > > + } > > + > > + } > > + > > + @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, > boolean indicator, > > + ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, > > + List<AggregateCall> aggCalls) { > > + try { > > + return new ElasticsearchAggregate(getCluster(), traitSet, input, > > + indicator, groupSet, groupSets, > > + aggCalls); > > + } catch (InvalidRelException e) { > > + throw new AssertionError(e); > > + } > > + } > > + > > + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, > RelMetadataQuery mq) { > > + return super.computeSelfCost(planner, mq).multiplyBy(0.1); > > + } > > + > > + @Override public void implement(Implementor implementor) { > > + implementor.visitChild(0, getInput()); > > + List<String> inputFields = fieldNames(getInput().getRowType()); > > + > > + for (int group : groupSet) { > > + implementor.addGroupBy(inputFields.get(group)); > > + } > > + > > + for (AggregateCall aggCall : aggCalls) { > > + List<String> names = new ArrayList<>(); > > + for (int i : aggCall.getArgList()) { > > + names.add(inputFields.get(i)); > > + } > > + > > + final String name = names.isEmpty() ? ElasticsearchConstants.ID : > names.get(0); > > + > > + String op = String.format(Locale.ROOT, "\"%s\":{\"field\": > \"%s\"}", > > + toElasticAggregate(aggCall), > > + name); > > + > > + implementor.addAggregation(aggCall.getName(), op); > > + } > > + } > > + > > + /** > > + * Most of the aggregations can be retrieved with single > > + * <a href=" > https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html > ">stats</a> > > + * function. But currently only one-to-one mapping is supported > between sql agg and elastic > > + * aggregation. 
> > + */ > > + private String toElasticAggregate(AggregateCall call) { > > + SqlKind kind = call.getAggregation().getKind(); > > + switch (kind) { > > + case COUNT: > > + return call.isApproximate() ? "cardinality" : "value_count"; > > + case SUM: > > + return "sum"; > > + case MIN: > > + return "min"; > > + case MAX: > > + return "max"; > > + case AVG: > > + return "avg"; > > + default: > > + throw new IllegalArgumentException("Unknown aggregation kind " + > kind + " for " + call); > > + } > > + } > > + > > + private List<String> fieldNames(RelDataType relDataType) { > > + List<String> names = new ArrayList<>(); > > + > > + for (RelDataTypeField rdtf : relDataType.getFieldList()) { > > + names.add(rdtf.getName()); > > + } > > + return names; > > + } > > + > > +} > > + > > +// End ElasticsearchAggregate.java > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java > > index ed628cc..2c4c42c 100644 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java > > @@ -30,18 +30,9 @@ interface ElasticsearchConstants { > > String FIELDS = "fields"; > > String SOURCE_PAINLESS = "params._source"; > > String SOURCE_GROOVY = "_source"; > > - String SOURCE = SOURCE_GROOVY; > > String ID = "_id"; > > String UID = "_uid"; > > > > - /* Aggregation pushdown operations supported */ > > - String AGG_SUM = "SUM"; > > - String AGG_SUM0 = "$SUM0"; > > - String AGG_COUNT = "COUNT"; > > - String AGG_MIN = "MIN"; > > - String AGG_MAX = 
"MAX"; > > - String AGG_AVG = "AVG"; > > - > > Set<String> META_COLUMNS = ImmutableSet.of(UID, ID, TYPE, INDEX); > > > > } > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java > > index d87de7e..16ac92d 100644 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java > > @@ -26,27 +26,27 @@ import java.util.Map; > > > > /** > > * Util functions which convert > > - * {@link > org.apache.calcite.adapter.elasticsearch.ElasticsearchSearchResult.SearchHit} > > + * {@link ElasticsearchJson.SearchHit} > > * into calcite specific return type (map, object[], list etc.) 
> > */ > > class ElasticsearchEnumerators { > > > > private ElasticsearchEnumerators() {} > > > > - private static Function1<ElasticsearchSearchResult.SearchHit, Map> > mapGetter() { > > - return new Function1<ElasticsearchSearchResult.SearchHit, Map>() { > > - public Map apply(ElasticsearchSearchResult.SearchHit hits) { > > + private static Function1<ElasticsearchJson.SearchHit, Map> > mapGetter() { > > + return new Function1<ElasticsearchJson.SearchHit, Map>() { > > + public Map apply(ElasticsearchJson.SearchHit hits) { > > return hits.sourceOrFields(); > > } > > }; > > } > > > > - private static Function1<ElasticsearchSearchResult.SearchHit, Object> > singletonGetter( > > + private static Function1<ElasticsearchJson.SearchHit, Object> > singletonGetter( > > final String fieldName, > > final Class fieldClass) { > > - return new Function1<ElasticsearchSearchResult.SearchHit, Object>() > { > > - public Object apply(ElasticsearchSearchResult.SearchHit hits) { > > - return convert(hits.sourceOrFields(), fieldClass); > > + return new Function1<ElasticsearchJson.SearchHit, Object>() { > > + public Object apply(ElasticsearchJson.SearchHit hits) { > > + return convert(hits.valueOrNull(fieldName), fieldClass); > > } > > }; > > } > > @@ -59,30 +59,38 @@ class ElasticsearchEnumerators { > > * > > * @return function that converts the search result into a generic > array > > */ > > - private static Function1<ElasticsearchSearchResult.SearchHit, > Object[]> listGetter( > > + private static Function1<ElasticsearchJson.SearchHit, Object[]> > listGetter( > > final List<Map.Entry<String, Class>> fields) { > > - return new Function1<ElasticsearchSearchResult.SearchHit, > Object[]>() { > > - public Object[] apply(ElasticsearchSearchResult.SearchHit hit) { > > + return new Function1<ElasticsearchJson.SearchHit, Object[]>() { > > + public Object[] apply(ElasticsearchJson.SearchHit hit) { > > Object[] objects = new Object[fields.size()]; > > for (int i = 0; i < fields.size(); i++) { > 
> final Map.Entry<String, Class> field = fields.get(i); > > final String name = field.getKey(); > > final Class type = field.getValue(); > > - objects[i] = convert(hit.value(name), type); > > + objects[i] = convert(hit.valueOrNull(name), type); > > } > > return objects; > > } > > }; > > } > > > > - static Function1<ElasticsearchSearchResult.SearchHit, Object> getter( > > + static Function1<ElasticsearchJson.SearchHit, Object> getter( > > List<Map.Entry<String, Class>> fields) { > > //noinspection unchecked > > - return fields == null > > - ? (Function1) mapGetter() > > - : fields.size() == 1 > > - ? singletonGetter(fields.get(0).getKey(), > fields.get(0).getValue()) > > - : (Function1) listGetter(fields); > > + final Function1 getter; > > + if (fields == null || fields.size() == 1 && > "_MAP".equals(fields.get(0).getKey())) { > > + // select * from table > > + getter = mapGetter(); > > + } else if (fields.size() == 1) { > > + // select foo from table > > + getter = singletonGetter(fields.get(0).getKey(), > fields.get(0).getValue()); > > + } else { > > + // select a, b, c from table > > + getter = listGetter(fields); > > + } > > + > > + return getter; > > } > > > > private static Object convert(Object o, Class clazz) { > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java > > index 4d187b1..c339671 100644 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java > > @@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptUtil; 
> > import org.apache.calcite.plan.RelTraitSet; > > import org.apache.calcite.rel.RelNode; > > import org.apache.calcite.rel.core.Filter; > > -import org.apache.calcite.rel.core.Project; > > import org.apache.calcite.rel.metadata.RelMetadataQuery; > > import org.apache.calcite.rex.RexCall; > > import org.apache.calcite.rex.RexInputRef; > > @@ -70,24 +69,13 @@ public class ElasticsearchFilter extends Filter > implements ElasticsearchRel { > > > > @Override public void implement(Implementor implementor) { > > implementor.visitChild(0, getInput()); > > - List<String> fieldNames; > > - if (input instanceof Project) { > > - final List<RexNode> projects = ((Project) input).getProjects(); > > - fieldNames = new ArrayList<>(projects.size()); > > - for (RexNode project : projects) { > > - String name = > project.accept(MapProjectionFieldVisitor.INSTANCE); > > - fieldNames.add(name); > > - } > > - } else { > > - fieldNames = > ElasticsearchRules.elasticsearchFieldNames(getRowType()); > > - } > > ObjectMapper mapper = implementor.elasticsearchTable.mapper; > > PredicateAnalyzerTranslator translator = new > PredicateAnalyzerTranslator(mapper); > > try { > > implementor.add(translator.translateMatch(condition)); > > } catch (IOException e) { > > throw new UncheckedIOException(e); > > - } catch (ExpressionNotAnalyzableException e) { > > + } catch (PredicateAnalyzer.ExpressionNotAnalyzableException e) { > > throw new RuntimeException(e); > > } > > } > > @@ -103,7 +91,8 @@ public class ElasticsearchFilter extends Filter > implements ElasticsearchRel { > > this.mapper = Objects.requireNonNull(mapper, "mapper"); > > } > > > > - String translateMatch(RexNode condition) throws IOException, > ExpressionNotAnalyzableException { > > + String translateMatch(RexNode condition) throws IOException, > > + PredicateAnalyzer.ExpressionNotAnalyzableException { > > > > StringWriter writer = new StringWriter(); > > JsonGenerator generator = > mapper.getFactory().createGenerator(writer); > > > > > 
http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java > > new file mode 100644 > > index 0000000..7c80e82 > > --- /dev/null > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java > > @@ -0,0 +1,614 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one or more > > + * contributor license agreements. See the NOTICE file distributed with > > + * this work for additional information regarding copyright ownership. > > + * The ASF licenses this file to you under the Apache License, Version > 2.0 > > + * (the "License"); you may not use this file except in compliance with > > + * the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, software > > + * distributed under the License is distributed on an "AS IS" BASIS, > > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > > + * See the License for the specific language governing permissions and > > + * limitations under the License. 
> > + */ > > +package org.apache.calcite.adapter.elasticsearch; > > + > > +import com.fasterxml.jackson.annotation.JsonCreator; > > +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; > > +import com.fasterxml.jackson.annotation.JsonProperty; > > +import com.fasterxml.jackson.core.JsonParser; > > +import com.fasterxml.jackson.core.JsonProcessingException; > > +import com.fasterxml.jackson.databind.DeserializationContext; > > +import com.fasterxml.jackson.databind.JsonNode; > > +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; > > +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; > > +import com.fasterxml.jackson.databind.node.ArrayNode; > > +import com.fasterxml.jackson.databind.node.JsonNodeFactory; > > +import com.fasterxml.jackson.databind.node.ObjectNode; > > + > > +import java.io.IOException; > > +import java.time.Duration; > > +import java.util.ArrayList; > > +import java.util.Arrays; > > +import java.util.Collections; > > +import java.util.HashSet; > > +import java.util.Iterator; > > +import java.util.LinkedHashMap; > > +import java.util.List; > > +import java.util.Locale; > > +import java.util.Map; > > +import java.util.Objects; > > +import java.util.Set; > > +import java.util.function.BiConsumer; > > +import java.util.function.Consumer; > > +import java.util.stream.StreamSupport; > > + > > +import static java.util.Collections.unmodifiableMap; > > + > > +/** > > + * Internal objects (and deserializers) used to parse elastic search > results > > + * (which are in JSON format). > > + * > > + * <p>Since we're using basic row-level rest client http response has > to be > > + * processed manually using JSON (jackson) library. > > + */ > > +class ElasticsearchJson { > > + > > + /** > > + * Used as special aggregation key for missing values (documents > which are missing a field). > > + * Buckets with that value are then converted to {@code null}s in > flat tabular format. 
> > + * @see <a href=" > https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html">Missing > Value</a> > > + */ > > + static final JsonNode MISSING_VALUE = > JsonNodeFactory.instance.textNode("__MISSING__"); > > + > > + private ElasticsearchJson() {} > > + > > + /** > > + * Visits leaves of the aggregation where all values are stored. > > + */ > > + static void visitValueNodes(Aggregations aggregations, > Consumer<Map<String, Object>> consumer) { > > + Objects.requireNonNull(aggregations, "aggregations"); > > + Objects.requireNonNull(consumer, "consumer"); > > + > > + List<Bucket> buckets = new ArrayList<>(); > > + > > + Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>(); > > + > > + BiConsumer<RowKey, MultiValue> cons = (r, v) -> > > + rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v); > > + aggregations.forEach(a -> visitValueNodes(a, buckets, cons)); > > + rows.forEach((k, v) -> { > > + Map<String, Object> row = new LinkedHashMap<>(k.keys); > > + v.forEach(val -> row.put(val.getName(), val.value())); > > + consumer.accept(row); > > + }); > > + } > > + > > + /** > > + * Identifies a calcite row (as in relational algebra) > > + */ > > + private static class RowKey { > > + private final Map<String, Object> keys; > > + private final int hashCode; > > + > > + private RowKey(final Map<String, Object> keys) { > > + this.keys = Objects.requireNonNull(keys, "keys"); > > + this.hashCode = Objects.hashCode(keys); > > + } > > + > > + private RowKey(List<Bucket> buckets) { > > + this(toMap(buckets)); > > + } > > + > > + private static Map<String, Object> toMap(Iterable<Bucket> buckets) { > > + return StreamSupport.stream(buckets.spliterator(), false) > > + .collect(LinkedHashMap::new, > > + (m, v) -> m.put(v.getName(), v.key()), > > + LinkedHashMap::putAll); > > + } > > + > > + @Override public boolean equals(final Object o) { > > + if (this == o) { > > + return true; > > + } > > + if (o == null || 
getClass() != o.getClass()) { > > + return false; > > + } > > + final RowKey rowKey = (RowKey) o; > > + return hashCode == rowKey.hashCode > > + && Objects.equals(keys, rowKey.keys); > > + } > > + > > + @Override public int hashCode() { > > + return this.hashCode; > > + } > > + } > > + > > + private static void visitValueNodes(Aggregation aggregation, > List<Bucket> parents, > > + BiConsumer<RowKey, MultiValue> consumer) { > > + > > + if (aggregation instanceof MultiValue) { > > + // publish one value of the row > > + RowKey key = new RowKey(parents); > > + consumer.accept(key, (MultiValue) aggregation); > > + return; > > + } > > + > > + if (aggregation instanceof Bucket) { > > + Bucket bucket = (Bucket) aggregation; > > + parents.add(bucket); > > + bucket.getAggregations().forEach(a -> visitValueNodes(a, parents, > consumer)); > > + parents.remove(parents.size() - 1); > > + } else if (aggregation instanceof HasAggregations) { > > + HasAggregations children = (HasAggregations) aggregation; > > + children.getAggregations().forEach(a -> visitValueNodes(a, > parents, consumer)); > > + } else if (aggregation instanceof MultiBucketsAggregation) { > > + MultiBucketsAggregation multi = (MultiBucketsAggregation) > aggregation; > > + multi.buckets().forEach(b -> { > > + parents.add(b); > > + b.getAggregations().forEach(a -> visitValueNodes(a, parents, > consumer)); > > + parents.remove(parents.size() - 1); > > + }); > > + } > > + > > + } > > + > > + /** > > + * Response from Elastic > > + */ > > + @JsonIgnoreProperties(ignoreUnknown = true) > > + static class Result { > > + private final SearchHits hits; > > + private final Aggregations aggregations; > > + private final long took; > > + > > + /** > > + * Constructor for this instance. 
> > + * @param hits list of matched documents > > + * @param aggregations aggregation results (may be {@code null}) > > + * @param took time taken (in milliseconds) for this query to execute > > + */ > > + @JsonCreator > > + Result(@JsonProperty("hits") SearchHits hits, > > + @JsonProperty("aggregations") Aggregations aggregations, > > + @JsonProperty("took") long took) { > > + this.hits = Objects.requireNonNull(hits, "hits"); > > + this.aggregations = aggregations; > > + this.took = took; > > + } > > + > > + SearchHits searchHits() { > > + return hits; > > + } > > + > > + Aggregations aggregations() { > > + return aggregations; > > + } > > + > > + public Duration took() { > > + return Duration.ofMillis(took); > > + } > > + > > + } > > + > > + /** > > + * Similar to {@code SearchHits} in ES. Container for {@link SearchHit} > > + */ > > + @JsonIgnoreProperties(ignoreUnknown = true) > > + static class SearchHits { > > + > > + private final long total; > > + private final List<SearchHit> hits; > > + > > + @JsonCreator > > + SearchHits(@JsonProperty("total") final long total, > > + @JsonProperty("hits") final List<SearchHit> hits) { > > + this.total = total; > > + this.hits = Objects.requireNonNull(hits, "hits"); > > + } > > + > > + public List<SearchHit> hits() { > > + return this.hits; > > + } > > + > > + public long total() { > > + return total; > > + } > > + > > + } > > + > > + /** > > + * Concrete result record which matched the query. Similar to {@code SearchHit} in ES. 
> > + */ > > + @JsonIgnoreProperties(ignoreUnknown = true) > > + static class SearchHit { > > + private final String id; > > + private final Map<String, Object> source; > > + private final Map<String, Object> fields; > > + > > + @JsonCreator > > + SearchHit(@JsonProperty("_id") final String id, > > + @JsonProperty("_source") final Map<String, > Object> source, > > + @JsonProperty("fields") final Map<String, Object> > fields) { > > + this.id = Objects.requireNonNull(id, "id"); > > + > > + // both can't be null > > + if (source == null && fields == null) { > > + final String message = String.format(Locale.ROOT, > > + "Both '_source' and 'fields' are missing for %s", id); > > + throw new IllegalArgumentException(message); > > + } > > + > > + // both can't be non-null > > + if (source != null && fields != null) { > > + final String message = String.format(Locale.ROOT, > > + "Both '_source' and 'fields' are populated (non-null) for > %s", id); > > + throw new IllegalArgumentException(message); > > + } > > + > > + this.source = source; > > + this.fields = fields; > > + } > > + > > + /** > > + * Returns id of this hit (usually document id) > > + * @return unique id > > + */ > > + public String id() { > > + return id; > > + } > > + > > + Object valueOrNull(String name) { > > + Objects.requireNonNull(name, "name"); > > + if (fields != null && fields.containsKey(name)) { > > + Object field = fields.get(name); > > + if (field instanceof Iterable) { > > + // return first element (or null) > > + Iterator<?> iter = ((Iterable<?>) field).iterator(); > > + return iter.hasNext() ? iter.next() : null; > > + } > > + > > + return field; > > + } > > + > > + return valueFromPath(source, name); > > + } > > + > > + /** > > + * Returns property from nested maps given a path like {@code > a.b.c}. > > + * @param map current map > > + * @param path field path(s), optionally with dots ({@code a.b.c}). > > + * @return value located at path {@code path} or {@code null} if > not found. 
> > + */ > > + private static Object valueFromPath(Map<String, Object> map, String > path) { > > + if (map == null) { > > + return null; > > + } > > + > > + if (map.containsKey(path)) { > > + return map.get(path); > > + } > > + > > + // maybe pattern of type a.b.c > > + final int index = path.indexOf('.'); > > + if (index == -1) { > > + return null; > > + } > > + > > + final String prefix = path.substring(0, index); > > + final String suffix = path.substring(index + 1); > > + > > + Object maybeMap = map.get(prefix); > > + if (maybeMap instanceof Map) { > > + return valueFromPath((Map<String, Object>) maybeMap, suffix); > > + } > > + > > + return null; > > + } > > + > > + Map<String, Object> source() { > > + return source; > > + } > > + > > + Map<String, Object> fields() { > > + return fields; > > + } > > + > > + Map<String, Object> sourceOrFields() { > > + return source != null ? source : fields; > > + } > > + } > > + > > + > > + /** > > + * {@link Aggregation} container. > > + */ > > + @JsonDeserialize(using = AggregationsDeserializer.class) > > + static class Aggregations implements Iterable<Aggregation> { > > + > > + private final List<? extends Aggregation> aggregations; > > + private Map<String, Aggregation> aggregationsAsMap; > > + > > + Aggregations(List<? extends Aggregation> aggregations) { > > + this.aggregations = Objects.requireNonNull(aggregations, > "aggregations"); > > + } > > + > > + /** > > + * Iterates over the {@link Aggregation}s. > > + */ > > + @Override public final Iterator<Aggregation> iterator() { > > + return asList().iterator(); > > + } > > + > > + /** > > + * The list of {@link Aggregation}s. > > + */ > > + final List<Aggregation> asList() { > > + return Collections.unmodifiableList(aggregations); > > + } > > + > > + /** > > + * Returns the {@link Aggregation}s keyed by aggregation name. Lazy > init. 
> > + */ > > + final Map<String, Aggregation> asMap() { > > + if (aggregationsAsMap == null) { > > + Map<String, Aggregation> map = new > LinkedHashMap<>(aggregations.size()); > > + for (Aggregation aggregation : aggregations) { > > + map.put(aggregation.getName(), aggregation); > > + } > > + this.aggregationsAsMap = unmodifiableMap(map); > > + } > > + return aggregationsAsMap; > > + } > > + > > + /** > > + * Returns the aggregation that is associated with the specified > name. > > + */ > > + @SuppressWarnings("unchecked") > > + public final <A extends Aggregation> A get(String name) { > > + return (A) asMap().get(name); > > + } > > + > > + @Override public final boolean equals(Object obj) { > > + if (obj == null || getClass() != obj.getClass()) { > > + return false; > > + } > > + return aggregations.equals(((Aggregations) obj).aggregations); > > + } > > + > > + @Override public final int hashCode() { > > + return Objects.hash(getClass(), aggregations); > > + } > > + > > + } > > + > > + /** > > + * Identifies all aggregations > > + */ > > + interface Aggregation { > > + > > + /** > > + * @return The name of this aggregation. > > + */ > > + String getName(); > > + > > + } > > + > > + /** > > + * Allows traversing aggregations tree > > + */ > > + interface HasAggregations { > > + Aggregations getAggregations(); > > + } > > + > > + /** > > + * An aggregation that returns multiple buckets > > + */ > > + static class MultiBucketsAggregation implements Aggregation { > > + > > + private final String name; > > + private final List<Bucket> buckets; > > + > > + MultiBucketsAggregation(final String name, > > + final List<Bucket> buckets) { > > + this.name = name; > > + this.buckets = buckets; > > + } > > + > > + /** > > + * @return The buckets of this aggregation. 
> > + */ > > + List<Bucket> buckets() { > > + return buckets; > > + } > > + > > + @Override public String getName() { > > + return name; > > + } > > + } > > + > > + /** > > + * A bucket represents a criterion that all documents falling into it satisfy. > > + * It is uniquely identified by a key, and can hold sub-aggregations > > + * computed over all documents in it. > > + */ > > + static class Bucket implements HasAggregations, Aggregation { > > + private final Object key; > > + private final String name; > > + private final Aggregations aggregations; > > + > > + Bucket(final Object key, > > + final String name, > > + final Aggregations aggregations) { > > + this.key = key; // key may be null (bucket of documents missing the field) > > + this.name = Objects.requireNonNull(name, "name"); > > + this.aggregations = Objects.requireNonNull(aggregations, > "aggregations"); > > + } > > + > > + /** > > + * @return The key associated with the bucket > > + */ > > + Object key() { > > + return key; > > + } > > + > > + /** > > + * @return The key associated with the bucket as a string > > + */ > > + String keyAsString() { > > + return Objects.toString(key()); > > + } > > + > > + /** > > + * @return The sub-aggregations of this bucket > > + */ > > + @Override public Aggregations getAggregations() { > > + return aggregations; > > + } > > + > > + @Override public String getName() { > > + return name; > > + } > > + } > > + > > + /** > > + * Multi-value aggregation such as > > + * <a href=" > https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html > ">Stats</a> > > + */ > > + static class MultiValue implements Aggregation { > > + private final String name; > > + private final Map<String, Object> values; > > + > > + MultiValue(final String name, final Map<String, Object> values) { > > + this.name = Objects.requireNonNull(name, "name"); > > + this.values = Objects.requireNonNull(values, "values"); > > + } > > + > > + 
@Override public String getName() { > > + return name; > > + } > > + > > + Map<String, Object> values() { > > + return values; > > + } > > + > > + /** > > + * For single value. Returns single value represented by this leaf > aggregation. > > + * @return value corresponding to {@code value} > > + */ > > + Object value() { > > + if (!values().containsKey("value")) { > > + throw new IllegalStateException("'value' field not present in > this aggregation"); > > + } > > + > > + return values().get("value"); > > + } > > + > > + } > > + > > + /** > > + * Allows to de-serialize nested aggregation structures. > > + */ > > + static class AggregationsDeserializer extends > StdDeserializer<Aggregations> { > > + > > + private static final Set<String> IGNORE_TOKENS = new > HashSet<>(Arrays.asList("meta", > > + "buckets", "value", "values", "value_as_string", "doc_count", > "key", "key_as_string")); > > + > > + AggregationsDeserializer() { > > + super(Aggregations.class); > > + } > > + > > + @Override public Aggregations deserialize(final JsonParser parser, > > + final DeserializationContext ctxt) > > + throws IOException { > > + > > + ObjectNode node = parser.getCodec().readTree(parser); > > + return parseAggregations(parser, node); > > + } > > + > > + private static Aggregations parseAggregations(JsonParser parser, > ObjectNode node) > > + throws JsonProcessingException { > > + > > + List<Aggregation> aggregations = new ArrayList<>(); > > + > > + Iterable<Map.Entry<String, JsonNode>> iter = node::fields; > > + for (Map.Entry<String, JsonNode> entry : iter) { > > + final String name = entry.getKey(); > > + final JsonNode value = entry.getValue(); > > + > > + Aggregation agg = null; > > + if (value.has("buckets")) { > > + agg = parseBuckets(parser, name, (ArrayNode) > value.get("buckets")); > > + } else if (value.isObject() && !IGNORE_TOKENS.contains(name)) { > > + // leaf > > + agg = parseValue(parser, name, (ObjectNode) value); > > + } > > + > > + if (agg != null) { > > + 
aggregations.add(agg); > > + } > > + } > > + > > + return new Aggregations(aggregations); > > + } > > + > > + > > + > > + private static MultiValue parseValue(JsonParser parser, String > name, ObjectNode node) > > + throws JsonProcessingException { > > + > > + return new MultiValue(name, parser.getCodec().treeToValue(node, > Map.class)); > > + } > > + > > + private static Aggregation parseBuckets(JsonParser parser, String > name, ArrayNode nodes) > > + throws JsonProcessingException { > > + > > + List<Bucket> buckets = new ArrayList<>(nodes.size()); > > + for (JsonNode b: nodes) { > > + buckets.add(parseBucket(parser, name, (ObjectNode) b)); > > + } > > + > > + return new MultiBucketsAggregation(name, buckets); > > + } > > + > > + /** > > + * Determines if current key is a missing field key. Missing key is > returned when document > > + * does not have pivoting attribute (example {@code GROUP BY > _MAP['a.b.missing']}). It helps > > + * grouping documents which don't have a field. In relational > algebra this > > + * would be {@code null}. 
> > + * > > + * @param key current {@code key} (usually string) as returned by ES > > + * @return {@code true} if this value is the missing-value marker > > + * @see #MISSING_VALUE > > + */ > > + private static boolean isMissingBucket(JsonNode key) { > > + return MISSING_VALUE.equals(key); > > + } > > + > > + private static Bucket parseBucket(JsonParser parser, String name, > ObjectNode node) > > + throws JsonProcessingException { > > + > > + final JsonNode keyNode = node.get("key"); > > + final Object key; > > + if (isMissingBucket(keyNode) || keyNode.isNull()) { > > + key = null; > > + } else if (keyNode.isTextual()) { > > + key = keyNode.textValue(); > > + } else if (keyNode.isNumber()) { > > + key = keyNode.numberValue(); > > + } else if (keyNode.isBoolean()) { > > + key = keyNode.booleanValue(); > > + } else { > > + // don't usually expect keys to be Objects > > + key = parser.getCodec().treeToValue(node, Map.class); > > + } > > + > > + return new Bucket(key, name, parseAggregations(parser, node)); > > + } > > + > > + } > > +} > > + > > +// End ElasticsearchJson.java > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > > index 72753e6..709156f 100644 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java > > @@ -27,8 +27,17 @@ import java.util.List; > > * Builtin methods in the Elasticsearch adapter. 
> > */ > > enum ElasticsearchMethod { > > - > ELASTICSEARCH_QUERYABLE_FIND(AbstractElasticsearchTable.ElasticsearchQueryable.class, > > - "find", List.class, List.class); > > + > > + > ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class, > > + "find", > > + List.class, // ops - projections and other stuff > > + List.class, // fields > > + List.class, // sort > > + List.class, // groupBy > > + List.class, // aggregations > > + Long.class, // offset > > + Long.class // fetch > > + ); > > > > public final Method method; > > > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > > index 7d5811c..d0841c3 100644 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java > > @@ -101,11 +101,7 @@ public class ElasticsearchProject extends Project > implements ElasticsearchRel { > > query.append("\"script_fields\": {" + String.join(", ", > scriptFields) + "}"); > > } > > > > - for (String opfield : implementor.list) { > > - if (opfield.startsWith("\"_source\"")) { > > - implementor.list.remove(opfield); > > - } > > - } > > + implementor.list.removeIf(l -> l.startsWith("\"_source\"")); > > implementor.add(query.toString()); > > } > > } > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > > ---------------------------------------------------------------------- > > diff --git > 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > > index 436adf9..1dad691 100644 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java > > @@ -18,10 +18,14 @@ package org.apache.calcite.adapter.elasticsearch; > > > > import org.apache.calcite.plan.Convention; > > import org.apache.calcite.plan.RelOptTable; > > +import org.apache.calcite.rel.RelFieldCollation; > > import org.apache.calcite.rel.RelNode; > > +import org.apache.calcite.util.Pair; > > > > import java.util.ArrayList; > > import java.util.List; > > +import java.util.Map; > > +import java.util.Objects; > > > > /** > > * Relational expression that uses Elasticsearch calling convention. > > @@ -39,19 +43,75 @@ public interface ElasticsearchRel extends RelNode { > > * {@link ElasticsearchRel} nodes into an Elasticsearch query. > > */ > > class Implementor { > > + > > final List<String> list = new ArrayList<>(); > > > > + /** > > + * Sorting clauses. > > + * @see <a href=" > https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html > ">Sort</a> > > + */ > > + final List<Map.Entry<String, RelFieldCollation.Direction>> sort = > new ArrayList<>(); > > + > > + /** > > + * Elastic aggregation ({@code MIN / MAX / COUNT} etc.) statements > (functions). > > + * @see <a href=" > https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html > ">aggregations</a> > > + */ > > + final List<Map.Entry<String, String>> aggregations = new > ArrayList<>(); > > + > > + /** > > + * Allows bucketing documents together. Similar to {@code select > ... 
from table group by field1} > > + * @see <a href=" > https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-aggregations-bucket.html">Bucket > Aggregations</a> > > + */ > > + final List<String> groupBy = new ArrayList<>(); > > + > > + /** > > + * Starting index (default {@code 0}). Equivalent to {@code from} > in ES query. > > + * @see <a href=" > https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html > ">From/Size</a> > > + */ > > + Long offset; > > + > > + /** > > + * Number of records to return. Equivalent to {@code size} in ES > query. > > + * @see <a href=" > https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html > ">From/Size</a> > > + */ > > + Long fetch; > > + > > RelOptTable table; > > - AbstractElasticsearchTable elasticsearchTable; > > + ElasticsearchTable elasticsearchTable; > > > > - public void add(String findOp) { > > + void add(String findOp) { > > list.add(findOp); > > } > > > > - public void visitChild(int ordinal, RelNode input) { > > + void addGroupBy(String field) { > > + Objects.requireNonNull(field, "field"); > > + groupBy.add(field); > > + } > > + > > + void addSort(String field, RelFieldCollation.Direction direction) { > > + Objects.requireNonNull(field, "field"); > > + sort.add(new Pair<>(field, direction)); > > + } > > + > > + void addAggregation(String field, String expression) { > > + Objects.requireNonNull(field, "field"); > > + Objects.requireNonNull(expression, "expression"); > > + aggregations.add(new Pair<>(field, expression)); > > + } > > + > > + void offset(long offset) { > > + this.offset = offset; > > + } > > + > > + void fetch(long fetch) { > > + this.fetch = fetch; > > + } > > + > > + void visitChild(int ordinal, RelNode input) { > > assert ordinal == 0; > > ((ElasticsearchRel) input).implement(this); > > } > > + > > } > > } > > > > > > > 
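(Inline note on the Implementor change above: a rough sketch of how the collected groupBy / aggregations / offset / fetch state could be turned into an ES request body. This is not the code from ElasticsearchTable in this commit. The class name ImplementorSketch and the method toQuery are made up for illustration, and it assumes each aggregation "expression" is already a JSON fragment such as {"value_count":{"field":"_id"}}. The "__MISSING__" marker is the MISSING_VALUE constant from ElasticsearchJson. Field names are not JSON-escaped here.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch only; names are illustrative, not the adapter's API. */
class ImplementorSketch {
  final List<String> groupBy = new ArrayList<>();
  // (name, expression) pairs; expression is assumed to be a JSON fragment
  final List<Map.Entry<String, String>> aggregations = new ArrayList<>();
  Long offset; // maps to "from"
  Long fetch;  // maps to "size"

  /** Nests one "terms" bucket per GROUP BY field; metrics go in the innermost level. */
  String toQuery() {
    StringBuilder metrics = new StringBuilder();
    for (Map.Entry<String, String> agg : aggregations) {
      if (metrics.length() > 0) {
        metrics.append(',');
      }
      metrics.append('"').append(agg.getKey()).append("\":").append(agg.getValue());
    }
    String inner = "{" + metrics + "}";

    // Wrap from the last GROUP BY field outwards so the first field is outermost.
    for (int i = groupBy.size() - 1; i >= 0; i--) {
      String field = groupBy.get(i);
      inner = "{\"" + field + "\":{\"terms\":{\"field\":\"" + field
          + "\",\"missing\":\"__MISSING__\"},\"aggregations\":" + inner + "}}";
    }

    StringBuilder query = new StringBuilder("{");
    // Top-level from/size page the returned hits.
    if (offset != null) {
      query.append("\"from\":").append(offset).append(',');
    }
    if (fetch != null) {
      query.append("\"size\":").append(fetch).append(',');
    }
    return query.append("\"aggregations\":").append(inner).append('}').toString();
  }
}
```

The nesting mirrors what visitValueNodes in ElasticsearchJson undoes on the way back: each bucket level contributes one key to the row, and the leaf metrics contribute the values.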
http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > > index 97e934c..b442ddd 100644 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java > > @@ -23,10 +23,12 @@ import org.apache.calcite.plan.Convention; > > import org.apache.calcite.plan.RelOptRule; > > import org.apache.calcite.plan.RelTrait; > > import org.apache.calcite.plan.RelTraitSet; > > +import org.apache.calcite.rel.InvalidRelException; > > import org.apache.calcite.rel.RelCollations; > > import org.apache.calcite.rel.RelNode; > > import org.apache.calcite.rel.convert.ConverterRule; > > import org.apache.calcite.rel.core.Sort; > > +import org.apache.calcite.rel.logical.LogicalAggregate; > > import org.apache.calcite.rel.logical.LogicalFilter; > > import org.apache.calcite.rel.logical.LogicalProject; > > import org.apache.calcite.rel.type.RelDataType; > > @@ -53,7 +55,8 @@ class ElasticsearchRules { > > static final RelOptRule[] RULES = { > > ElasticsearchSortRule.INSTANCE, > > ElasticsearchFilterRule.INSTANCE, > > - ElasticsearchProjectRule.INSTANCE > > + ElasticsearchProjectRule.INSTANCE, > > + ElasticsearchAggregateRule.INSTANCE > > }; > > > > private ElasticsearchRules() {} > > @@ -147,7 +150,7 @@ class ElasticsearchRules { > > } > > } > > throw new IllegalArgumentException("Translation of " + > call.toString() > > - + "is not supported by ElasticsearchProject"); > > + + " is not supported by ElasticsearchProject"); > > } > > > > List<String> visitList(List<RexNode> 
list) { > > @@ -217,6 +220,37 @@ class ElasticsearchRules { > > } > > > > /** > > + * Rule to convert an {@link > org.apache.calcite.rel.logical.LogicalAggregate} > > + * to an {@link ElasticsearchAggregate}. > > + */ > > + private static class ElasticsearchAggregateRule extends > ElasticsearchConverterRule { > > + static final RelOptRule INSTANCE = new ElasticsearchAggregateRule(); > > + > > + private ElasticsearchAggregateRule() { > > + super(LogicalAggregate.class, Convention.NONE, > ElasticsearchRel.CONVENTION, > > + "ElasticsearchAggregateRule"); > > + } > > + > > + public RelNode convert(RelNode rel) { > > + final LogicalAggregate agg = (LogicalAggregate) rel; > > + final RelTraitSet traitSet = agg.getTraitSet().replace(out); > > + try { > > + return new ElasticsearchAggregate( > > + rel.getCluster(), > > + traitSet, > > + convert(agg.getInput(), traitSet.simplify()), > > + agg.indicator, > > + agg.getGroupSet(), > > + agg.getGroupSets(), > > + agg.getAggCallList()); > > + } catch (InvalidRelException e) { > > + return null; > > + } > > + } > > + } > > + > > + > > + /** > > * Rule to convert a {@link > org.apache.calcite.rel.logical.LogicalProject} > > * to an {@link ElasticsearchProject}. 
> > */ > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > > index 1c630ad..80a94be 100644 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java > > @@ -30,6 +30,7 @@ import org.elasticsearch.client.RestClient; > > import java.io.IOException; > > import java.io.InputStream; > > import java.io.UncheckedIOException; > > +import java.util.Collections; > > import java.util.Locale; > > import java.util.Map; > > import java.util.Objects; > > @@ -48,6 +49,8 @@ public class ElasticsearchSchema extends > AbstractSchema { > > > > private final ObjectMapper mapper; > > > > + private final Map<String, Table> tableMap; > > + > > /** > > * Allows schema to be instantiated from existing elastic search > client. > > * This constructor is used in tests. 
> > @@ -56,20 +59,33 @@ public class ElasticsearchSchema extends > AbstractSchema { > > * @param index name of ES index > > */ > > public ElasticsearchSchema(RestClient client, ObjectMapper mapper, > String index) { > > + this(client, mapper, index, null); > > + } > > + > > + public ElasticsearchSchema(RestClient client, ObjectMapper mapper, > String index, String type) { > > super(); > > this.client = Objects.requireNonNull(client, "client"); > > this.mapper = Objects.requireNonNull(mapper, "mapper"); > > this.index = Objects.requireNonNull(index, "index"); > > + if (type == null) { > > + try { > > + this.tableMap = createTables(listTypesFromElastic()); > > + } catch (IOException e) { > > + throw new UncheckedIOException("Couldn't get types for " + > index, e); > > + } > > + } else { > > + this.tableMap = createTables(Collections.singleton(type)); > > + } > > } > > > > @Override protected Map<String, Table> getTableMap() { > > + return tableMap; > > + } > > + > > + private Map<String, Table> createTables(Iterable<String> types) { > > final ImmutableMap.Builder<String, Table> builder = > ImmutableMap.builder(); > > - try { > > - for (String type: listTypes()) { > > - builder.put(type, new ElasticsearchTable(client, mapper, index, > type)); > > - } > > - } catch (IOException e) { > > - throw new UncheckedIOException("Failed to get types for " + > index, e); > > + for (String type : types) { > > + builder.put(type, new ElasticsearchTable(client, mapper, index, > type)); > > } > > return builder.build(); > > } > > @@ -81,7 +97,7 @@ public class ElasticsearchSchema extends > AbstractSchema { > > * @throws IOException for any IO related issues > > * @throws IllegalStateException if reply is not understood > > */ > > - private Set<String> listTypes() throws IOException { > > + private Set<String> listTypesFromElastic() throws IOException { > > final String endpoint = "/" + index + "/_mapping"; > > final Response response = client.performRequest("GET", endpoint); > > try 
(InputStream is = response.getEntity().getContent()) { > > > > > http://git-wip-us.apache.org/repos/asf/calcite/blob/79af1c9b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > > ---------------------------------------------------------------------- > > diff --git > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > > index ed669aa..9078b72 100644 > > --- > a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > > +++ > b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java > > @@ -23,15 +23,12 @@ import org.apache.calcite.plan.RelTraitSet; > > import org.apache.calcite.rel.RelCollation; > > import org.apache.calcite.rel.RelFieldCollation; > > import org.apache.calcite.rel.RelNode; > > -import org.apache.calcite.rel.core.Project; > > import org.apache.calcite.rel.core.Sort; > > import org.apache.calcite.rel.metadata.RelMetadataQuery; > > import org.apache.calcite.rel.type.RelDataTypeField; > > import org.apache.calcite.rex.RexLiteral; > > import org.apache.calcite.rex.RexNode; > > -import org.apache.calcite.util.Util; > > > > -import java.util.ArrayList; > > import java.util.List; > > > > /** > > @@ -57,48 +54,22 @@ public class ElasticsearchSort extends Sort > implements ElasticsearchRel { > > > > @Override public void implement(Implementor implementor) { > > implementor.visitChild(0, getInput()); > > - if (!collation.getFieldCollations().isEmpty()) { > > - final List<String> keys = new ArrayList<>(); > > - if (input instanceof Project) { > > - final List<RexNode> projects = ((Project) input).getProjects(); > > + final List<RelDataTypeField> fields = getRowType().getFieldList(); > > > > - for (RelFieldCollation fieldCollation : > collation.getFieldCollations()) { > > - RexNode project = > 
projects.get(fieldCollation.getFieldIndex()); > > - String name = > project.accept(MapProjectionFieldVisitor.INSTANCE); > > - keys.add(ElasticsearchRules.quote(name) + ": " + > direction(fieldCollation)); > > - } > > - } else { > > - final List<RelDataTypeField> fields = > getRowType().getFieldList(); > > - > > - for (RelFieldCollation fieldCollation : > collation.getFieldCollations()) { > > - final String name = > fields.get(fieldCollation.getFieldIndex()).getName(); > > - keys.add(ElasticsearchRules.quote(name) + ": " + > direction(fieldCollation)); > > - } > > - } > > - > > - implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {", > "}") + "]"); > > + for (RelFieldCollation fieldCollation : > collation.getFieldCollations()) { > > + final String name = > fields.get(fieldCollation.getFieldIndex()).getName(); > > + implementor.addSort(name, fieldCollation.getDirection()); > > } > > > > if (offset != null) { > > - implementor.add("\"from\": " + ((RexLiteral) offset).getValue