Repository: storm Updated Branches: refs/heads/master 69568ecf0 -> 4c5e34ee6
STORM-2330: Fix storm sql code generation for UDAF with non standard sql types Storm-sql uses cacite's code generator. For UDAF with non standard return types, the generated code does not compile. E.g. For an UDAF that returns a List<T> cacite converts the return type to SQL type "OTHER" and finally inserts a cast to Object[] in the returned code which fails to compile with "java.lang.ClassCastException: java.util.ArrayList cannot be cast to [Ljava.lang.Object;" Use a custom type factory to return "ANY" as the type (instead of OTHER) if the java type cannot be mapped to a sql type. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1d09d786 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1d09d786 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1d09d786 Branch: refs/heads/master Commit: 1d09d786fdbf61f634441e76e2c2a53c35178540 Parents: 1811273 Author: Arun Mahadevan <ar...@apache.org> Authored: Fri Jan 27 12:19:18 2017 +0530 Committer: Arun Mahadevan <ar...@apache.org> Committed: Fri Jan 27 12:19:18 2017 +0530 ---------------------------------------------------------------------- .../jvm/org/apache/storm/sql/StormSqlImpl.java | 24 ++++----- .../RexNodeToBlockStatementCompiler.java | 2 +- .../sql/compiler/StormSqlTypeFactoryImpl.java | 51 ++++++++++++++++++++ .../test/org/apache/storm/sql/TestStormSql.java | 18 +++++++ 4 files changed, 82 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1d09d786/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java index af0f1c1..f7bd719 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java @@ -17,36 +17,36 @@ */ package org.apache.storm.sql; +import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AggregateFunctionImpl; import org.apache.calcite.schema.impl.ScalarFunctionImpl; import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.util.ChainedSqlOperatorTable; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.SubmitOptions; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Table; -import org.apache.calcite.sql.SqlNode; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.Planner; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.SubmitOptions; +import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl; import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler; import org.apache.storm.sql.parser.ColumnConstraint; import org.apache.storm.sql.parser.ColumnDefinition; import org.apache.storm.sql.parser.SqlCreateFunction; import org.apache.storm.sql.parser.SqlCreateTable; import org.apache.storm.sql.parser.StormParser; -import org.apache.storm.sql.planner.trident.QueryPlanner; import org.apache.storm.sql.planner.StormRelUtils; +import org.apache.storm.sql.planner.trident.QueryPlanner; import org.apache.storm.sql.runtime.AbstractValuesProcessor; import org.apache.storm.sql.runtime.ChannelHandler; import org.apache.storm.sql.runtime.DataSource; @@ -74,7 +74,7 @@ import java.util.jar.Manifest; import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo; class StormSqlImpl extends StormSql { - private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl( + private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl( RelDataTypeSystem.DEFAULT); private final SchemaPlus schema = Frameworks.createRootSchema(true); private boolean hasUdf = false; http://git-wip-us.apache.org/repos/asf/storm/blob/1d09d786/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java index 8400a9b..06ed227 100644 --- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java @@ -72,7 +72,7 @@ public class RexNodeToBlockStatementCompiler { final ParameterExpression outputValues_ = Expressions.parameter(Object[].class, "outputValues"); final JavaTypeFactoryImpl javaTypeFactory = - new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem()); + new StormSqlTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem()); final RexToLixTranslator.InputGetter inputGetter = new RexToLixTranslator.InputGetterImpl( http://git-wip-us.apache.org/repos/asf/storm/blob/1d09d786/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java new file mode 100644 index 0000000..21ca063 --- /dev/null +++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.sql.compiler; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules; +import org.apache.calcite.sql.type.SqlTypeName; + +public class StormSqlTypeFactoryImpl extends JavaTypeFactoryImpl { + + public StormSqlTypeFactoryImpl() { + } + + public StormSqlTypeFactoryImpl(RelDataTypeSystem typeSystem) { + super(typeSystem); + } + + @Override + public RelDataType toSql(RelDataType type) { + if (type instanceof JavaType) { + JavaType javaType = (JavaType) type; + SqlTypeName sqlTypeName = JavaToSqlTypeConversionRules.instance().lookup(javaType.getJavaClass()); + if (sqlTypeName == null) { + sqlTypeName = SqlTypeName.ANY; + } + return createTypeWithNullability(createSqlType(sqlTypeName), type.isNullable()); + } + return super.toSql(type); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1d09d786/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java index 1470344..82dc184 100644 --- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java +++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -359,6 +360,23 @@ public class TestStormSql { } @Test + public void testAggFnNonSqlReturnType() throws Exception { + List<String> stmt = new ArrayList<>(); + stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'"); + stmt.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'"); + stmt.add("SELECT STREAM ID, SUM(SALARY), TOPN(1, SALARY) FROM FOO WHERE ID >= 0 GROUP BY (ID) HAVING MAX(SALARY) > 0"); + StormSql sql = StormSql.construct(); + List<Values> values = new ArrayList<>(); + ChannelHandler h = new TestUtils.CollectDataChannelHandler(values); + sql.execute(stmt, h); + Assert.assertEquals(4, values.size()); + Assert.assertEquals(Collections.singletonList(2), values.get(0).get(2)); + Assert.assertEquals(Collections.singletonList(5), values.get(1).get(2)); + Assert.assertEquals(Collections.singletonList(8), values.get(2).get(2)); + Assert.assertEquals(Collections.singletonList(9), values.get(3).get(2)); + } + + @Test public void testGroupbySameAggregateOnDifferentColumns() throws Exception { List<String> stmt = new ArrayList<>(); stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");