Repository: samza Updated Branches: refs/heads/samza-sql 6a40d5a9a -> 78d2fedb6
SAMZA-483; begin samza calcite integration Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/78d2fedb Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/78d2fedb Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/78d2fedb Branch: refs/heads/samza-sql Commit: 78d2fedb6136c9b923876e41329d96405f934846 Parents: 6a40d5a Author: Milinda Lakmal Pathirage <milinda.pathir...@gmail.com> Authored: Fri Feb 13 13:25:08 2015 -0800 Committer: Chris Riccomini <criccom...@apache.org> Committed: Fri Feb 13 13:25:08 2015 -0800 ---------------------------------------------------------------------- build.gradle | 8 + gradle/dependency-versions.gradle | 1 + .../apache/samza/sql/planner/QueryPlanner.java | 216 +++++++++++ .../sql/planner/SamzaCalciteConnection.java | 373 +++++++++++++++++++ .../planner/SamzaQueryPreparingStatement.java | 111 ++++++ .../samza/sql/planner/SamzaSqlValidator.java | 54 +++ .../samza/sql/planner/QueryPlannerTest.java | 92 +++++ .../sql/planner/SamzaStreamTableFactory.java | 94 +++++ 8 files changed, 949 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/78d2fedb/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index b49c313..bbd94e1 100644 --- a/build.gradle +++ b/build.gradle @@ -29,6 +29,12 @@ allprojects { repositories { mavenCentral() mavenLocal() + maven { + url "https://repository.apache.org/content/repositories/snapshots/" + } + maven { + url "http://conjars.org/repo" + } } } @@ -69,6 +75,7 @@ rat { '**/non-responsive.less', '**/ropa-sans.css', '**/syntax.css', + '.idea/**', '.reviewboardrc', 'docs/_site/**', 'docs/sitemap.xml', @@ -255,6 +262,7 @@ project(":samza-sql_$scalaVersion") { compile project(":samza-kv_$scalaVersion") compile "commons-collections:commons-collections:$commonsCollectionVersion" compile "org.apache.avro:avro:$avroVersion" + compile "org.apache.calcite:calcite-core:$calciteVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } http://git-wip-us.apache.org/repos/asf/samza/blob/78d2fedb/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 03c72f8..d1bb40f 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -36,4 +36,5 @@ commonsCodecVersion = "1.9" commonsCollectionVersion = "3.2.1" avroVersion = "1.7.7" + calciteVersion = "1.1-chi-incubating-SNAPSHOT" } http://git-wip-us.apache.org/repos/asf/samza/blob/78d2fedb/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java new file mode 100644 index 0000000..16dbdc3 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql.planner; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRules; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.jdbc.CalcitePrepare; +import org.apache.calcite.plan.*; +import org.apache.calcite.plan.volcano.VolcanoPlanner; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.rules.*; +import org.apache.calcite.rel.stream.StreamRules; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.util.ChainedSqlOperatorTable; +import org.apache.calcite.sql.validate.SqlValidator; + +import java.util.List; + +/** + * Streaming query planner implementation based on Calcite. + */ +public class QueryPlanner { + public static final boolean COMMUTE = + "true".equals( + System.getProperties().getProperty("calcite.enable.join.commute")); + + /** + * Whether to enable the collation trait. Some extra optimizations are + * possible if enabled, but queries should work either way. At some point + * this will become a preference, or we will run multiple phases: first + * disabled, then enabled. + */ + private static final boolean ENABLE_COLLATION_TRAIT = true; + + private static final List<RelOptRule> DEFAULT_RULES = + ImmutableList.of( + AggregateStarTableRule.INSTANCE, + AggregateStarTableRule.INSTANCE2, + TableScanRule.INSTANCE, + COMMUTE + ? JoinAssociateRule.INSTANCE + : ProjectMergeRule.INSTANCE, + FilterTableRule.INSTANCE, + ProjectTableRule.INSTANCE, + ProjectTableRule.INSTANCE2, + ProjectFilterTransposeRule.INSTANCE, + FilterProjectTransposeRule.INSTANCE, + FilterJoinRule.FILTER_ON_JOIN, + AggregateExpandDistinctAggregatesRule.INSTANCE, + AggregateReduceFunctionsRule.INSTANCE, + FilterAggregateTransposeRule.INSTANCE, + JoinCommuteRule.INSTANCE, + JoinPushThroughJoinRule.RIGHT, + JoinPushThroughJoinRule.LEFT, + SortProjectTransposeRule.INSTANCE); + + private static final List<RelOptRule> ENUMERABLE_RULES = + ImmutableList.of( + EnumerableRules.ENUMERABLE_JOIN_RULE, + EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE, + EnumerableRules.ENUMERABLE_CORRELATE_RULE, + EnumerableRules.ENUMERABLE_PROJECT_RULE, + EnumerableRules.ENUMERABLE_FILTER_RULE, + EnumerableRules.ENUMERABLE_AGGREGATE_RULE, + EnumerableRules.ENUMERABLE_SORT_RULE, + EnumerableRules.ENUMERABLE_LIMIT_RULE, + EnumerableRules.ENUMERABLE_COLLECT_RULE, + EnumerableRules.ENUMERABLE_UNCOLLECT_RULE, + EnumerableRules.ENUMERABLE_UNION_RULE, + EnumerableRules.ENUMERABLE_INTERSECT_RULE, + EnumerableRules.ENUMERABLE_MINUS_RULE, + EnumerableRules.ENUMERABLE_TABLE_MODIFICATION_RULE, + EnumerableRules.ENUMERABLE_VALUES_RULE, + EnumerableRules.ENUMERABLE_WINDOW_RULE, + EnumerableRules.ENUMERABLE_TABLE_FUNCTION_SCAN_RULE); + + private static final List<RelOptRule> CONSTANT_REDUCTION_RULES = + ImmutableList.of( + ReduceExpressionsRule.PROJECT_INSTANCE, + ReduceExpressionsRule.FILTER_INSTANCE, + ReduceExpressionsRule.CALC_INSTANCE, + ReduceExpressionsRule.JOIN_INSTANCE, + ValuesReduceRule.FILTER_INSTANCE, + ValuesReduceRule.PROJECT_FILTER_INSTANCE, + ValuesReduceRule.PROJECT_INSTANCE); + + /** + * Transform streaming query to a query plan. + * @param query streaming query in SQL with streaming extensions + * @param context query prepare context + * @return query plan + */ + public RelNode getPlan(String query, CalcitePrepare.Context context) { + final JavaTypeFactory typeFactory = context.getTypeFactory(); + final CalciteConnectionConfig config = context.config(); + + CalciteCatalogReader catalogReader = new CalciteCatalogReader(context.getRootSchema(), + false, + context.getDefaultSchemaPath(), + typeFactory); + + SqlParser sqlParser = SqlParser.create(query, + SqlParser.configBuilder() + .setQuotedCasing(config.quotedCasing()) + .setUnquotedCasing(config.unquotedCasing()) + .setQuoting(config.quoting()) + .build()); + + SqlNode sqlNode; + + try { + sqlNode = sqlParser.parseStmt(); + } catch (SqlParseException e) { + throw new RuntimeException("parse failed: " + e.getMessage(), e); + } + + final ChainedSqlOperatorTable operatorTable = + new ChainedSqlOperatorTable( + ImmutableList.of(SqlStdOperatorTable.instance(), catalogReader)); + + final SqlValidator validator = + new SamzaSqlValidator(operatorTable, catalogReader, typeFactory); + validator.setIdentifierExpansion(true); + + SqlNode validatedSqlNode = validator.validate(sqlNode); + + final RelOptPlanner planner = createStreamingRelOptPlanner(context, null, null); + + final SamzaQueryPreparingStatement preparingStmt = + new SamzaQueryPreparingStatement( + context, + catalogReader, + typeFactory, + context.getRootSchema(), + EnumerableRel.Prefer.ARRAY, + planner, + EnumerableConvention.INSTANCE); + + /* TODO: Add query optimization. */ + + return preparingStmt.getSqlToRelConverter(validator, catalogReader).convertQuery(validatedSqlNode, false, true); + } + + /** + * Creates a query planner and initializes it with a default set of + * rules. + * + * @param prepareContext context for preparing a statement + * @param externalContext external query planning context + * @param costFactory cost factory for cost based query planning + * @return relation query planner instance + */ + protected RelOptPlanner createStreamingRelOptPlanner(final CalcitePrepare.Context prepareContext, + org.apache.calcite.plan.Context externalContext, + RelOptCostFactory costFactory) { + if (externalContext == null) { + externalContext = Contexts.withConfig(prepareContext.config()); + } + + final VolcanoPlanner planner = + new VolcanoPlanner(costFactory, externalContext); + + planner.addRelTraitDef(ConventionTraitDef.INSTANCE); + + if (ENABLE_COLLATION_TRAIT) { + planner.addRelTraitDef(RelCollationTraitDef.INSTANCE); + planner.registerAbstractRelationalRules(); + } + RelOptUtil.registerAbstractRels(planner); + for (RelOptRule rule : DEFAULT_RULES) { + planner.addRule(rule); + } + + /* Note: Bindable rules were removed until Calcite switches the convention of the root node to bindable. */ + + for (RelOptRule rule : ENUMERABLE_RULES) { + planner.addRule(rule); + } + + for (RelOptRule rule : StreamRules.RULES) { + planner.addRule(rule); + } + + /* Note: Constant reduction rules were removed because current Calcite implementation doesn't use them. */ + + return planner; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/78d2fedb/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java new file mode 100644 index 0000000..63b1da5 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaCalciteConnection.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql.planner; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.jdbc.CalciteRootSchema; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.model.ModelHandler; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.schema.SchemaPlus; + +import java.io.IOException; +import java.lang.reflect.Type; +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * Minimal <code>org.apache.calcite.jdbc.CalciteConnection</code> implementation which enables + * re-use of Calcite code. + */ +public class SamzaCalciteConnection implements CalciteConnection { + private static final String INLINE = "inline:"; + private final JavaTypeFactory typeFactory; + private final CalciteRootSchema rootSchema; + private String schema; + + public SamzaCalciteConnection(String model) throws IOException { + typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + rootSchema = CalciteSchema.createRootSchema(true); + new ModelHandler(this, INLINE + model); + } + + public CalciteRootSchema getCalciteRootSchema(){ + return rootSchema; + } + + @Override + public SchemaPlus getRootSchema() { + return rootSchema.plus(); + } + + @Override + public JavaTypeFactory getTypeFactory() { + return typeFactory; + } + + @Override + public Properties getProperties() { + return null; + } + + @Override + public Statement createStatement() throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return null; + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return null; + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + + } + + @Override + public boolean getAutoCommit() throws SQLException { + return false; + } + + @Override + public void commit() throws SQLException { + + } + + @Override + public void rollback() throws SQLException { + + } + + @Override + public void close() throws SQLException { + + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return null; + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + + } + + @Override + public boolean isReadOnly() throws SQLException { + return false; + } + + @Override + public void setCatalog(String catalog) throws SQLException { + + } + + @Override + public String getCatalog() throws SQLException { + return null; + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + + } + + @Override + public int getTransactionIsolation() throws SQLException { + return 0; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public Map<String, Class<?>> getTypeMap() throws SQLException { + return null; + } + + @Override + public void setTypeMap(Map<String, Class<?>> map) throws SQLException { + + } + + @Override + public void setHoldability(int holdability) throws SQLException { + + } + + @Override + public int getHoldability() throws SQLException { + return 0; + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return null; + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return null; + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return null; + } + + @Override + public Clob createClob() throws SQLException { + return null; + } + + @Override + public Blob createBlob() throws SQLException { + return null; + } + + @Override + public NClob createNClob() throws SQLException { + return null; + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return null; + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return false; + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + + } + + @Override + public String getClientInfo(String name) throws SQLException { + return null; + } + + @Override + public Properties getClientInfo() throws SQLException { + return null; + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return null; + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return null; + } + + @Override + public void setSchema(String schema) throws SQLException { + this.schema = schema; + } + + @Override + public String getSchema() throws SQLException { + return schema; + } + + public void abort(Executor executor) throws SQLException { + + } + + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + + } + + public int getNetworkTimeout() throws SQLException { + return 0; + } + + @Override + public CalciteConnectionConfig config() { + return new CalciteConnectionConfigImpl(new Properties()); + } + + @Override + public <T> Queryable<T> createQuery(Expression expression, Class<T> rowType) { + return null; + } + + @Override + public <T> Queryable<T> createQuery(Expression expression, Type rowType) { + return null; + } + + @Override + public <T> T execute(Expression expression, Class<T> type) { + return null; + } + + @Override + public <T> T execute(Expression expression, Type type) { + return null; + } + + @Override + public <T> Enumerator<T> executeQuery(Queryable<T> queryable) { + return null; + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return false; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/78d2fedb/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java new file mode 100644 index 0000000..0721573 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaQueryPreparingStatement.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql.planner; + +import com.google.common.collect.Maps; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.jdbc.CalcitePrepare; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.sql2rel.StandardConvertletTable; + +import java.util.List; +import java.util.Map; + +public class SamzaQueryPreparingStatement extends Prepare implements RelOptTable.ViewExpander { + private final RelOptPlanner planner; + private final RexBuilder rexBuilder; + protected final CalciteSchema schema; + protected final RelDataTypeFactory typeFactory; + private final EnumerableRel.Prefer prefer; + private final Map<String, Object> internalParameters = + Maps.newLinkedHashMap(); + private int expansionDepth; + private SqlValidator sqlValidator; + + public SamzaQueryPreparingStatement(CalcitePrepare.Context context, CatalogReader catalogReader, + RelDataTypeFactory typeFactory, + CalciteSchema schema, + EnumerableRel.Prefer prefer, + RelOptPlanner planner, + Convention resultConvention) { + super(context, catalogReader, resultConvention); + this.schema = schema; + this.prefer = prefer; + this.planner = planner; + this.typeFactory = typeFactory; + this.rexBuilder = new RexBuilder(typeFactory); + } + + @Override + protected PreparedResult createPreparedExplanation(RelDataType resultType, RelDataType parameterRowType, RelNode rootRel, boolean explainAsXml, SqlExplainLevel detailLevel) { + return null; + } + + @Override + protected PreparedResult implement(RelDataType rowType, RelNode rootRel, SqlKind sqlKind) { + return null; + } + + @Override + protected SqlToRelConverter getSqlToRelConverter(SqlValidator validator, CatalogReader catalogReader) { + SqlToRelConverter sqlToRelConverter = + new SqlToRelConverter( + this, validator, catalogReader, planner, rexBuilder, + StandardConvertletTable.INSTANCE); + sqlToRelConverter.setTrimUnusedFields(true); + return sqlToRelConverter; + } + + @Override + public RelNode flattenTypes(RelNode rootRel, boolean restructure) { + return null; + } + + @Override + protected RelNode decorrelate(SqlToRelConverter sqlToRelConverter, SqlNode query, RelNode rootRel) { + return null; + } + + @Override + protected void init(Class runtimeContextClass) {} + + @Override + protected SqlValidator getSqlValidator() { + return null; + } + + @Override + public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) { + // TODO: Implement custom view expansions + return super.expandView(rowType, queryString, schemaPath); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/78d2fedb/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java new file mode 100644 index 0000000..f46c1f0 --- /dev/null +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql.planner; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; +import org.apache.calcite.sql.validate.SqlValidatorImpl; + +public class SamzaSqlValidator extends SqlValidatorImpl{ + /** + * Creates a validator. + * + * @param opTab Operator table + * @param catalogReader Catalog reader + * @param typeFactory Type factory + */ + protected SamzaSqlValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) { + /* Note: We may need to define Samza specific SqlConformance instance in future. */ + super(opTab, catalogReader, typeFactory, SqlConformance.DEFAULT); + } + + @Override + protected RelDataType getLogicalSourceRowType( + RelDataType sourceRowType, SqlInsert insert) { + return ((JavaTypeFactory) typeFactory).toSql(sourceRowType); + } + + @Override + protected RelDataType getLogicalTargetRowType( + RelDataType targetRowType, SqlInsert insert) { + return ((JavaTypeFactory) typeFactory).toSql(targetRowType); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/78d2fedb/samza-sql/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java new file mode 100644 index 0000000..36b6f03 --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/QueryPlannerTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql.planner; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.config.CalciteConnectionProperty; +import org.apache.calcite.jdbc.CalcitePrepare; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.util.Util; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class QueryPlannerTest { + public static final String STREAM_SCHEMA = " {\n" + + " name: 'STREAMS',\n" + + " tables: [ {\n" + + " type: 'custom',\n" + + " name: 'ORDERS',\n" + + " stream: true,\n" + + " factory: '" + SamzaStreamTableFactory.class.getName() + "'\n" + + " } ]\n" + + " }\n"; + + public static final String STREAM_MODEL = "{\n" + + " version: '1.0',\n" + + " defaultSchema: 'STREAMS',\n" + + " schemas: [\n" + + STREAM_SCHEMA + + " ]\n" + + "}"; + + public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED = + "LogicalDelta\n" + + " LogicalProject(id=[$0], product=[$1], quantity=[$2])\n" + + " LogicalFilter(condition=[>($2, 5)])\n" + + " EnumerableTableScan(table=[[STREAMS, ORDERS]])"; + public static final String SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE = + "select stream * from orders where quantity > 5"; + + @Test + public void testQueryPlanner() throws IOException, SQLException { + + SamzaCalciteConnection connection = new SamzaCalciteConnection(STREAM_MODEL); + CalcitePrepare.Context context = Schemas.makeContext(connection, + connection.getCalciteRootSchema(), + ImmutableList.of(connection.getSchema()), + ImmutableMap.copyOf(defaultConfiguration())); + + QueryPlanner planner = new QueryPlanner(); + RelNode relNode = planner.getPlan(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE, context); + Assert.assertNotNull(relNode); + String s = Util.toLinux(RelOptUtil.toString(relNode)); + Assert.assertTrue(s.contains(SELECT_ALL_FROM_ORDERS_WHERE_QUANTITY_GREATER_THAN_FIVE_PLAN_EXPECTED)); + } + + public static Map<CalciteConnectionProperty, String> defaultConfiguration(){ + Map<CalciteConnectionProperty, String> map = new HashMap<CalciteConnectionProperty, String>(); + + map.put(CalciteConnectionProperty.CASE_SENSITIVE, "false"); + map.put(CalciteConnectionProperty.QUOTED_CASING, Casing.UNCHANGED.name()); + map.put(CalciteConnectionProperty.UNQUOTED_CASING, Casing.UNCHANGED.name()); + map.put(CalciteConnectionProperty.QUOTING, Quoting.BACK_TICK.name()); + + return map; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/78d2fedb/samza-sql/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java new file mode 100644 index 0000000..f757d8f --- /dev/null +++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/SamzaStreamTableFactory.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.sql.planner; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.*; +import org.apache.calcite.sql.type.SqlTypeName; + +import java.util.Map; + +public class SamzaStreamTableFactory implements TableFactory<Table> { + public Table create(SchemaPlus schema, String name, + Map<String, Object> operand, RelDataType rowType) { + final RelProtoDataType protoRowType = new RelProtoDataType() { + public RelDataType apply(RelDataTypeFactory a0) { + return a0.builder() + .add("id", SqlTypeName.INTEGER) + .add("product", SqlTypeName.VARCHAR, 10) + .add("quantity", SqlTypeName.INTEGER) + .build(); + } + }; + final ImmutableList<Object[]> rows = ImmutableList.of( + new Object[]{1, "paint", 10}, + new Object[]{2, "paper", 5}); + + return new StreamableTable() { + public Table stream() { + return new OrdersTable(protoRowType, rows); + } + + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + }; + } + + public static class OrdersTable implements ScannableTable { + private final RelProtoDataType protoRowType; + private final ImmutableList<Object[]> rows; + + public OrdersTable(RelProtoDataType protoRowType, + ImmutableList<Object[]> rows) { + this.protoRowType = protoRowType; + this.rows = rows; + } + + public Enumerable<Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + public Schema.TableType getJdbcTableType() { + return Schema.TableType.STREAM; + } + } +}