Repository: storm
Updated Branches:
  refs/heads/master 69568ecf0 -> 4c5e34ee6


STORM-2330: Fix storm sql code generation for UDAF with non standard sql types

Storm-sql uses cacite's code generator. For UDAF with non standard return 
types, the generated code does not compile.

E.g. For an UDAF that returns a List<T> cacite converts the return type to SQL 
type "OTHER" and finally inserts a cast to Object[] in the returned code which 
fails to compile with "java.lang.ClassCastException: java.util.ArrayList cannot 
be cast to [Ljava.lang.Object;"

Use a custom type factory to return "ANY" as the type (instead of OTHER) if the 
java type cannot be mapped to a sql type.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1d09d786
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1d09d786
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1d09d786

Branch: refs/heads/master
Commit: 1d09d786fdbf61f634441e76e2c2a53c35178540
Parents: 1811273
Author: Arun Mahadevan <ar...@apache.org>
Authored: Fri Jan 27 12:19:18 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Fri Jan 27 12:19:18 2017 +0530

----------------------------------------------------------------------
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  | 24 ++++-----
 .../RexNodeToBlockStatementCompiler.java        |  2 +-
 .../sql/compiler/StormSqlTypeFactoryImpl.java   | 51 ++++++++++++++++++++
 .../test/org/apache/storm/sql/TestStormSql.java | 18 +++++++
 4 files changed, 82 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1d09d786/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index af0f1c1..f7bd719 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -17,36 +17,36 @@
  */
 package org.apache.storm.sql;
 
+import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AggregateFunctionImpl;
 import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.SubmitOptions;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.sql.compiler.StormSqlTypeFactoryImpl;
 import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
 import org.apache.storm.sql.parser.ColumnConstraint;
 import org.apache.storm.sql.parser.ColumnDefinition;
 import org.apache.storm.sql.parser.SqlCreateFunction;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
-import org.apache.storm.sql.planner.trident.QueryPlanner;
 import org.apache.storm.sql.planner.StormRelUtils;
+import org.apache.storm.sql.planner.trident.QueryPlanner;
 import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 import org.apache.storm.sql.runtime.ChannelHandler;
 import org.apache.storm.sql.runtime.DataSource;
@@ -74,7 +74,7 @@ import java.util.jar.Manifest;
 import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
 
 class StormSqlImpl extends StormSql {
-  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+  private final JavaTypeFactory typeFactory = new StormSqlTypeFactoryImpl(
       RelDataTypeSystem.DEFAULT);
   private final SchemaPlus schema = Frameworks.createRootSchema(true);
   private boolean hasUdf = false;

http://git-wip-us.apache.org/repos/asf/storm/blob/1d09d786/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
index 8400a9b..06ed227 100644
--- 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RexNodeToBlockStatementCompiler.java
@@ -72,7 +72,7 @@ public class RexNodeToBlockStatementCompiler {
     final ParameterExpression outputValues_ =
         Expressions.parameter(Object[].class, "outputValues");
     final JavaTypeFactoryImpl javaTypeFactory =
-        new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
+        new 
StormSqlTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
 
     final RexToLixTranslator.InputGetter inputGetter =
             new RexToLixTranslator.InputGetterImpl(

http://git-wip-us.apache.org/repos/asf/storm/blob/1d09d786/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
new file mode 100644
index 0000000..21ca063
--- /dev/null
+++ 
b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/StormSqlTypeFactoryImpl.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+public class StormSqlTypeFactoryImpl extends JavaTypeFactoryImpl {
+
+    public StormSqlTypeFactoryImpl() {
+    }
+
+    public StormSqlTypeFactoryImpl(RelDataTypeSystem typeSystem) {
+        super(typeSystem);
+    }
+
+    @Override
+    public RelDataType toSql(RelDataType type) {
+        if (type instanceof JavaType) {
+            JavaType javaType = (JavaType) type;
+            SqlTypeName sqlTypeName = 
JavaToSqlTypeConversionRules.instance().lookup(javaType.getJavaClass());
+            if (sqlTypeName == null) {
+                sqlTypeName = SqlTypeName.ANY;
+            }
+            return createTypeWithNullability(createSqlType(sqlTypeName), 
type.isNullable());
+        }
+        return super.toSql(type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d09d786/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
----------------------------------------------------------------------
diff --git 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 1470344..82dc184 100644
--- 
a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ 
b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -359,6 +360,23 @@ public class TestStormSql {
   }
 
   @Test
+  public void testAggFnNonSqlReturnType() throws Exception {
+    List<String> stmt = new ArrayList<>();
+    stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");
+    stmt.add("CREATE FUNCTION TOPN AS 'org.apache.storm.sql.TestUtils$TopN'");
+    stmt.add("SELECT STREAM ID, SUM(SALARY), TOPN(1, SALARY) FROM FOO WHERE ID 
>= 0 GROUP BY (ID) HAVING MAX(SALARY) > 0");
+    StormSql sql = StormSql.construct();
+    List<Values> values = new ArrayList<>();
+    ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
+    sql.execute(stmt, h);
+    Assert.assertEquals(4, values.size());
+    Assert.assertEquals(Collections.singletonList(2), values.get(0).get(2));
+    Assert.assertEquals(Collections.singletonList(5), values.get(1).get(2));
+    Assert.assertEquals(Collections.singletonList(8), values.get(2).get(2));
+    Assert.assertEquals(Collections.singletonList(9), values.get(3).get(2));
+  }
+
+  @Test
   public void testGroupbySameAggregateOnDifferentColumns() throws Exception {
     List<String> stmt = new ArrayList<>();
     stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY, SALARY INT, PCT 
DOUBLE, NAME VARCHAR) LOCATION 'mockgroup:///foo'");

Reply via email to