This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new ca1cacd  [FLINK-15599][table] SQL client requires both legacy and 
blink planner to be on the classpath
ca1cacd is described below

commit ca1cacd3494232bb7f34327c829033160a86abcd
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Fri Jan 17 14:16:31 2020 +0800

    [FLINK-15599][table] SQL client requires both legacy and blink planner to 
be on the classpath
    
    This closes #10878
---
 .../client/gateway/local/ExecutionContext.java     |  26 ++-
 .../table/client/gateway/local/LocalExecutor.java  |   4 +-
 .../flink/table/functions/FunctionService.java     | 186 +++++++++++++++++++++
 .../flink/table/functions/FunctionServiceTest.java | 172 +++++++++++++++++++
 .../planner/delegation/BlinkPlannerFactory.java    |   1 +
 .../flink/table/planner/StreamPlannerFactory.java  |   1 +
 .../flink/table/functions/FunctionService.scala    | 160 ------------------
 .../table/functions/FunctionServiceTest.scala      | 125 --------------
 8 files changed, 372 insertions(+), 303 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 8a1f082..174c014 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -265,20 +265,27 @@ public class ExecutionContext<ClusterID> {
                }
        }
 
-       public Pipeline createPipeline(String name, Configuration flinkConfig) {
+       public Pipeline createPipeline(String name) {
                if (streamExecEnv != null) {
                        // special case for Blink planner to apply batch 
optimizations
                        // note: it also modifies the ExecutionConfig!
-                       if (executor instanceof ExecutorBase) {
+                       if (isBlinkPlanner(executor.getClass())) {
                                return ((ExecutorBase) 
executor).getStreamGraph(name);
                        }
                        return streamExecEnv.getStreamGraph(name);
                } else {
-                       final int parallelism = execEnv.getParallelism();
                        return execEnv.createProgramPlan(name);
                }
        }
 
+       private boolean isBlinkPlanner(Class<? extends Executor> executorClass) 
{
+               try {
+                       return 
ExecutorBase.class.isAssignableFrom(executorClass);
+               } catch (NoClassDefFoundError ignore) {
+                       // blink planner might not be on the class path
+                       return false;
+               }
+       }
 
        /** Returns a builder for this {@link ExecutionContext}. */
        public static Builder builder(
@@ -682,19 +689,6 @@ public class ExecutionContext<ClusterID> {
                }
        }
 
-       private Pipeline createPipeline(String name) {
-               if (streamExecEnv != null) {
-                       // special case for Blink planner to apply batch 
optimizations
-                       // note: it also modifies the ExecutionConfig!
-                       if (executor instanceof ExecutorBase) {
-                               return ((ExecutorBase) 
executor).getStreamGraph(name);
-                       }
-                       return streamExecEnv.getStreamGraph(name);
-               } else {
-                       return execEnv.createProgramPlan(name);
-               }
-       }
-
        //~ Inner Class 
-------------------------------------------------------------------------------
 
        /** Builder for {@link ExecutionContext}. */
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 513e215..d0846e2 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -584,7 +584,7 @@ public class LocalExecutor implements Executor {
                final String jobName = sessionId + ": " + statement;
                final Pipeline pipeline;
                try {
-                       pipeline = context.createPipeline(jobName, 
context.getFlinkConfig());
+                       pipeline = context.createPipeline(jobName);
                } catch (Throwable t) {
                        // catch everything such that the statement does not 
crash the executor
                        throw new SqlExecutionException("Invalid SQL 
statement.", t);
@@ -628,7 +628,7 @@ public class LocalExecutor implements Executor {
                                                context.getQueryConfig(),
                                                tableName);
                        });
-                       pipeline = context.createPipeline(jobName, 
context.getFlinkConfig());
+                       pipeline = context.createPipeline(jobName);
                } catch (Throwable t) {
                        // the result needs to be closed as long as
                        // it not stored in the result store
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionService.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionService.java
new file mode 100644
index 0000000..fb4a7d4
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionService.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.ClassInstanceValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionDescriptorValidator;
+import org.apache.flink.table.descriptors.HierarchyDescriptorValidator;
+import org.apache.flink.table.descriptors.LiteralValueValidator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Service for creating configured instances of {@link UserDefinedFunction} 
using a
+ * {@link FunctionDescriptor}.
+ */
+public class FunctionService {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FunctionService.class);
+
+       /**
+        * Creates a user-defined function with the given properties and the 
current thread's
+        * context class loader.
+        *
+        * @param descriptor the descriptor that describes a function
+        * @return the generated user-defined function
+        */
+       public static UserDefinedFunction createFunction(FunctionDescriptor 
descriptor) {
+               return createFunction(descriptor, 
Thread.currentThread().getContextClassLoader());
+       }
+
+       /**
+        * Creates a user-defined function with the given properties.
+        *
+        * @param descriptor the descriptor that describes a function
+        * @param classLoader the class loader to load the function and its 
parameters' classes
+        * @return the generated user-defined function
+        */
+       public static UserDefinedFunction createFunction(
+                       FunctionDescriptor descriptor,
+                       ClassLoader classLoader) {
+               return createFunction(descriptor, classLoader, true);
+       }
+
+       /**
+        * Creates a user-defined function with the given properties.
+        *
+        * @param descriptor the descriptor that describes a function
+        * @param classLoader the class loader to load the function and its 
parameters' classes
+        * @param performValidation whether or not the descriptor should be 
validated
+        * @return the generated user-defined function
+        */
+       public static UserDefinedFunction createFunction(
+                       FunctionDescriptor descriptor,
+                       ClassLoader classLoader,
+                       boolean performValidation) {
+
+               DescriptorProperties properties = new 
DescriptorProperties(true);
+               properties.putProperties(descriptor.toProperties());
+
+               // validate
+               if (performValidation) {
+                       new FunctionDescriptorValidator().validate(properties);
+               }
+
+               // instantiate
+               Object instance = generateInstance(
+                               HierarchyDescriptorValidator.EMPTY_PREFIX,
+                               properties,
+                               classLoader);
+
+               if 
(!UserDefinedFunction.class.isAssignableFrom(instance.getClass())) {
+                       throw new ValidationException(String.format(
+                                       "Instantiated class '%s' is not a 
user-defined function.",
+                                       instance.getClass().getName()));
+               }
+               return (UserDefinedFunction) instance;
+       }
+
+       /**
+        * Recursively generates an instance of a class according to the given 
properties.
+        *
+        * @param keyPrefix the prefix to fetch properties
+        * @param descriptorProperties the descriptor properties that contain 
the class type information
+        * @param classLoader the class loader to load the class
+        * @param <T> type of the generated instance
+        * @return an instance of the class
+        */
+       private static <T> T generateInstance(
+                       String keyPrefix,
+                       DescriptorProperties descriptorProperties,
+                       ClassLoader classLoader) {
+               String instanceClassName = descriptorProperties.getString(
+                               keyPrefix + ClassInstanceValidator.CLASS);
+
+               Class<T> instanceClass;
+               try {
+                       //noinspection unchecked
+                       instanceClass = (Class<T>) 
Class.forName(instanceClassName, true, classLoader);
+               } catch (Exception e) {
+                       // only log the cause to have clean error messages
+                       String msg = String.format(
+                                       "Could not find class '%s' for creating 
an instance.", instanceClassName);
+                       LOG.error(msg, e);
+                       throw new ValidationException(msg);
+               }
+
+               String constructorPrefix = keyPrefix + 
ClassInstanceValidator.CONSTRUCTOR;
+
+               List<Map<String, String>> constructorProps = 
descriptorProperties
+                               
.getVariableIndexedProperties(constructorPrefix, new ArrayList<>());
+
+               ArrayList<Object> parameterList = new ArrayList<>();
+               for (int i = 0; i < constructorProps.size(); i++) {
+                       String constructorKey = constructorPrefix + "." + i + 
".";
+                       // nested class instance
+                       if 
(constructorProps.get(i).containsKey(ClassInstanceValidator.CLASS)) {
+                               parameterList.add(generateInstance(
+                                               constructorKey,
+                                               descriptorProperties,
+                                               classLoader));
+                       }
+                       // literal value
+                       else {
+                               Object literalValue = 
LiteralValueValidator.getValue(
+                                               constructorKey, 
descriptorProperties);
+                               parameterList.add(literalValue);
+                       }
+               }
+
+               String parameterNames = parameterList.stream()
+                               .map(t -> t.getClass().getName())
+                               .reduce((s1, s2) -> s1 + ", " + s2)
+                               .orElse("");
+
+               Constructor<T> constructor;
+               try {
+                       constructor = instanceClass.getConstructor(
+                                       
parameterList.stream().map(Object::getClass).toArray(Class[]::new));
+               } catch (Exception e) {
+                       // only log the cause to have clean error messages
+                       String msg = String.format(
+                                       "Cannot find a public constructor with 
parameter types '%s' for '%s'.",
+                                       parameterNames,
+                                       instanceClassName);
+                       LOG.error(msg, e);
+                       throw new ValidationException(msg);
+               }
+
+               try {
+                       return constructor.newInstance(parameterList.toArray());
+               } catch (Exception e) {
+                       // only log the cause to have clean error messages
+                       String msg = String.format(
+                                       "Error while creating instance of class 
'%s' with parameter types '%s'.",
+                                       instanceClassName,
+                                       parameterNames);
+                       LOG.error(msg, e);
+                       throw new ValidationException(msg);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/FunctionServiceTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/FunctionServiceTest.java
new file mode 100644
index 0000000..ea4030b
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/FunctionServiceTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.ClassInstance;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for {@link FunctionService}.
+ */
+public class FunctionServiceTest {
+
+       @Test(expected = ValidationException.class)
+       public void testWrongArgsFunctionCreation() {
+               FunctionDescriptor descriptor = new FunctionDescriptor()
+                               .fromClass(new ClassInstance()
+                                               .of(NoArgClass.class.getName())
+                                               .parameterString("12"));
+
+               FunctionService.createFunction(descriptor);
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testPrivateFunctionCreation() {
+               FunctionDescriptor descriptor = new FunctionDescriptor()
+                               .fromClass(new 
ClassInstance().of(PrivateClass.class.getName()));
+
+               FunctionService.createFunction(descriptor);
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testInvalidClassFunctionCreation() {
+               FunctionDescriptor descriptor = new FunctionDescriptor()
+                               .fromClass(new 
ClassInstance().of("this.class.does.not.exist"));
+
+               FunctionService.createFunction(descriptor);
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testNotFunctionClassFunctionCreation() {
+               FunctionDescriptor descriptor = new FunctionDescriptor()
+                               .fromClass(new ClassInstance()
+                                               .of(String.class.getName())
+                                               .parameterString("hello"));
+
+               FunctionService.createFunction(descriptor);
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testErrorConstructorClass() {
+               FunctionDescriptor descriptor = new FunctionDescriptor()
+                               .fromClass(new ClassInstance()
+                                               
.of(ErrorConstructorClass.class.getName())
+                                               .parameterString("arg"));
+
+               FunctionService.createFunction(descriptor);
+       }
+
+       @Test
+       public void testNoArgFunctionCreation() {
+               FunctionDescriptor descriptor = new FunctionDescriptor()
+                               .fromClass(new 
ClassInstance().of(NoArgClass.class.getName()));
+
+               assertEquals(NoArgClass.class, 
FunctionService.createFunction(descriptor).getClass());
+       }
+
+       @Test
+       public void testOneArgFunctionCreation() {
+               FunctionDescriptor descriptor = new FunctionDescriptor()
+                               .fromClass(
+                                               new ClassInstance()
+                                                               
.of(OneArgClass.class.getName())
+                                                               
.parameterString("false"));
+
+               UserDefinedFunction actualFunction = 
FunctionService.createFunction(descriptor);
+
+               assertEquals(OneArgClass.class, actualFunction.getClass());
+               assertFalse(((OneArgClass) actualFunction).field);
+       }
+
+       @Test
+       public void testMultiArgFunctionCreation() {
+               FunctionDescriptor descriptor = new FunctionDescriptor()
+                               .fromClass(
+                                               new ClassInstance()
+                                                               
.of(MultiArgClass.class.getName())
+                                                               .parameter(new 
java.math.BigDecimal("12.0003"))
+                                                               .parameter(new 
ClassInstance()
+                                                                               
.of(BigInteger.class.getName())
+                                                                               
.parameter("111111111111111111111111111111111")));
+
+               UserDefinedFunction actualFunction = 
FunctionService.createFunction(descriptor);
+
+               assertEquals(MultiArgClass.class, actualFunction.getClass());
+               assertEquals(
+                               new java.math.BigDecimal("12.0003"),
+                               ((MultiArgClass) actualFunction).field1);
+               assertEquals(
+                               new 
java.math.BigInteger("111111111111111111111111111111111"),
+                               ((MultiArgClass) actualFunction).field2);
+       }
+
+       /**
+        * Test no argument.
+        */
+       public static class NoArgClass extends ScalarFunction {}
+
+       /**
+        * Test one argument.
+        */
+       public static class OneArgClass extends ScalarFunction {
+               public Boolean field;
+
+               public OneArgClass(Boolean field) {
+                       this.field = field;
+               }
+       }
+
+       /**
+        * Test multi arguments.
+        */
+       public static class MultiArgClass extends ScalarFunction {
+               public final BigDecimal field1;
+               public final BigInteger field2;
+
+               public MultiArgClass(BigDecimal field1, BigInteger field2) {
+                       this.field1 = field1;
+                       this.field2 = field2;
+               }
+       }
+
+       /**
+        * Test private constructor.
+        */
+       public static class PrivateClass extends ScalarFunction {
+               private PrivateClass() {}
+       }
+
+       /**
+        * Test error constructor.
+        */
+       public static class ErrorConstructorClass extends ScalarFunction {
+               public ErrorConstructorClass(String arg) {
+                       throw new RuntimeException(arg);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BlinkPlannerFactory.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BlinkPlannerFactory.java
index 87652b3..a9aeb14 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BlinkPlannerFactory.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BlinkPlannerFactory.java
@@ -53,6 +53,7 @@ public final class BlinkPlannerFactory implements 
PlannerFactory {
                }
        }
 
+       @Override
        public Map<String, String> optionalContext() {
                Map<String, String> map = new HashMap<>();
                map.put(EnvironmentSettings.CLASS_NAME, 
this.getClass().getCanonicalName());
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
index 62e19bd..73868c1 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
@@ -49,6 +49,7 @@ public final class StreamPlannerFactory implements 
PlannerFactory {
                return new StreamPlanner(executor, tableConfig, 
functionCatalog, catalogManager);
        }
 
+       @Override
        public Map<String, String> optionalContext() {
                Map<String, String> map = new HashMap<>();
                map.put(EnvironmentSettings.CLASS_NAME, 
this.getClass().getCanonicalName());
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/FunctionService.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/FunctionService.scala
deleted file mode 100644
index acc1a1c..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/FunctionService.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.util.Logging
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-
-/**
-  * Service for creating configured instances of [[UserDefinedFunction]] using 
a
-  * [[FunctionDescriptor]].
-  */
-object FunctionService extends Logging {
-
-  /**
-    * Creates a user-defined function with the given properties and the 
current thread's
-    * context class loader.
-    *
-    * @param descriptor the descriptor that describes a function
-    * @return the generated user-defined function
-    */
-  def createFunction(descriptor: FunctionDescriptor): UserDefinedFunction = {
-    createFunction(descriptor, Thread.currentThread().getContextClassLoader)
-  }
-
-  /**
-    * Creates a user-defined function with the given properties.
-    *
-    * @param descriptor the descriptor that describes a function
-    * @param classLoader the class loader to load the function and its 
parameter's classes
-    * @param performValidation whether or not the descriptor should be 
validated
-    * @return the generated user-defined function
-    */
-  def createFunction(
-      descriptor: FunctionDescriptor,
-      classLoader: ClassLoader,
-      performValidation: Boolean = true)
-    : UserDefinedFunction = {
-
-    val properties = new DescriptorProperties(true)
-    properties.putProperties(descriptor.toProperties)
-
-    // validate
-    if (performValidation) {
-      new FunctionDescriptorValidator().validate(properties)
-    }
-
-    // instantiate
-    val (instanceClass, instance) = generateInstance[AnyRef](
-      HierarchyDescriptorValidator.EMPTY_PREFIX,
-      properties,
-      classLoader)
-
-    if (!classOf[UserDefinedFunction].isAssignableFrom(instanceClass)) {
-      throw new ValidationException(
-        s"Instantiated class '${instanceClass.getName}' is not a user-defined 
function.")
-    }
-    instance.asInstanceOf[UserDefinedFunction]
-  }
-
-  /**
-   * Recursively generate an instance of a class according the given 
properties.
-   *
-   * @param keyPrefix the prefix to fetch properties
-   * @param descriptorProperties the descriptor properties that contains the 
class type information
-   * @param classLoader the class loader to load the class
-   * @tparam T type fo the generated instance
-   * @return an instance of the class
-   */
-  private def generateInstance[T](
-      keyPrefix: String,
-      descriptorProperties: DescriptorProperties,
-      classLoader: ClassLoader)
-    : (Class[T], T) = {
-
-    val instanceClassName = descriptorProperties.getString(
-      s"$keyPrefix${ClassInstanceValidator.CLASS}")
-
-    val instanceClass = try {
-      Class
-        .forName(
-          
descriptorProperties.getString(s"$keyPrefix${ClassInstanceValidator.CLASS}"),
-          true,
-          classLoader)
-        .asInstanceOf[Class[T]]
-    } catch {
-      case e: Exception =>
-        // only log the cause to have clean error messages
-        val msg = s"Could not find class '$instanceClassName' for creating an 
instance."
-        LOG.error(msg, e)
-        throw new ValidationException(msg)
-    }
-
-    val constructorPrefix = s"$keyPrefix${ClassInstanceValidator.CONSTRUCTOR}"
-
-    val constructorProps = descriptorProperties
-      .getVariableIndexedProperties(constructorPrefix, List())
-
-    var i = 0
-    val parameterList: ArrayBuffer[(Class[_], Any)] = new ArrayBuffer
-    while (i < constructorProps.size()) {
-      // nested class instance
-      if (constructorProps(i).containsKey(ClassInstanceValidator.CLASS)) {
-        parameterList += generateInstance(
-          s"$constructorPrefix.$i.",
-          descriptorProperties,
-          classLoader)
-      }
-      // literal value
-      else {
-        val literalValue = LiteralValueValidator
-          .getValue(s"$constructorPrefix.$i.", descriptorProperties)
-        parameterList += ((literalValue.getClass, literalValue))
-      }
-      i += 1
-    }
-    val constructor = try {
-      instanceClass.getConstructor(parameterList.map(_._1): _*)
-    } catch {
-      case e: Exception =>
-        // only log the cause to have clean error messages
-        val msg = s"Cannot find a public constructor with parameter types " +
-          s"'${parameterList.map(_._1.getName).mkString(", ")}' for 
'$instanceClassName'."
-        LOG.error(msg, e)
-        throw new ValidationException(msg)
-    }
-
-    val instance = try {
-      constructor.newInstance(parameterList.map(_._2.asInstanceOf[AnyRef]): _*)
-    } catch {
-      case e: Exception =>
-        // only log the cause to have clean error messages
-        val msg = s"Error while creating instance of class 
'$instanceClassName' " +
-          s"with parameter types 
'${parameterList.map(_._1.getName).mkString(", ")}'."
-        LOG.error(msg, e)
-        throw new ValidationException(msg)
-    }
-
-    (instanceClass, instance)
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
deleted file mode 100644
index 2ffc423..0000000
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/functions/FunctionServiceTest.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.{ClassInstance, FunctionDescriptor}
-import org.apache.flink.table.functions.FunctionServiceTest.{MultiArgClass, 
NoArgClass, OneArgClass, PrivateClass}
-import org.junit.Assert.{assertEquals, assertFalse}
-import org.junit.Test
-
-/**
-  * Tests for [[FunctionService]].
-  */
-class FunctionServiceTest {
-
-  @Test(expected = classOf[ValidationException])
-  def testWrongArgsFunctionCreation(): Unit = {
-    val descriptor = new FunctionDescriptor()
-      .fromClass(new ClassInstance()
-        .of(classOf[NoArgClass].getName)
-        .parameterString("12"))
-
-    FunctionService.createFunction(descriptor)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testPrivateFunctionCreation(): Unit = {
-    val descriptor = new FunctionDescriptor()
-      .fromClass(new ClassInstance().of(classOf[PrivateClass].getName))
-
-    FunctionService.createFunction(descriptor)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidClassFunctionCreation(): Unit = {
-    val descriptor = new FunctionDescriptor()
-      .fromClass(new ClassInstance().of("this.class.does.not.exist"))
-
-    FunctionService.createFunction(descriptor)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNotFunctionClassFunctionCreation(): Unit = {
-    val descriptor = new FunctionDescriptor()
-      .fromClass(new ClassInstance()
-        .of(classOf[java.lang.String].getName)
-        .parameterString("hello"))
-
-    FunctionService.createFunction(descriptor)
-  }
-
-  @Test
-  def testNoArgFunctionCreation(): Unit = {
-    val descriptor = new FunctionDescriptor()
-      .fromClass(new ClassInstance().of(classOf[NoArgClass].getName))
-
-    assertEquals(classOf[NoArgClass], 
FunctionService.createFunction(descriptor).getClass)
-  }
-
-  @Test
-  def testOneArgFunctionCreation(): Unit = {
-    val descriptor = new FunctionDescriptor()
-      .fromClass(
-        new ClassInstance()
-          .of(classOf[OneArgClass].getName)
-          .parameterString("false"))
-
-    val actualFunction = FunctionService.createFunction(descriptor)
-
-    assertEquals(classOf[OneArgClass], actualFunction.getClass)
-    assertFalse(actualFunction.asInstanceOf[OneArgClass].field)
-  }
-
-  @Test
-  def testMultiArgFunctionCreation(): Unit = {
-    val descriptor = new FunctionDescriptor()
-      .fromClass(
-        new ClassInstance()
-          .of(classOf[MultiArgClass].getName)
-          .parameter(new java.math.BigDecimal("12.0003"))
-          .parameter(new ClassInstance()
-            .of(classOf[java.math.BigInteger].getName)
-            .parameter("111111111111111111111111111111111")))
-
-    val actualFunction = FunctionService.createFunction(descriptor)
-
-    assertEquals(classOf[MultiArgClass], actualFunction.getClass)
-    assertEquals(
-      new java.math.BigDecimal("12.0003"),
-      actualFunction.asInstanceOf[MultiArgClass].field1)
-    assertEquals(
-      new java.math.BigInteger("111111111111111111111111111111111"),
-      actualFunction.asInstanceOf[MultiArgClass].field2)
-  }
-}
-
-object FunctionServiceTest {
-
-  class NoArgClass
-    extends ScalarFunction
-
-  class OneArgClass(val field: java.lang.Boolean)
-    extends ScalarFunction
-
-  class MultiArgClass(val field1: java.math.BigDecimal, val field2: 
java.math.BigInteger)
-    extends ScalarFunction
-
-  class PrivateClass private() extends ScalarFunction
-}

Reply via email to