This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 78bd8c8  [FLINK-18750][table] SqlValidatorException thrown when select 
from a view which contains a UDTF call
78bd8c8 is described below

commit 78bd8c8c57be3dee2dff765cea6175a3c8dfc59c
Author: yuzhao.cyz <yuzhao....@gmail.com>
AuthorDate: Tue Aug 25 10:12:23 2020 +0800

    [FLINK-18750][table] SqlValidatorException thrown when select from a view 
which contains a UDTF call
---
 .../operations/SqlToOperationConverter.java        |  63 +++++---
 .../apache/flink/table/planner/utils/Expander.java | 167 +++++++++++++++++++++
 .../plan/rules/logical/WindowPropertiesRule.scala  |   2 +-
 .../planner/plan/common/ViewsExpandingTest.xml     |  44 ++++++
 .../planner/plan/common/ViewsExpandingTest.scala   |  72 ++++++++-
 5 files changed, 321 insertions(+), 27 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index fa04539..4563ef0 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -110,6 +110,7 @@ import 
org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.utils.Expander;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
@@ -132,6 +133,7 @@ import org.apache.calcite.sql.parser.SqlParser;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -286,15 +288,12 @@ public class SqlToOperationConverter {
                        SqlAlterViewAs alterViewAs = (SqlAlterViewAs) alterView;
                        final SqlNode newQuery = alterViewAs.getNewQuery();
 
-                       SqlNode validateQuery = flinkPlanner.validate(newQuery);
-                       PlannerQueryOperation operation = 
toQueryOperation(flinkPlanner, validateQuery);
-                       TableSchema schema = operation.getTableSchema();
-
-                       String originalQuery = getQuotedSqlString(newQuery);
-                       String expandedQuery = 
getQuotedSqlString(validateQuery);
                        CatalogView oldView = (CatalogView) baseTable;
-                       CatalogView newView = new 
CatalogViewImpl(originalQuery, expandedQuery, schema,
-                                       oldView.getOptions(), 
oldView.getComment());
+                       CatalogView newView = convertViewQuery(
+                                       newQuery,
+                                       Collections.emptyList(),
+                                       oldView.getOptions(),
+                                       oldView.getComment());
                        return new AlterViewAsOperation(viewIdentifier, 
newView);
                } else {
                        throw new ValidationException(
@@ -658,7 +657,27 @@ public class SqlToOperationConverter {
                final SqlNode query = sqlCreateView.getQuery();
                final SqlNodeList fieldList = sqlCreateView.getFieldList();
 
-               SqlNode validateQuery = flinkPlanner.validate(query);
+               UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlCreateView.fullViewName());
+               ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+               String comment = sqlCreateView.getComment()
+                               .map(c -> 
c.getNlsString().getValue()).orElse(null);
+               CatalogView catalogView = convertViewQuery(
+                               query,
+                               fieldList.getList(),
+                               
OperationConverterUtils.extractProperties(sqlCreateView.getProperties()
+                                               .orElse(null)),
+                               comment);
+               return new CreateViewOperation(
+                               identifier,
+                               catalogView,
+                               sqlCreateView.isIfNotExists(),
+                               sqlCreateView.isTemporary());
+       }
+
+       /** Convert the query part of a VIEW statement. */
+       private CatalogView convertViewQuery(SqlNode query, List<SqlNode> 
fieldNames,
+                       Map<String, String> props, String comment) {
                // Put the sql string unparse (getQuotedSqlString()) in front of
                // the node conversion (toQueryOperation()),
                // because before Calcite 1.22.0, during sql-to-rel conversion, 
the SqlWindow
@@ -666,17 +685,23 @@ public class SqlToOperationConverter {
 
                // This bug is fixed in CALCITE-3877 of Calcite 1.23.0.
                String originalQuery = getQuotedSqlString(query);
-               String expandedQuery = getQuotedSqlString(validateQuery);
+               SqlNode validateQuery = flinkPlanner.validate(query);
+               // The LATERAL operator is eliminated during SQL validation, so unparsing the
+               // validated node yields SQL without LATERAL, which is problematic. Expand the
+               // identifiers of the original query instead. The underlying issue is resolved in
+               // CALCITE-4077 (always treat the table function as implicitly LATERAL).
+               String expandedQuery = Expander.create(flinkPlanner)
+                               
.expanded(originalQuery).substitute(this::getQuotedSqlString);
 
                PlannerQueryOperation operation = 
toQueryOperation(flinkPlanner, validateQuery);
                TableSchema schema = operation.getTableSchema();
 
                // the view column list in CREATE VIEW is optional, if it's not 
empty, we should update
                // the column name with the names in view column list.
-               if (!fieldList.getList().isEmpty()) {
+               if (!fieldNames.isEmpty()) {
                        // alias column names:
                        String[] inputFieldNames = schema.getFieldNames();
-                       String[] aliasFieldNames = fieldList.getList().stream()
+                       String[] aliasFieldNames = fieldNames.stream()
                                        .map(SqlNode::toString)
                                        .toArray(String[]::new);
 
@@ -690,21 +715,11 @@ public class SqlToOperationConverter {
                        schema = TableSchema.builder().fields(aliasFieldNames, 
inputFieldTypes).build();
                }
 
-               String comment = sqlCreateView.getComment().map(c -> 
c.getNlsString().getValue()).orElse(null);
-               CatalogView catalogView = new CatalogViewImpl(originalQuery,
+               return new CatalogViewImpl(originalQuery,
                                expandedQuery,
                                schema,
-                               
OperationConverterUtils.extractProperties(sqlCreateView.getProperties().orElse(null)),
+                               props,
                                comment);
-
-               UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlCreateView.fullViewName());
-               ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
-               return new CreateViewOperation(
-                               identifier,
-                               catalogView,
-                               sqlCreateView.isIfNotExists(),
-                               sqlCreateView.isTemporary());
        }
 
        /** Convert DROP VIEW statement. */
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/Expander.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/Expander.java
new file mode 100644
index 0000000..8a3e403
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/Expander.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlUnresolvedFunction;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.util.Util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Utility that expands SQL identifiers in a SQL query.
+ *
+ * <p>Simple use:
+ *
+ * <blockquote><code>
+ * final String sql =<br>
+ *     "select ename from emp where deptno &lt; 10";<br>
+ * final Expander.Expanded expanded =<br>
+ *     Expander.create(planner).expanded(sql);<br>
+ * print(expanded); // "select `emp`.`ename` from `catalog`.`db`.`emp` where 
`emp`.`deptno` &lt; 10"
+ * </code></blockquote>
+ *
+ * <p>Calling {@link Expanded#toString()} generates a string similar to the SQL
+ * a user would obtain by manually expanding every identifier; it can then be
+ * persisted as the expanded query of a catalog view.
+ *
+ * <p>For more advanced formatting, use {@link Expanded#substitute(Function)}.
+ *
+ * <p>Adjust {@link SqlParser.Config} to use a different parser or parsing 
options.
+ */
+public class Expander {
+       private final FlinkPlannerImpl planner;
+
+       private Expander(FlinkPlannerImpl planner) {
+               this.planner = Objects.requireNonNull(planner);
+       }
+
+       /** Creates an Expander. **/
+       public static Expander create(FlinkPlannerImpl planner) {
+               return new Expander(planner);
+       }
+
+       /** Expands identifiers in a given SQL string, returning a {@link 
Expanded}. */
+       public Expanded expanded(String ori) {
+               final Map<SqlParserPos, SqlIdentifier> identifiers = new 
HashMap<>();
+               final Map<String, SqlIdentifier> funcNameToId = new HashMap<>();
+               final SqlNode oriNode = planner.parser().parse(ori);
+               // Parse a second time because validation is stateful and may mutate the node
+               // tree it is given, so that oriNode is left untouched.
+               final SqlNode validated = 
planner.validate(planner.parser().parse(ori));
+               validated.accept(new SqlBasicVisitor<Void>() {
+                       @Override public Void visit(SqlCall call) {
+                               SqlOperator operator = call.getOperator();
+                               if (operator instanceof BridgingSqlFunction) {
+                                       final SqlIdentifier functionID = 
((BridgingSqlFunction) operator)
+                                                       .getSqlIdentifier();
+                                       if (!functionID.isSimple()) {
+                                               
funcNameToId.put(Util.last(functionID.names), functionID);
+                                       }
+                               }
+                               return super.visit(call);
+                       }
+
+                       @Override public Void visit(SqlIdentifier identifier) {
+                               
identifiers.putIfAbsent(identifier.getParserPosition(), identifier);
+                               return null;
+                       }
+               });
+               return new Expanded(oriNode, identifiers, funcNameToId);
+       }
+
+       /** Result of expanding. */
+       public static class Expanded {
+               private final SqlNode oriNode;
+               private final Map<SqlParserPos, SqlIdentifier> identifiersMap;
+               private final Map<String, SqlIdentifier> funcNameToId;
+
+               Expanded(SqlNode oriNode, Map<SqlParserPos, SqlIdentifier> 
identifiers,
+                               Map<String, SqlIdentifier> funcNameToId) {
+                       this.oriNode = oriNode;
+                       this.identifiersMap = ImmutableMap.copyOf(identifiers);
+                       this.funcNameToId = ImmutableMap.copyOf(funcNameToId);
+               }
+
+               @Override
+               public String toString() {
+                       return substitute(SqlNode::toString);
+               }
+
+               /** Returns the SQL string with identifiers replaced by their expanded forms,
+                * unparsed by the given function. */
+               public String substitute(Function<SqlNode, String> fn) {
+                       final SqlShuttle shuttle = new SqlShuttle() {
+                               @Override
+                               public SqlNode visit(SqlCall call) {
+                                       SqlOperator operator = 
call.getOperator();
+                                       if (operator instanceof 
SqlUnresolvedFunction) {
+                                               final SqlUnresolvedFunction 
unresolvedFunction =
+                                                               
(SqlUnresolvedFunction) operator;
+                                               final SqlIdentifier functionID 
= unresolvedFunction.getSqlIdentifier();
+                                               if (functionID.isSimple()
+                                                               && 
funcNameToId.containsKey(functionID.getSimple())) {
+                                                       SqlUnresolvedFunction 
newFunc = new SqlUnresolvedFunction(
+                                                                       
funcNameToId.get(functionID.getSimple()),
+                                                                       
unresolvedFunction.getReturnTypeInference(),
+                                                                       
unresolvedFunction.getOperandTypeInference(),
+                                                                       
unresolvedFunction.getOperandTypeChecker(),
+                                                                       
unresolvedFunction.getParamTypes(),
+                                                                       
unresolvedFunction.getFunctionType());
+                                                       return 
newFunc.createCall(
+                                                                       
call.getFunctionQuantifier(),
+                                                                       
call.getParserPosition(),
+                                                                       
call.getOperandList().toArray(new SqlNode[0]));
+                                               }
+                                       }
+                                       return super.visit(call);
+                               }
+
+                               @Override
+                               public SqlNode visit(SqlIdentifier id) {
+                                       if (id.isStar()) {
+                                               return id;
+                                       }
+                                       final SqlIdentifier toReplace = 
identifiersMap.get(id.getParserPosition());
+                                       if (toReplace == null || 
id.names.size() >= toReplace.names.size()) {
+                                               return id;
+                                       }
+                                       return toReplace;
+                               }
+                       };
+                       final SqlNode substituted = 
this.oriNode.accept(shuttle);
+                       return  fn.apply(substituted);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala
index 2d0b5c0..c0c92df8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala
@@ -45,7 +45,7 @@ class WindowPropertiesRule extends RelOptRule(
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val project: LogicalProject = call.rel(0)
-    // project includes at least on group auxiliary function
+    // project includes at least one group auxiliary function
     project.getProjects.exists(WindowPropertiesRules.hasGroupAuxiliaries)
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
index cbe66fd..ce101a7 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
@@ -161,4 +161,48 @@ Calc(select=[CAST(a) AS a, b, CAST(EXPR$2) AS c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testViewExpandingWithLateralTableFunction[0]">
+    <Resource name="sql">
+      <![CDATA[select * from tmp_view]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(f0=[$0], f1=[$1])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')])
+   :  +- LogicalValues(tuples=[[{ _UTF-16LE'danny#21' }, { 
_UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]])
+   +- LogicalTableFunctionScan(invocation=[myFunc($cor1.f0)], 
rowType=[*org.apache.flink.table.planner.utils.SimpleUser*])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[f0, name AS f1])
++- Correlate(invocation=[myFunc($cor1.f0)], 
correlate=[table(myFunc($cor1.f0))], select=[f0,name,age], 
rowType=[RecordType(VARCHAR(9) f0, VARCHAR(2147483647) name, INTEGER age)], 
joinType=[INNER])
+   +- Calc(select=[f0])
+      +- Values(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { 
_UTF-16LE'fabian#30' }]], values=[f0])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testViewExpandingWithLateralTableFunction[1]">
+    <Resource name="sql">
+      <![CDATA[select * from tmp_view]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(f0=[$0], f1=[$1])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')])
+   :  +- LogicalValues(tuples=[[{ _UTF-16LE'danny#21' }, { 
_UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]])
+   +- LogicalTableFunctionScan(invocation=[myFunc($cor1.f0)], 
rowType=[*org.apache.flink.table.planner.utils.SimpleUser*])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[f0, name AS f1])
++- Correlate(invocation=[myFunc($cor1.f0)], 
correlate=[table(myFunc($cor1.f0))], select=[f0,name,age], 
rowType=[RecordType(VARCHAR(9) f0, VARCHAR(2147483647) name, INTEGER age)], 
joinType=[INNER])
+   +- Calc(select=[f0])
+      +- Values(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { 
_UTF-16LE'fabian#30' }]])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
index 5bdb6b5..666de8d 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
@@ -20,9 +20,13 @@ package org.apache.flink.table.planner.plan.common
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.catalog.{CatalogView, CatalogViewImpl, 
ObjectPath}
-import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil, 
TableTestUtilBase}
+import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogView, 
CatalogViewImpl, ObjectIdentifier, ObjectPath}
+import org.apache.flink.table.functions.ScalarFunction
+import 
org.apache.flink.table.planner.plan.common.ViewsExpandingTest.PrimitiveScalarFunction
+import org.apache.flink.table.planner.utils.{TableFunc0, TableTestBase, 
TableTestUtil, TableTestUtilBase}
 
+import org.hamcrest.CoreMatchers.is
+import org.junit.Assert.assertThat
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -114,6 +118,60 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase => 
TableTestUtil) extends
     tableUtil.verifyPlan("select * from view1")
   }
 
+  @Test
+  def testViewExpandingWithLateralTableFunction(): Unit = {
+    val tableUtil = tableTestUtil(this)
+    val tableEnv = tableUtil.tableEnv
+    tableEnv.createTemporarySystemFunction("myFunc", new TableFunc0())
+    tableEnv.createTemporaryView("source",
+      tableEnv.fromValues("danny#21", "julian#55", "fabian#30").as("f0"))
+    val createView =
+      """
+        |CREATE VIEW tmp_view AS
+        |  SELECT f0, f1
+        |  FROM source as S, LATERAL TABLE(myFunc(f0)) as T(f1, f2)
+        |""".stripMargin
+    tableEnv.executeSql(createView)
+    tableUtil.verifyPlan("select * from tmp_view")
+  }
+
+  @Test
+  def testViewExpandingWithBuiltinFunction(): Unit = {
+    val tableUtil = tableTestUtil(this)
+    val tableEnv = tableUtil.tableEnv
+    val createView =
+      """
+        |CREATE VIEW tmp_view AS
+        |  SELECT CONCAT('a', 'bc', 'def')
+        |""".stripMargin
+    tableEnv.executeSql(createView)
+    val objectID = ObjectIdentifier.of(tableEnv.getCurrentCatalog,
+      tableEnv.getCurrentDatabase, "tmp_view")
+    val view: CatalogBaseTable = tableEnv.getCatalog(objectID.getCatalogName)
+      .get().getTable(objectID.toObjectPath)
+    assertThat(view.asInstanceOf[CatalogView].getExpandedQuery,
+      is("SELECT `CONCAT`('a', 'bc', 'def')"))
+  }
+
+  @Test
+  def testViewExpandingWithUDF(): Unit = {
+    val tableUtil = tableTestUtil(this)
+    val tableEnv = tableUtil.tableEnv
+    tableEnv.createTemporaryFunction("func", classOf[PrimitiveScalarFunction])
+    val createView =
+      """
+        |CREATE VIEW tmp_view AS
+        |  SELECT func(1, 2, 'abc')
+        |""".stripMargin
+    tableEnv.executeSql(createView)
+    val objectID = ObjectIdentifier.of(tableEnv.getCurrentCatalog,
+      tableEnv.getCurrentDatabase, "tmp_view")
+    val view: CatalogBaseTable = tableEnv.getCatalog(objectID.getCatalogName)
+      .get().getTable(objectID.toObjectPath)
+    assertThat(view.asInstanceOf[CatalogView].getExpandedQuery,
+      is("SELECT `default_catalog`.`default_database`.`func`(1, 2, 'abc')"))
+  }
+
   private def createSqlView(originTable: String): CatalogView = {
       new CatalogViewImpl(
         s"select * as c from $originTable",
@@ -137,4 +195,14 @@ object ViewsExpandingTest {
       _.batchTestUtil(),
       _.streamTestUtil())
   }
+
+  // 
--------------------------------------------------------------------------------------------
+  // Test functions
+  // 
--------------------------------------------------------------------------------------------
+  /**
+   * Function that takes and returns primitives.
+   */
+  class PrimitiveScalarFunction extends ScalarFunction {
+    def eval(i: Int, l: Long, s: String): Long = i + l + s.length
+  }
 }

Reply via email to