[CALCITE-1253] Elasticsearch adapter (Subhobrata Dey) Close apache/calcite#236
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/f3caf13b Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/f3caf13b Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/f3caf13b Branch: refs/heads/master Commit: f3caf13b9f1cd92f95dcf27716466bf2133e1ed7 Parents: b76affc Author: Subhobrata Dey <[email protected]> Authored: Sat May 21 15:33:32 2016 -0400 Committer: Julian Hyde <[email protected]> Committed: Thu May 26 14:57:06 2016 -0700 ---------------------------------------------------------------------- elasticsearch/pom.xml | 142 ++++++++++ .../elasticsearch/ElasticsearchEnumerator.java | 151 ++++++++++ .../elasticsearch/ElasticsearchFilter.java | 284 +++++++++++++++++++ .../elasticsearch/ElasticsearchMethod.java | 50 ++++ .../elasticsearch/ElasticsearchProject.java | 95 +++++++ .../adapter/elasticsearch/ElasticsearchRel.java | 58 ++++ .../elasticsearch/ElasticsearchRules.java | 240 ++++++++++++++++ .../elasticsearch/ElasticsearchSchema.java | 125 ++++++++ .../ElasticsearchSchemaFactory.java | 63 ++++ .../elasticsearch/ElasticsearchSort.java | 93 ++++++ .../elasticsearch/ElasticsearchTable.java | 150 ++++++++++ .../elasticsearch/ElasticsearchTableScan.java | 88 ++++++ .../ElasticsearchToEnumerableConverter.java | 124 ++++++++ .../ElasticsearchToEnumerableConverterRule.java | 42 +++ .../adapter/elasticsearch/package-info.java | 26 ++ .../calcite/test/ElasticsearchAdapterIT.java | 270 ++++++++++++++++++ .../resources/elasticsearch-zips-model.json | 50 ++++ .../src/test/resources/log4j.properties | 24 ++ pom.xml | 2 + site/_docs/adapter.md | 1 + site/_docs/elasticsearch_adapter.md | 136 +++++++++ sqlline | 2 +- sqlline.bat | 2 +- 23 files changed, 2216 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/pom.xml 
---------------------------------------------------------------------- diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml new file mode 100644 index 0000000..fc6df83 --- /dev/null +++ b/elasticsearch/pom.xml @@ -0,0 +1,142 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite</artifactId> + <version>1.8.0-SNAPSHOT</version> + </parent> + + <artifactId>calcite-elasticsearch</artifactId> + <packaging>jar</packaging> + <version>1.8.0-SNAPSHOT</version> + <name>Calcite Elasticsearch</name> + <description>Elasticsearch adapter for Calcite</description> + + <properties> + <top.dir>${project.basedir}/..</top.dir> + </properties> + + <dependencies> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.apache.calcite.avatica</groupId> + <artifactId>avatica</artifactId> + </dependency> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <type>jar</type> + </dependency> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-linq4j</artifactId> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>${elasticsearch-java-driver.version}</version> + </dependency> + <dependency> + <groupId>com.carrotsearch</groupId> + <artifactId>hppc</artifactId> + <version>0.7.1</version> + </dependency> + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>${maven-dependency-plugin.version}</version> + 
<executions> + <execution> + <id>analyze</id> + <goals> + <goal>analyze-only</goal> + </goals> + <configuration> + <failOnWarning>true</failOnWarning> + <!-- ignore "unused but declared" warnings --> + <ignoredUnusedDeclaredDependencies> + <ignoredUnusedDeclaredDependency>org.apache.calcite.avatica:avatica</ignoredUnusedDeclaredDependency> + <ignoredUnusedDeclaredDependency>org.slf4j:slf4j-api</ignoredUnusedDeclaredDependency> + <ignoredUnusedDeclaredDependency>org.slf4j:slf4j-log4j12</ignoredUnusedDeclaredDependency> + </ignoredUnusedDeclaredDependencies> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-release-plugin</artifactId> + </plugin> + <!-- Parent module has the same plugin and does the work of + generating -sources.jar for each project. But without the + plugin declared here, IDEs don't know the sources are + available. 
--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <executions> + <execution> + <id>attach-sources</id> + <phase>verify</phase> + <goals> + <goal>jar-no-fork</goal> + <goal>test-jar-no-fork</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerator.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerator.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerator.java new file mode 100644 index 0000000..e7478f5 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.tree.Primitive;
+
+import org.elasticsearch.search.SearchHit;
+
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Enumerator over Elasticsearch {@link SearchHit}s; each hit is turned into a
+ * row object by a getter function supplied at construction time.
+ */
+public class ElasticsearchEnumerator implements Enumerator<Object> {
+  private final Iterator<SearchHit> cursor;
+  private final Function1<SearchHit, Object> getter;
+  private Object current;
+
+  /**
+   * Creates an ElasticsearchEnumerator.
+   *
+   * @param cursor Iterator over Elasticsearch {@link SearchHit} objects
+   * @param getter Function applied to each hit to produce the current row
+   */
+  public ElasticsearchEnumerator(Iterator<SearchHit> cursor, Function1<SearchHit, Object> getter) {
+    this.cursor = cursor;
+    this.getter = getter;
+  }
+
+  /** Returns the row produced by the last successful {@link #moveNext()}. */
+  public Object current() {
+    return current;
+  }
+
+  /**
+   * Advances to the next hit, if any.
+   *
+   * @return true if a row is available; false (with the current row reset to
+   *     null) once the cursor is exhausted
+   */
+  public boolean moveNext() {
+    if (!cursor.hasNext()) {
+      current = null;
+      return false;
+    }
+    final SearchHit hit = cursor.next();
+    current = getter.apply(hit);
+    return true;
+  }
+
+  /** Not supported: the underlying iterator cannot be rewound. */
+  public void reset() {
+    throw new UnsupportedOperationException();
+  }
+
+  public void close() {
+    // nothing to do
+  }
+
+  /** Getter that returns the hit's field map as the row. */
+  private static Function1<SearchHit, Map> mapGetter() {
+    return new Function1<SearchHit, Map>() {
+      public Map apply(SearchHit hit) {
+        return (Map) hit.fields();
+      }
+    };
+  }
+
+  /**
+   * Getter for a single-column row. The row is the hit's source map when the
+   * hit carries no fields, otherwise the hit's field map, passed through
+   * {@link #convert}. Note that {@code fieldName} is not consulted.
+   *
+   * @param fieldName Name of the single column (unused by the body)
+   * @param fieldClass Class the value is converted towards
+   */
+  private static Function1<SearchHit, Object> singletonGetter(final String fieldName,
+      final Class fieldClass) {
+    return new Function1<SearchHit, Object>() {
+      public Object apply(SearchHit hit) {
+        final Object row;
+        if (hit.fields().isEmpty()) {
+          row = hit.getSource();
+        } else {
+          row = hit.getFields();
+        }
+        return convert(row, fieldClass);
+      }
+    };
+  }
+
+  /**
+   * Function that extracts a given set of fields from {@link SearchHit}
+   * objects, producing one array slot per requested field.
+   *
+   * @param fields List of (name, class) pairs to project
+   */
+  private static Function1<SearchHit, Object[]> listGetter(
+      final List<Map.Entry<String, Class>> fields) {
+    return new Function1<SearchHit, Object[]>() {
+      public Object[] apply(SearchHit hit) {
+        final Object[] row = new Object[fields.size()];
+        for (int i = 0; i < fields.size(); i++) {
+          final Map.Entry<String, Class> field = fields.get(i);
+          final String name = field.getKey();
+          final Object raw;
+          if (hit.fields().isEmpty()) {
+            // No explicit fields on the hit: read the value from the source map.
+            raw = hit.getSource().get(name);
+          } else {
+            raw = hit.field(name).getValue();
+          }
+          row[i] = convert(raw, field.getValue());
+        }
+        return row;
+      }
+    };
+  }
+
+  /**
+   * Chooses the getter for a field list: the map getter when {@code fields}
+   * is null, the singleton getter for exactly one field, otherwise the list
+   * getter.
+   */
+  static Function1<SearchHit, Object> getter(List<Map.Entry<String, Class>> fields) {
+    if (fields == null) {
+      //noinspection unchecked
+      return (Function1) mapGetter();
+    }
+    if (fields.size() == 1) {
+      final Map.Entry<String, Class> only = fields.get(0);
+      return singletonGetter(only.getKey(), only.getValue());
+    }
+    //noinspection unchecked
+    return (Function1) listGetter(fields);
+  }
+
+  /**
+   * Coerces a raw value towards {@code clazz}. Null stays null; a value that
+   * is already an instance of the (boxed) class is returned as is. When the
+   * class is a primitive or a primitive box, a {@link Date} is first reduced
+   * to {@code getTime() / MILLIS_PER_DAY} and any {@link Number} is converted
+   * with {@link Primitive#number}. Anything else is returned unchanged.
+   */
+  private static Object convert(Object o, Class clazz) {
+    if (o == null) {
+      return null;
+    }
+    Primitive primitive = Primitive.of(clazz);
+    final Class target;
+    if (primitive == null) {
+      primitive = Primitive.ofBox(clazz);
+      target = clazz;
+    } else {
+      target = primitive.boxClass;
+    }
+    if (target.isInstance(o) || primitive == null) {
+      return o;
+    }
+    Object value = o;
+    if (value instanceof Date) {
+      value = ((Date) value).getTime() / DateTimeUtils.MILLIS_PER_DAY;
+    }
+    if (value instanceof Number) {
+      return primitive.number((Number) value);
+    }
+    return value;
+  }
+}
+
+// End ElasticsearchEnumerator.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java new file mode 100644 index 0000000..f11a7b5 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.elasticsearch; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.JsonBuilder; +import org.apache.calcite.util.Pair; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + + +/** + * Implementation of a {@link org.apache.calcite.rel.core.Filter} + * relational expression in Elasticsearch. 
+ */
+public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
+  /**
+   * Creates an ElasticsearchFilter.
+   *
+   * @param cluster Cluster this relational expression belongs to
+   * @param traitSet Trait set; asserted to carry {@link ElasticsearchRel#CONVENTION}
+   * @param child Input relational expression; asserted to share this node's convention
+   * @param condition Boolean predicate applied to the input rows
+   */
+  public ElasticsearchFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
+      RexNode condition) {
+    super(cluster, traitSet, child, condition);
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+    assert getConvention() == child.getConvention();
+  }
+
+  /** Returns the inherited filter cost scaled by 0.1. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override public Filter copy(RelTraitSet relTraitSet, RelNode input, RexNode condition) {
+    return new ElasticsearchFilter(getCluster(), relTraitSet, input, condition);
+  }
+
+  /**
+   * Visits the input, then appends this filter's {@code "query"} clause to the
+   * implementor's list of query fragments.
+   */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    Translator translator = new Translator(ElasticsearchRules
+        .elasticsearchFieldNames(getRowType()));
+    String match = translator.translateMatch(condition);
+    implementor.add(match);
+  }
+
+  /**
+   * Translates {@link RexNode} expressions into Elasticsearch expression strings.
+   *
+   * <p>The translator keeps per-conjunction scratch state ({@link #eqMap} and
+   * {@link #multimap}) that is cleared at the start of every
+   * {@link #translateAnd(RexNode)} call.
+   */
+  static class Translator {
+    final JsonBuilder builder = new JsonBuilder();
+    /** Non-equality predicates of the current conjunction: field to (operator, literal). */
+    final Multimap<String, Pair<String, RexLiteral>> multimap =
+        HashMultimap.create();
+    /** Equality predicates of the current conjunction: field to literal. */
+    final Map<String, RexLiteral> eqMap = new LinkedHashMap<>();
+    private final List<String> fieldNames;
+
+    Translator(List<String> fieldNames) {
+      this.fieldNames = fieldNames;
+    }
+
+    /**
+     * Builds the {@code "query" : {"constant_score": {"filter": ...}}} fragment
+     * for a condition.
+     *
+     * <p>The generated JSON has all whitespace removed and is lower-cased as a
+     * whole; this also affects whitespace and case inside literal values.
+     */
+    private String translateMatch(RexNode condition) {
+      // filter node
+      final Map<String, Object> filterMap = new LinkedHashMap<>();
+      filterMap.put("filter", translateOr(condition));
+
+      // constant_score node
+      final Map<String, Object> map = builder.map();
+      map.put("constant_score", filterMap);
+
+      // Locale.ROOT keeps the lower-casing independent of the JVM's default
+      // locale (for example, the Turkish dotless i).
+      return "\"query\" : " + builder.toJsonString(map).replaceAll("\\s+", "")
+          .toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Translates a condition that may be an OR of other conditions. Each
+     * disjunct becomes a {@code bool/must} filter; more than one disjunct is
+     * wrapped in a {@code bool/should} filter.
+     *
+     * <p>A disjunct that yields no filters is trivially true and becomes
+     * {@code match_all}; a condition with no disjuncts is false and becomes
+     * {@code bool/must_not/match_all}. Both cases previously failed with an
+     * {@link IndexOutOfBoundsException}.
+     */
+    private Object translateOr(RexNode condition) {
+      final List<Object> list = new ArrayList<>();
+
+      for (RexNode node : RelOptUtil.disjunctions(condition)) {
+        final List<Map<String, Object>> andNodes = translateAnd(node);
+        if (andNodes.isEmpty()) {
+          list.add(matchAll());
+        } else {
+          final Map<String, Object> andClause = new HashMap<>();
+          andClause.put("must", andNodes);
+          list.add(boolFilter(andClause));
+        }
+      }
+
+      if (list.isEmpty()) {
+        // An empty disjunction is false: match nothing.
+        final Map<String, Object> mustNot = new HashMap<>();
+        mustNot.put("must_not", matchAll());
+        return boolFilter(mustNot);
+      }
+      if (list.size() == 1) {
+        return list.get(0);
+      }
+      final Map<String, Object> should = builder.map();
+      should.put("should", list);
+      return boolFilter(should);
+    }
+
+    /** Wraps a clause map as {@code {"bool": clause}}. */
+    private static Map<String, Object> boolFilter(Map<String, Object> clause) {
+      final Map<String, Object> filterEvaluator = new LinkedHashMap<>();
+      filterEvaluator.put("bool", clause);
+      return filterEvaluator;
+    }
+
+    /** Creates a {@code {"match_all": {}}} filter. */
+    private static Map<String, Object> matchAll() {
+      final Map<String, Object> map = new LinkedHashMap<>();
+      map.put("match_all", new LinkedHashMap<String, Object>());
+      return map;
+    }
+
+    /** Creates a {@code {"term": {field: value}}} filter. */
+    private static Map<String, Object> termFilter(String field, Object value) {
+      final Map<String, Object> filter = new HashMap<>();
+      filter.put(field, value);
+
+      final Map<String, Object> map = new HashMap<>();
+      map.put("term", filter);
+      return map;
+    }
+
+    /**
+     * Records a bound under {@code op}, unless the bound already recorded for
+     * that operator is stronger.
+     */
+    private void addPredicate(Map<String, Object> map, String op, Object v) {
+      if (map.containsKey(op) && stronger(op, map.get(op), v)) {
+        return;
+      }
+      map.put(op, v);
+    }
+
+    /**
+     * Translates a condition that may be an AND of other conditions. Gathers
+     * together conditions that apply to the same field.
+     *
+     * <p>Per field, range bounds are merged into a single {@code range}
+     * filter and every not-equals predicate gets its own
+     * {@code not/term} filter. Previously both kinds shared one map, so
+     * several not-equals predicates on one field overwrote each other and a
+     * mix of range and not-equals produced a malformed range body.
+     *
+     * @return One filter map per generated predicate; empty if the condition
+     *     has no conjuncts
+     */
+    private List<Map<String, Object>> translateAnd(RexNode node0) {
+      eqMap.clear();
+      multimap.clear();
+      for (RexNode node : RelOptUtil.conjunctions(node0)) {
+        translateMatch2(node);
+      }
+      final List<Map<String, Object>> filters = new ArrayList<>();
+      for (Map.Entry<String, RexLiteral> entry : eqMap.entrySet()) {
+        // NOTE(review): an equality discards every other predicate on the same
+        // field, which assumes those predicates are consistent with it; confirm.
+        multimap.removeAll(entry.getKey());
+        filters.add(termFilter(entry.getKey(), literalValue(entry.getValue())));
+      }
+      for (Map.Entry<String, Collection<Pair<String, RexLiteral>>> entry
+          : multimap.asMap().entrySet()) {
+        final String field = entry.getKey();
+        final Map<String, Object> bounds = builder.map();
+        for (Pair<String, RexLiteral> s : entry.getValue()) {
+          if (s.left.equals("not")) {
+            final Map<String, Object> notMap = new HashMap<>();
+            notMap.put("not", termFilter(field, literalValue(s.right)));
+            filters.add(notMap);
+          } else {
+            addPredicate(bounds, s.left, literalValue(s.right));
+          }
+        }
+        if (!bounds.isEmpty()) {
+          final Map<String, Object> rangeBody = new HashMap<>();
+          rangeBody.put(field, bounds);
+
+          final Map<String, Object> range = new HashMap<>();
+          range.put("range", rangeBody);
+          filters.add(range);
+        }
+      }
+      return filters;
+    }
+
+    /**
+     * Returns whether bound {@code v0} is strictly stronger than bound
+     * {@code v1} for operator {@code key}: smaller for {@code lt}/{@code lte},
+     * larger for {@code gt}/{@code gte}. Only number/number and string/string
+     * pairs are compared; anything else yields false.
+     */
+    private boolean stronger(String key, Object v0, Object v1) {
+      if (key.equals("lt") || key.equals("lte")) {
+        if (v0 instanceof Number && v1 instanceof Number) {
+          return ((Number) v0).doubleValue() < ((Number) v1).doubleValue();
+        }
+        if (v0 instanceof String && v1 instanceof String) {
+          return v0.toString().compareTo(v1.toString()) < 0;
+        }
+      }
+      if (key.equals("gt") || key.equals("gte")) {
+        return stronger("lt", v1, v0);
+      }
+      return false;
+    }
+
+    private static Object literalValue(RexLiteral literal) {
+      return literal.getValue2();
+    }
+
+    /**
+     * Dispatches a single comparison to {@link #translateBinary}, passing the
+     * operator name and the name to use if the operands must be swapped.
+     *
+     * @throws AssertionError if the node is not a supported comparison
+     */
+    private Void translateMatch2(RexNode node) {
+      switch (node.getKind()) {
+      case EQUALS:
+        return translateBinary(null, null, (RexCall) node);
+      case LESS_THAN:
+        return translateBinary("lt", "gt", (RexCall) node);
+      case LESS_THAN_OR_EQUAL:
+        return translateBinary("lte", "gte", (RexCall) node);
+      case NOT_EQUALS:
+        return translateBinary("not", "not", (RexCall) node);
+      case GREATER_THAN:
+        return translateBinary("gt", "lt", (RexCall) node);
+      case GREATER_THAN_OR_EQUAL:
+        return translateBinary("gte", "lte", (RexCall) node);
+      default:
+        throw new AssertionError("cannot translate " + node);
+      }
+    }
+
+    /**
+     * Translates a call to a binary operator, reversing arguments if
+     * necessary.
+     *
+     * @throws AssertionError if neither operand order can be translated
+     */
+    private Void translateBinary(String op, String rop, RexCall call) {
+      final RexNode left = call.operands.get(0);
+      final RexNode right = call.operands.get(1);
+      boolean b = translateBinary2(op, left, right);
+      if (b) {
+        return null;
+      }
+      b = translateBinary2(rop, right, left);
+      if (b) {
+        return null;
+      }
+      throw new AssertionError("cannot translate op " + op + " call " + call);
+    }
+
+    /**
+     * Translates a call to a binary operator. Returns whether successful.
+     *
+     * <p>Succeeds only when {@code right} is a literal and {@code left} is an
+     * input reference, a cast of a translatable expression, or an item lookup
+     * recognised by {@link ElasticsearchRules#isItem}.
+     */
+    private boolean translateBinary2(String op, RexNode left, RexNode right) {
+      switch (right.getKind()) {
+      case LITERAL:
+        break;
+      default:
+        return false;
+      }
+      final RexLiteral rightLiteral = (RexLiteral) right;
+      switch (left.getKind()) {
+      case INPUT_REF:
+        final RexInputRef left1 = (RexInputRef) left;
+        String name = fieldNames.get(left1.getIndex());
+        translateOp2(op, name, rightLiteral);
+        return true;
+      case CAST:
+        return translateBinary2(op, ((RexCall) left).operands.get(0), right);
+      case OTHER_FUNCTION:
+        String itemName = ElasticsearchRules.isItem((RexCall) left);
+        if (itemName != null) {
+          translateOp2(op, itemName, rightLiteral);
+          return true;
+        }
+        // fall through
+      default:
+        return false;
+      }
+    }
+
+    /**
+     * Records a predicate on a field: a null {@code op} means equality and
+     * goes to {@link #eqMap}; any other operator goes to {@link #multimap}.
+     */
+    private void translateOp2(String op, String name, RexLiteral right) {
+      if (op == null) {
+        eqMap.put(name, right);
+      } else {
+        multimap.put(name, Pair.of(op, right));
+      }
+    }
+  }
+}
+
+// End ElasticsearchFilter.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java new file mode 100644 index 0000000..a0b3af6 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.elasticsearch; + +import org.apache.calcite.linq4j.tree.Types; + +import com.google.common.collect.ImmutableMap; + +import java.lang.reflect.Method; +import java.util.List; + +/** + * Builtin methods in the Elasticsearch adapter. 
+ */
+enum ElasticsearchMethod {
+  ELASTICSEARCH_QUERYABLE_FIND(ElasticsearchTable.ElasticsearchQueryable.class, "find",
+      List.class, List.class);
+
+  /** The reflective method this constant stands for. */
+  public final Method method;
+
+  /** Lookup from reflective method to the enum constant that wraps it. */
+  public static final ImmutableMap<Method, ElasticsearchMethod> MAP = indexByMethod();
+
+  /**
+   * Resolves the method by class, name and argument types.
+   *
+   * @param clazz Class declaring the method
+   * @param methodName Name of the method
+   * @param argumentTypes Parameter types of the method
+   */
+  ElasticsearchMethod(Class clazz, String methodName, Class... argumentTypes) {
+    this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+  }
+
+  /** Builds the immutable method-to-constant map over all enum values. */
+  private static ImmutableMap<Method, ElasticsearchMethod> indexByMethod() {
+    final ImmutableMap.Builder<Method, ElasticsearchMethod> byMethod = ImmutableMap.builder();
+    for (ElasticsearchMethod constant : ElasticsearchMethod.values()) {
+      byMethod.put(constant.method, constant);
+    }
+    return byMethod.build();
+  }
+}
+
+// End ElasticsearchMethod.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
new file mode 100644
index 0000000..c2c09a5
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Project}
+ * relational expression in Elasticsearch.
+ */
+public class ElasticsearchProject extends Project implements ElasticsearchRel {
+  /** Prefix the expression translator puts in front of a translated literal. */
+  private static final String LITERAL_PREFIX = "\"literal\":";
+
+  /**
+   * Creates an ElasticsearchProject.
+   *
+   * @param cluster Cluster this relational expression belongs to
+   * @param traitSet Trait set; asserted to carry {@link ElasticsearchRel#CONVENTION}
+   * @param input Input relational expression; asserted to share this node's convention
+   * @param projects Expressions to project
+   * @param rowType Output row type
+   */
+  public ElasticsearchProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
+      List<? extends RexNode> projects, RelDataType rowType) {
+    super(cluster, traitSet, input, projects, rowType);
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public Project copy(RelTraitSet relTraitSet, RelNode input, List<RexNode> projects,
+      RelDataType relDataType) {
+    // Use the trait set requested by the caller, not this node's own traits.
+    return new ElasticsearchProject(getCluster(), relTraitSet, input, projects, relDataType);
+  }
+
+  /** Returns the inherited project cost scaled by 0.1. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  /**
+   * Visits the input, then replaces any {@code "fields"} fragment already in
+   * the implementor's list with one describing this projection.
+   *
+   * <p>An expression that is just the quoted output name is listed under
+   * {@code "fields"}; a translated literal becomes a script field whose script
+   * is the literal text; anything else becomes a script field that reads
+   * {@code _source.<expr>} with the quotes stripped.
+   */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+
+    final ElasticsearchRules.RexToElasticsearchTranslator translator =
+        new ElasticsearchRules.RexToElasticsearchTranslator(
+            (JavaTypeFactory) getCluster().getTypeFactory(),
+            ElasticsearchRules.elasticsearchFieldNames(getInput().getRowType()));
+
+    final List<String> findItems = new ArrayList<>();
+    final List<String> scriptFieldItems = new ArrayList<>();
+    for (Pair<RexNode, String> pair: getNamedProjects()) {
+      final String name = pair.right;
+      final String expr = pair.left.accept(translator);
+
+      if (expr.equals("\"" + name + "\"")) {
+        findItems.add(ElasticsearchRules.quote(name));
+      } else if (expr.startsWith(LITERAL_PREFIX) && expr.length() > LITERAL_PREFIX.length()) {
+        // Take everything after the prefix, so that a literal which itself
+        // contains ':' or a line terminator is passed through intact.
+        scriptFieldItems.add(ElasticsearchRules.quote(name) + ":{\"script\": "
+            + expr.substring(LITERAL_PREFIX.length()) + "}");
+      } else {
+        scriptFieldItems.add(ElasticsearchRules.quote(name) + ":{\"script\":\"_source."
+            + expr.replaceAll("\"", "") + "\"}");
+      }
+    }
+    final String findString = Util.toString(findItems, "", ", ", "");
+    final String scriptFieldString = "\"script_fields\": {"
+        + Util.toString(scriptFieldItems, "", ", ", "") + "}";
+    final String fieldString = "\"fields\" : [" + findString + "]"
+        + ", " + scriptFieldString;
+
+    // Remove by index, walking backwards: removing inside a for-each over the
+    // same list can throw ConcurrentModificationException.
+    for (int i = implementor.list.size() - 1; i >= 0; i--) {
+      if (implementor.list.get(i).startsWith("\"fields\"")) {
+        implementor.list.remove(i);
+      }
+    }
+    implementor.add(fieldString);
+  }
+}
+
+// End ElasticsearchProject.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
new file mode 100644
index 0000000..e24cb0d
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Relational expression that uses Elasticsearch calling convention.
+ */
+public interface ElasticsearchRel extends RelNode {
+  /**
+   * Calling convention for relational operations that occur in Elasticsearch.
+   */
+  Convention CONVENTION = new Convention.Impl("ELASTICSEARCH", ElasticsearchRel.class);
+
+  /**
+   * Contributes this node's part of the query to the given implementor.
+   *
+   * @param implementor Collector of query fragments
+   */
+  void implement(Implementor implementor);
+
+  /**
+   * Callback for the implementation process that converts a tree of
+   * {@link ElasticsearchRel} nodes into an Elasticsearch query.
+   */
+  class Implementor {
+    /** Query fragments, in the order in which they were added. */
+    final List<String> list = new ArrayList<>();
+
+    RelOptTable table;
+    ElasticsearchTable elasticsearchTable;
+
+    /** Appends a query fragment. */
+    public void add(String findOp) {
+      list.add(findOp);
+    }
+
+    /**
+     * Asks the given input to implement itself against this implementor.
+     *
+     * @param ordinal Ordinal of the input; asserted to be 0
+     * @param input Input node; must be an {@link ElasticsearchRel}
+     */
+    public void visitChild(int ordinal, RelNode input) {
+      assert ordinal == 0;
+      final ElasticsearchRel child = (ElasticsearchRel) input;
+      child.implement(this);
+    }
+  }
+}
+
+// End ElasticsearchRel.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
new file mode 100644
index 0000000..2e68156
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.elasticsearch; + +import org.apache.calcite.adapter.enumerable.RexImpTable; +import org.apache.calcite.adapter.enumerable.RexToLixTranslator; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTrait; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.calcite.util.trace.CalciteTrace; + +import org.slf4j.Logger; + +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.List; + +/** + * Rules and relational operators for + * {@link 
ElasticsearchRel#CONVENTION ELASTICSEARCH}
+ * calling convention.
+ */
+class ElasticsearchRules {
+  protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
+
+  /** The converter rules defined in this class: sort, filter, project. */
+  static final RelOptRule[] RULES = {
+    ElasticsearchSortRule.INSTANCE,
+    ElasticsearchFilterRule.INSTANCE,
+    ElasticsearchProjectRule.INSTANCE
+  };
+
+  /** Not instantiable; this class only holds static members. */
+  private ElasticsearchRules() {}
+
+  /**
+   * Returns 'string' if it is a call to item['string'], null otherwise.
+   *
+   * <p>Only matches when the first operand is a reference to input field 0
+   * and the second operand is a literal whose value is a {@link String}.
+   *
+   * @param call Call to inspect
+   * @return The string key, or null if the call does not have that shape
+   */
+  static String isItem(RexCall call) {
+    if (call.getOperator() != SqlStdOperatorTable.ITEM) {
+      return null;
+    }
+    final RexNode op0 = call.getOperands().get(0);
+    final RexNode op1 = call.getOperands().get(1);
+
+    if (op0 instanceof RexInputRef
+        && ((RexInputRef) op0).getIndex() == 0
+        && op1 instanceof RexLiteral
+        && ((RexLiteral) op1).getValue2() instanceof String) {
+      return (String) ((RexLiteral) op1).getValue2();
+    }
+    return null;
+  }
+
+  /**
+   * Returns the field names of a row type, made unique by
+   * {@link SqlValidatorUtil#uniquify}.
+   *
+   * <p>A name that starts with "$" is replaced by "_" followed by the name
+   * without its first two characters (for example "$f0" becomes "_0").
+   *
+   * @param rowType Row type whose field names are wanted
+   * @return Field names, one per field of {@code rowType}
+   */
+  static List<String> elasticsearchFieldNames(final RelDataType rowType) {
+    return SqlValidatorUtil.uniquify(
+        new AbstractList<String>() {
+          @Override public String get(int index) {
+            final String name = rowType.getFieldList().get(index).getName();
+            // NOTE(review): substring(2) assumes at least one character
+            // follows the "$"; a bare "$" name would throw here.
+            return name.startsWith("$") ? "_" + name.substring(2) : name;
+          }
+
+          @Override public int size() {
+            return rowType.getFieldCount();
+          }
+        });
+  }
+
+  /** Wraps a string in double quotes; the content is not escaped. */
+  static String quote(String s) {
+    return "\"" + s + "\"";
+  }
+
+  /**
+   * Translator from {@link RexNode} to strings in Elasticsearch's expression
+   * language.
+   */
+  static class RexToElasticsearchTranslator extends RexVisitorImpl<String> {
+    private final JavaTypeFactory typeFactory;
+    private final List<String> inFields;
+
+    /**
+     * Creates a translator.
+     *
+     * @param typeFactory Type factory used when translating literals
+     * @param inFields    Names of the input fields, indexed by field ordinal
+     */
+    RexToElasticsearchTranslator(JavaTypeFactory typeFactory, List<String> inFields) {
+      super(true);
+      this.typeFactory = typeFactory;
+      this.inFields = inFields;
+    }
+
+    /**
+     * Returns the string "null" for a null literal, otherwise
+     * {@code "literal":"<value>"} where the value is produced by
+     * {@link RexToLixTranslator#translateLiteral}.
+     */
+    @Override public String visitLiteral(RexLiteral literal) {
+      if (literal.getValue() == null) {
+        return "null";
+      }
+      return "\"literal\":\""
+        + RexToLixTranslator.translateLiteral(literal, literal.getType(),
+          typeFactory, RexImpTable.NullAs.NOT_POSSIBLE)
+        + "\"";
+    }
+
+    /** Returns the double-quoted name of the referenced input field. */
+    @Override public String visitInputRef(RexInputRef inputRef) {
+      return quote(inFields.get(inputRef.getIndex()));
+    }
+
+    /**
+     * Translates a call. Supported shapes are item['string'] on input field
+     * 0, CAST (the operand's translation is passed through) and ITEM with an
+     * INTEGER literal index.
+     *
+     * @throws IllegalArgumentException if the call has any other shape
+     */
+    @Override public String visitCall(RexCall call) {
+      final String name = isItem(call);
+      if (name != null) {
+        return "\"" + name + "\"";
+      }
+
+      final List<String> strings = visitList(call.operands);
+      if (call.getKind() == SqlKind.CAST) {
+        return strings.get(0).startsWith("$") ? strings.get(0).substring(1) : strings.get(0);
+      }
+      if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+        final RexNode op1 = call.getOperands().get(1);
+        if (op1 instanceof RexLiteral && op1.getType().getSqlTypeName() == SqlTypeName.INTEGER) {
+          return stripQuotes(strings.get(0)) + "[" + ((RexLiteral) op1).getValue2() + "]";
+        }
+      }
+      // The separating space before "is" was missing, which glued the
+      // expression text to the rest of the message.
+      throw new IllegalArgumentException("Translation of " + call
+        + " is not supported by ElasticsearchProject");
+    }
+
+    /**
+     * Removes one pair of enclosing single quotes, if present.
+     *
+     * <p>NOTE(review): {@link ElasticsearchRules#quote} and
+     * {@link #visitInputRef} produce double quotes, so this presumably never
+     * strips anything for those inputs; confirm the intended quote character.
+     */
+    private String stripQuotes(String s) {
+      return s.startsWith("'") && s.endsWith("'") ? s.substring(1, s.length() - 1) : s;
+    }
+
+    /** Translates each node of a list, preserving order. */
+    List<String> visitList(List<RexNode> list) {
+      final List<String> strings = new ArrayList<>();
+      for (RexNode node: list) {
+        strings.add(node.accept(this));
+      }
+      return strings;
+    }
+  }
+
+  /**
+   * Base class for planner rules that convert a relational expression to
+   * Elasticsearch calling convention.
+   */
+  abstract static class ElasticsearchConverterRule extends ConverterRule {
+    /** Target convention, kept so that subclasses can build trait sets. */
+    final Convention out;
+
+    ElasticsearchConverterRule(Class<? extends RelNode> clazz, RelTrait in, Convention out,
+        String description) {
+      super(clazz, in, out, description);
+      this.out = out;
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to an
+   * {@link ElasticsearchSort}.
+   */
+  private static class ElasticsearchSortRule extends ElasticsearchConverterRule {
+    private static final ElasticsearchSortRule INSTANCE = new ElasticsearchSortRule();
+
+    private ElasticsearchSortRule() {
+      super(Sort.class, Convention.NONE, ElasticsearchRel.CONVENTION, "ElasticsearchSortRule");
+    }
+
+    @Override public RelNode convert(RelNode relNode) {
+      final Sort sort = (Sort) relNode;
+      final RelTraitSet traitSet = sort.getTraitSet().replace(out).replace(sort.getCollation());
+      // The input is requested with an empty collation; the sort itself
+      // carries the collation of the original Sort.
+      return new ElasticsearchSort(relNode.getCluster(), traitSet,
+        convert(sort.getInput(), traitSet.replace(RelCollations.EMPTY)), sort.getCollation(),
+        sort.offset, sort.fetch);
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalFilter} to an
+   * {@link ElasticsearchFilter}.
+   */
+  private static class ElasticsearchFilterRule extends ElasticsearchConverterRule {
+    private static final ElasticsearchFilterRule INSTANCE = new ElasticsearchFilterRule();
+
+    private ElasticsearchFilterRule() {
+      super(LogicalFilter.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+        "ElasticsearchFilterRule");
+    }
+
+    @Override public RelNode convert(RelNode relNode) {
+      final LogicalFilter filter = (LogicalFilter) relNode;
+      final RelTraitSet traitSet = filter.getTraitSet().replace(out);
+      return new ElasticsearchFilter(relNode.getCluster(), traitSet,
+        convert(filter.getInput(), out),
+        filter.getCondition());
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
+   * to an {@link ElasticsearchProject}.
+   */
+  private static class ElasticsearchProjectRule extends ElasticsearchConverterRule {
+    private static final ElasticsearchProjectRule INSTANCE = new ElasticsearchProjectRule();
+
+    private ElasticsearchProjectRule() {
+      super(LogicalProject.class, Convention.NONE, ElasticsearchRel.CONVENTION,
+        "ElasticsearchProjectRule");
+    }
+
+    @Override public RelNode convert(RelNode relNode) {
+      final LogicalProject project = (LogicalProject) relNode;
+      final RelTraitSet traitSet = project.getTraitSet().replace(out);
+      return new ElasticsearchProject(project.getCluster(), traitSet,
+        convert(project.getInput(), out), project.getProjects(), project.getRowType());
+    }
+  }
+}
+
+// End ElasticsearchRules.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
new file mode 100644
index 0000000..e59e0a4
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Schema mapped onto an index of ELASTICSEARCH types.
+ *
+ * <p>Each table in the schema is an ELASTICSEARCH type in that index.
+ */
+public class ElasticsearchSchema extends AbstractSchema {
+  /**
+   * Name of the index as reported by the cluster, or null if the lookup in
+   * the constructor did not return exactly one index.
+   */
+  final String index;
+
+  private transient Client client;
+
+  /**
+   * Creates an Elasticsearch schema.
+   *
+   * <p>Connects to the given nodes and then asks the cluster for the index
+   * named {@code indexName}.
+   *
+   * @param coordinates Map of Elasticsearch node locations (host, port)
+   * @param userConfig Map of user-specified configurations
+   * @param indexName Elasticsearch database name, e.g. "usa".
+   * @throws RuntimeException if none of the nodes can be reached
+   */
+  ElasticsearchSchema(Map<String, Integer> coordinates,
+      Map<String, String> userConfig, String indexName) {
+    super();
+
+    final List<InetSocketAddress> transportAddresses = new ArrayList<>();
+    for (Map.Entry<String, Integer> coordinate: coordinates.entrySet()) {
+      transportAddresses.add(new InetSocketAddress(coordinate.getKey(), coordinate.getValue()));
+    }
+
+    open(transportAddresses, userConfig);
+
+    // open() either assigns the client or throws, so the null check below
+    // is purely defensive.
+    if (client != null) {
+      final String[] indices = client.admin().indices()
+          .getIndex(new GetIndexRequest().indices(indexName))
+          .actionGet().getIndices();
+      if (indices.length == 1) {
+        index = indices[0];
+      } else {
+        index = null;
+      }
+    } else {
+      index = null;
+    }
+  }
+
+  /**
+   * Builds one {@link ElasticsearchTable} per mapping (type) found in the
+   * index.
+   *
+   * <p>Any exception raised while fetching the mappings is rethrown via
+   * {@link Throwables#propagate}.
+   */
+  @Override protected Map<String, Table> getTableMap() {
+    final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
+
+    try {
+      GetMappingsResponse response = client.admin().indices().getMappings(
+        new GetMappingsRequest().indices(index)).get();
+      ImmutableOpenMap<String, MappingMetaData> mapping = response.getMappings().get(index);
+      for (ObjectObjectCursor<String, MappingMetaData> c: mapping) {
+        builder.put(c.key, new ElasticsearchTable(client, index, c.key));
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Creates a transport client for the given addresses and stores it in
+   * {@link #client}.
+   *
+   * @param transportAddresses Addresses of the nodes to connect to
+   * @param userConfig Settings passed to the client's settings builder
+   * @throws RuntimeException if the client reports no connected nodes; the
+   *     client is closed before the exception is thrown
+   */
+  private void open(List<InetSocketAddress> transportAddresses, Map<String, String> userConfig) {
+    final List<TransportAddress> transportNodes = new ArrayList<>(transportAddresses.size());
+    for (InetSocketAddress address : transportAddresses) {
+      transportNodes.add(new InetSocketTransportAddress(address));
+    }
+
+    Settings settings = Settings.settingsBuilder().put(userConfig).build();
+
+    final TransportClient transportClient = TransportClient.builder().settings(settings).build();
+    for (TransportAddress transport : transportNodes) {
+      transportClient.addTransportAddress(transport);
+    }
+
+    final List<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
+    if (nodes.isEmpty()) {
+      // Nobody will ever see this client again, so close it here rather
+      // than abandon whatever it acquired when it was built.
+      transportClient.close();
+      throw new RuntimeException("Cannot connect to any elasticsearch nodes");
+    }
+
+    client = transportClient;
+  }
+}
+
+// End ElasticsearchSchema.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
new file mode 100644
index 0000000..41ffc10
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Factory that creates a {@link ElasticsearchSchema}.
+ *
+ * <p>Allows a custom schema to be included in a model.json file.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class ElasticsearchSchemaFactory implements SchemaFactory {
+
+  public ElasticsearchSchemaFactory() {
+  }
+
+  /**
+   * Creates the schema from the operands of the model.
+   *
+   * <p>Reads three operands: "coordinates" (a JSON object, parsed as a map
+   * from string to integer), "userConfig" (a JSON object, parsed as a map
+   * from string to string) and "index" (a string). Single-quoted JSON is
+   * accepted.
+   *
+   * @param parentSchema Parent schema (not used)
+   * @param name Name of the schema (not used)
+   * @param operand Operands from the model
+   * @return A new {@link ElasticsearchSchema}
+   * @throws IllegalArgumentException if "coordinates" or "userConfig" is
+   *     absent
+   * @throws RuntimeException if either JSON operand cannot be parsed
+   */
+  @Override public Schema create(SchemaPlus parentSchema, String name,
+      Map<String, Object> operand) {
+    final ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
+
+    try {
+      final Map<String, Integer> coordinates =
+          mapper.readValue(requiredString(operand, "coordinates"),
+              new TypeReference<Map<String, Integer>>() { });
+      final Map<String, String> userConfig =
+          mapper.readValue(requiredString(operand, "userConfig"),
+              new TypeReference<Map<String, String>>() { });
+      final String index = (String) operand.get("index");
+      return new ElasticsearchSchema(coordinates, userConfig, index);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot parse values from json", e);
+    }
+  }
+
+  /**
+   * Returns the string value of an operand, failing with a message that
+   * names the operand instead of handing null to the JSON parser.
+   */
+  private static String requiredString(Map<String, Object> operand, String key) {
+    final Object value = operand.get(key);
+    if (value == null) {
+      throw new IllegalArgumentException("Missing required operand '" + key + "'");
+    }
+    return (String) value;
+  }
+}
+
+// End ElasticsearchSchemaFactory.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java
new file mode 100644
index 0000000..5f5dfe8
--- /dev/null
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.elasticsearch; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Util; + +import java.util.ArrayList; +import java.util.List; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Sort} + * relational expression in Elasticsearch. 
+ */
+public class ElasticsearchSort extends Sort implements ElasticsearchRel {
+  /**
+   * Creates an ElasticsearchSort.
+   *
+   * <p>Assertions check that the node, and its input, are in the
+   * {@link ElasticsearchRel#CONVENTION Elasticsearch convention}.
+   */
+  public ElasticsearchSort(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
+      RelCollation collation, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, child, collation, offset, fetch);
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+    assert getConvention() == child.getConvention();
+  }
+
+  /** Returns the cost computed by the superclass, scaled by 0.05. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.05);
+  }
+
+  /**
+   * Creates a copy of this sort with the given traits, input, collation,
+   * offset and fetch.
+   */
+  @Override public Sort copy(RelTraitSet traitSet, RelNode relNode, RelCollation relCollation,
+      RexNode offset, RexNode fetch) {
+    // Use the collation that was passed in; the previous code silently
+    // reused this node's own collation and ignored the argument.
+    return new ElasticsearchSort(getCluster(), traitSet, relNode, relCollation, offset, fetch);
+  }
+
+  /**
+   * Visits the input, then adds up to three fragments to the implementor:
+   * a "sort" array (only if the collation has at least one field), a "from"
+   * value (only if there is an offset) and a "size" value (only if there is
+   * a fetch).
+   */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    if (!collation.getFieldCollations().isEmpty()) {
+      final List<String> keys = new ArrayList<>();
+      final List<RelDataTypeField> fields = getRowType().getFieldList();
+
+      for (RelFieldCollation fieldCollation: collation.getFieldCollations()) {
+        final String name = fields.get(fieldCollation.getFieldIndex()).getName();
+        keys.add(ElasticsearchRules.quote(name) + ": " + direction(fieldCollation));
+      }
+
+      // Each key becomes its own {...} object inside the array.
+      implementor.add("\"sort\": [ " + Util.toString(keys, "{", "}, {", "}") + "]");
+    }
+
+    if (offset != null) {
+      implementor.add("\"from\": " + ((RexLiteral) offset).getValue());
+    }
+
+    if (fetch != null) {
+      implementor.add("\"size\": " + ((RexLiteral) fetch).getValue());
+    }
+  }
+
+  /**
+   * Returns the quoted direction keyword for a field collation:
+   * {@code "desc"} for the two descending directions, {@code "asc"} for
+   * everything else.
+   */
+  private String direction(RelFieldCollation fieldCollation) {
+    switch (fieldCollation.getDirection()) {
+    case DESCENDING:
+    case STRICTLY_DESCENDING:
+      return "\"desc\"";
+    case ASCENDING:
+    case STRICTLY_ASCENDING:
+    default:
+      return "\"asc\"";
+    }
+  }
+}
+
+// End ElasticsearchSort.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java new file mode 100644 index 0000000..f3dbca5 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTableQueryable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import org.apache.calcite.util.Util;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.SearchHit;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table based on an Elasticsearch type.
+ */
+public class ElasticsearchTable extends AbstractQueryableTable implements TranslatableTable {
+  private final Client client;
+  private final String indexName;
+  private final String typeName;
+
+  /**
+   * Creates an ElasticsearchTable.
+   *
+   * @param client Client used to run searches
+   * @param indexName Name of the index (stored, not read by this class)
+   * @param typeName Name of the type that this table represents
+   */
+  public ElasticsearchTable(Client client, String indexName,
+      String typeName) {
+    super(Object[].class);
+    this.client = client;
+    this.indexName = indexName;
+    this.typeName = typeName;
+  }
+
+  @Override public String toString() {
+    return "ElasticsearchTable{" + typeName + "}";
+  }
+
+  /**
+   * Returns a row type with a single column, "_MAP", which is a map from
+   * VARCHAR to nullable ANY.
+   */
+  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+    final RelDataType keyType = relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+    final RelDataType valueType =
+        relDataTypeFactory.createTypeWithNullability(
+            relDataTypeFactory.createSqlType(SqlTypeName.ANY), true);
+    final RelDataType mapType = relDataTypeFactory.createMapType(keyType, valueType);
+    return relDataTypeFactory.builder().add("_MAP", mapType).build();
+  }
+
+  public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema,
+      String tableName) {
+    return new ElasticsearchQueryable<>(queryProvider, schema, this, tableName);
+  }
+
+  /**
+   * Converts this table into an {@link ElasticsearchTableScan} in the
+   * Elasticsearch convention, with no projected row type.
+   */
+  public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
+    final RelOptCluster cluster = context.getCluster();
+    return new ElasticsearchTableScan(cluster, cluster.traitSetOf(ElasticsearchRel.CONVENTION),
+        relOptTable, this, null);
+  }
+
+  /** Runs a search against the type that backs this table.
+   *
+   * <p>The operations are joined with ", " and wrapped in braces to form the
+   * source of the search request, along the lines of
+   * <code>client.prepareSearch(index).setTypes(type)
+   * .setSource("{\"fields\" : [\"state\"]}")</code>. The request is only
+   * executed when an enumerator is requested from the result.</p>
+   *
+   * @param index Elasticsearch index
+   * @param ops List of operations represented as Json strings.
+   * @param fields List of fields to project; or null to return map
+   * @return Enumerable over the hits
+   */
+  private Enumerable<Object> find(String index, List<String> ops,
+      List<Map.Entry<String, Class>> fields) {
+    // Final copy so that the anonymous class below may capture it.
+    final String searchIndex = index;
+
+    final Function1<SearchHit, Object> hitConverter = ElasticsearchEnumerator.getter(fields);
+
+    final String source = "{" + Util.toString(ops, "", ", ", "") + "}";
+
+    return new AbstractEnumerable<Object>() {
+      public Enumerator<Object> enumerator() {
+        final Iterator<SearchHit> hits = client.prepareSearch(searchIndex).setTypes(typeName)
+            .setSource(source).execute().actionGet().getHits().iterator();
+        return new ElasticsearchEnumerator(hits, hitConverter);
+      }
+    };
+  }
+
+  /**
+   * Implementation of {@link org.apache.calcite.linq4j.Queryable} based on
+   * a {@link org.apache.calcite.adapter.elasticsearch.ElasticsearchTable}.
+   */
+  public static class ElasticsearchQueryable<T> extends AbstractTableQueryable<T> {
+    public ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
+        ElasticsearchTable table, String tableName) {
+      super(queryProvider, schema, table, tableName);
+    }
+
+    /** Always returns null; results are obtained through {@link #find}. */
+    public Enumerator<T> enumerator() {
+      return null;
+    }
+
+    /** Returns the index name held by the enclosing Elasticsearch schema. */
+    private String getIndex() {
+      final ElasticsearchSchema elasticsearchSchema = schema.unwrap(ElasticsearchSchema.class);
+      return elasticsearchSchema.index;
+    }
+
+    private ElasticsearchTable getTable() {
+      return (ElasticsearchTable) table;
+    }
+
+    /** Called via code-generation.
+     *
+     * @see org.apache.calcite.adapter.elasticsearch.ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public Enumerable<Object> find(List<String> ops,
+        List<Map.Entry<String, Class>> fields) {
+      final ElasticsearchTable target = getTable();
+      return target.find(getIndex(), ops, fields);
+    }
+  }
+}
+
+// End ElasticsearchTable.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
new file mode 100644
index 0000000..636a629
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTableScan.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+/**
+ * Relational expression representing a scan of an Elasticsearch type.
+ *
+ * <p> Additional operations might be applied,
+ * using the "find" method.</p>
+ */
+public class ElasticsearchTableScan extends TableScan implements ElasticsearchRel {
+  private final ElasticsearchTable elasticsearchTable;
+  private final RelDataType projectRowType;
+
+  /**
+   * Creates an ElasticsearchTableScan.
+   *
+   * @param cluster Cluster
+   * @param traitSet Trait set
+   * @param table Table
+   * @param elasticsearchTable Elasticsearch table
+   * @param projectRowType Fields and types to project; null to project raw row
+   */
+  protected ElasticsearchTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table,
+      ElasticsearchTable elasticsearchTable, RelDataType projectRowType) {
+    super(cluster, traitSet, table);
+    this.elasticsearchTable = elasticsearchTable;
+    this.projectRowType = projectRowType;
+
+    assert elasticsearchTable != null;
+    assert getConvention() == ElasticsearchRel.CONVENTION;
+  }
+
+  /** Returns this node unchanged; a scan has no inputs to replace. */
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    assert inputs.isEmpty();
+    return this;
+  }
+
+  /** Returns the projected row type if one was given, else the table's. */
+  @Override public RelDataType deriveRowType() {
+    if (projectRowType == null) {
+      return super.deriveRowType();
+    }
+    return projectRowType;
+  }
+
+  /**
+   * Scales the superclass cost by 0.1, and additionally by one hundredth of
+   * the number of projected fields when a projected row type is present.
+   */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final float f;
+    if (projectRowType == null) {
+      f = 1f;
+    } else {
+      f = (float) projectRowType.getFieldCount() / 100f;
+    }
+    final RelOptCost baseCost = super.computeSelfCost(planner, mq);
+    return baseCost.multiplyBy(.1 * f);
+  }
+
+  /**
+   * Registers the converter rule to the enumerable convention, followed by
+   * every rule in {@link ElasticsearchRules#RULES}.
+   */
+  @Override public void register(RelOptPlanner planner) {
+    planner.addRule(ElasticsearchToEnumerableConverterRule.INSTANCE);
+    final RelOptRule[] adapterRules = ElasticsearchRules.RULES;
+    for (int i = 0; i < adapterRules.length; i++) {
+      planner.addRule(adapterRules[i]);
+    }
+  }
+
+  /** Hands this scan's two tables to the implementor. */
+  @Override public void implement(Implementor implementor) {
+    implementor.elasticsearchTable = elasticsearchTable;
+    implementor.table = table;
+  }
+}
+
+// End ElasticsearchTableScan.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
new file mode 100644
index 0000000..adb88f7
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.calcite.adapter.elasticsearch; + +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; + +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.MethodCallExpression; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.CalcitePrepareImpl; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterImpl; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.runtime.Hook; +import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.Pair; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +import java.util.AbstractList; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Relational expression representing a scan of a table in an Elasticsearch data source. 
+ */
+public class ElasticsearchToEnumerableConverter extends ConverterImpl implements EnumerableRel {
+  protected ElasticsearchToEnumerableConverter(RelOptCluster cluster, RelTraitSet traits,
+      RelNode input) {
+    super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    final RelNode onlyInput = sole(inputs);
+    return new ElasticsearchToEnumerableConverter(getCluster(), traitSet, onlyInput);
+  }
+
+  /** Returns the cost computed by the superclass, scaled by 0.1. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final RelOptCost baseCost = super.computeSelfCost(planner, mq);
+    return baseCost.multiplyBy(.1);
+  }
+
+  /**
+   * Generates a block that calls the "find" method of the table's queryable
+   * with the operations collected from the input tree and the list of
+   * (field name, field class) pairs of this node's row type.
+   *
+   * <p>The collected operations are also printed when
+   * {@code CalcitePrepareImpl.DEBUG} is set, and are passed to
+   * {@code Hook.QUERY_PLAN}.
+   */
+  @Override public Result implement(EnumerableRelImplementor implementor, Prefer prefer) {
+    final BlockBuilder builder = new BlockBuilder();
+
+    // Walk the Elasticsearch sub-tree; it records its operations and table
+    // in the callback.
+    final ElasticsearchRel.Implementor esImplementor = new ElasticsearchRel.Implementor();
+    esImplementor.visitChild(0, getInput());
+
+    final RelDataType rowType = getRowType();
+    final PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), rowType,
+        prefer.prefer(JavaRowFormat.ARRAY));
+
+    final List<String> fieldNames = ElasticsearchRules.elasticsearchFieldNames(rowType);
+    final List<Class> fieldClasses =
+        new AbstractList<Class>() {
+          @Override public Class get(int index) {
+            return physType.fieldClass(index);
+          }
+
+          @Override public int size() {
+            return rowType.getFieldCount();
+          }
+        };
+    final Expression fields = builder.append("fields",
+        constantArrayList(Pair.zip(fieldNames, fieldClasses), Pair.class));
+
+    final Expression table = builder.append("table",
+        esImplementor.table
+            .getExpression(ElasticsearchTable.ElasticsearchQueryable.class));
+
+    final List<String> opList = esImplementor.list;
+    final Expression ops = builder.append("ops", constantArrayList(opList, String.class));
+
+    final Expression enumerable = builder.append("enumerable",
+        Expressions.call(table, ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops,
+            fields));
+
+    if (CalcitePrepareImpl.DEBUG) {
+      System.out.println("Elasticsearch: " + opList);
+    }
+    Hook.QUERY_PLAN.run(opList);
+
+    builder.add(Expressions.return_(null, enumerable));
+    return implementor.result(physType, builder.toBlock());
+  }
+
+  /** E.g. {@code constantArrayList("x", "y")} returns
+   * "Arrays.asList('x', 'y')". */
+  private static <T> MethodCallExpression constantArrayList(List<T> values, Class clazz) {
+    final List<Expression> constants = constantList(values);
+    return Expressions.call(BuiltInMethod.ARRAYS_AS_LIST.method,
+        Expressions.newArrayInit(clazz, constants));
+  }
+
+  /** E.g. {@code constantList("x", "y")} returns
+   * {@code {ConstantExpression("x"), ConstantExpression("y")}}. */
+  private static <T> List<Expression> constantList(List<T> values) {
+    final Function<T, Expression> toConstant =
+        new Function<T, Expression>() {
+          @Nullable
+          @Override public Expression apply(@Nullable T value) {
+            return Expressions.constant(value);
+          }
+        };
+    return Lists.transform(values, toConstant);
+  }
+}
+
+// End ElasticsearchToEnumerableConverter.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java
new file mode 100644
index 0000000..1047757
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverterRule.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.elasticsearch; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; + +/** + * Rule to convert a relational expression from + * {@link ElasticsearchRel#CONVENTION} to {@link EnumerableConvention}. + * + * <p>The conversion wraps the matched expression in an + * {@link ElasticsearchToEnumerableConverter}; use {@link #INSTANCE}, since the + * constructor is private. + */ +public class ElasticsearchToEnumerableConverterRule extends ConverterRule { + /** Shared instance of this rule. */ public static final ConverterRule INSTANCE = new ElasticsearchToEnumerableConverterRule(); + + /** Creates the rule, passing to {@link ConverterRule} the operand class {@link RelNode}, the input trait {@link ElasticsearchRel#CONVENTION}, the output trait {@link EnumerableConvention#INSTANCE} and the rule description. */ private ElasticsearchToEnumerableConverterRule() { + super(RelNode.class, ElasticsearchRel.CONVENTION, EnumerableConvention.INSTANCE, + "ElasticsearchToEnumerableConverterRule"); + } + + /** Wraps {@code relNode} in an {@link ElasticsearchToEnumerableConverter} created in {@code relNode}'s cluster, whose trait set is {@code relNode}'s trait set after {@code replace(getOutConvention())}. */ @Override public RelNode convert(RelNode relNode) { + RelTraitSet newTraitSet = relNode.getTraitSet().replace(getOutConvention()); + return new ElasticsearchToEnumerableConverter(relNode.getCluster(), newTraitSet, relNode); + } +} + +// End ElasticsearchToEnumerableConverterRule.java http://git-wip-us.apache.org/repos/asf/calcite/blob/f3caf13b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java new file mode 100644 index 0000000..dad800a --- /dev/null +++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Query provider based on an Elasticsearch DB. + */ +@PackageMarker +package org.apache.calcite.adapter.elasticsearch; + +import org.apache.calcite.avatica.util.PackageMarker; + +// End package-info.java
