[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16669790#comment-16669790 ]

ASF GitHub Bot commented on FLINK-10166:
----------------------------------------

asfgit closed pull request #6966: [FLINK-10166] [table] Reduce dependencies by 
removing org.apache.commons
URL: https://github.com/apache/flink/pull/6966
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index 0d2f835958f..c4769814728 100644
--- 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -1326,7 +1326,7 @@ private int extractMaxIndex(String key, String 
suffixPattern) {
        }
 
        public static String toString(String str) {
-               return EncodingUtils.escapeJava(str).replace("\\/", "/"); // 
'/' must not be escaped
+               return EncodingUtils.escapeJava(str);
        }
 
        public static String toString(String key, String value) {
diff --git 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 41fa58ef055..47aac25e897 100644
--- 
a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ 
b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -26,9 +27,12 @@
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.io.Writer;
-import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Base64;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  * General utilities for string-encoding. This class is used to avoid 
additional dependencies
  * to other projects.
@@ -40,6 +44,8 @@
 
        private static final Base64.Decoder BASE64_DECODER = 
java.util.Base64.getUrlDecoder();
 
+       private static final char[] HEX_CHARS = 
"0123456789abcdef".toCharArray();
+
        private EncodingUtils() {
                // do not instantiate
        }
@@ -47,7 +53,7 @@ private EncodingUtils() {
        public static String encodeObjectToString(Serializable obj) {
                try {
                        final byte[] bytes = 
InstantiationUtil.serializeObject(obj);
-                       return new String(BASE64_ENCODER.encode(bytes), 
StandardCharsets.UTF_8);
+                       return new String(BASE64_ENCODER.encode(bytes), UTF_8);
                } catch (Exception e) {
                        throw new ValidationException(
                                "Unable to serialize object '" + obj.toString() 
+ "' of class '" + obj.getClass().getName() + "'.");
@@ -60,7 +66,7 @@ public static String encodeObjectToString(Serializable obj) {
 
        public static <T extends Serializable> T decodeStringToObject(String 
base64String, Class<T> baseClass, ClassLoader classLoader) {
                try {
-                       final byte[] bytes = 
BASE64_DECODER.decode(base64String.getBytes(StandardCharsets.UTF_8));
+                       final byte[] bytes = 
BASE64_DECODER.decode(base64String.getBytes(UTF_8));
                        final T instance = 
InstantiationUtil.deserializeObject(bytes, classLoader);
                        if (instance != null && 
!baseClass.isAssignableFrom(instance.getClass())) {
                                throw new ValidationException(
@@ -87,10 +93,138 @@ public static String encodeObjectToString(Serializable 
obj) {
                return loadClass(qualifiedName, 
Thread.currentThread().getContextClassLoader());
        }
 
+       public static String encodeStringToBase64(String string) {
+               return new 
String(java.util.Base64.getEncoder().encode(string.getBytes(UTF_8)), UTF_8);
+       }
+
+       public static String decodeBase64ToString(String base64) {
+               return new 
String(java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8)), UTF_8);
+       }
+
+       public static byte[] md5(String string) {
+               try {
+                       return 
MessageDigest.getInstance("MD5").digest(string.getBytes(UTF_8));
+               } catch (NoSuchAlgorithmException e) {
+                       throw new TableException("Unsupported MD5 algorithm.", 
e);
+               }
+       }
+
+       public static String hex(String string) {
+               return hex(string.getBytes(UTF_8));
+       }
+
+       public static String hex(byte[] bytes) {
+               // adopted from https://stackoverflow.com/a/9855338
+               final char[] hexChars = new char[bytes.length * 2];
+               for (int j = 0; j < bytes.length; j++) {
+                       final int v = bytes[j] & 0xFF;
+                       hexChars[j * 2] = HEX_CHARS[v >>> 4];
+                       hexChars[j * 2 + 1] = HEX_CHARS[v & 0x0F];
+               }
+               return new String(hexChars);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Java String Repetition
+       //
+       // copied from o.a.commons.lang3.StringUtils (commons-lang3:3.3.2)
+       // 
--------------------------------------------------------------------------------------------
+
+       private static final String EMPTY = "";
+
+       /**
+        * The maximum size to which the padding constant(s) can expand.
+        */
+       private static final int PAD_LIMIT = 8192;
+
+       /**
+        * Repeat a String {@code repeat} times to form a new String.
+        *
+        * <pre>
+        * StringUtils.repeat(null, 2) = null
+        * StringUtils.repeat("", 0)   = ""
+        * StringUtils.repeat("", 2)   = ""
+        * StringUtils.repeat("a", 3)  = "aaa"
+        * StringUtils.repeat("ab", 2) = "abab"
+        * StringUtils.repeat("a", -2) = ""
+        * </pre>
+        *
+        * @param str    the String to repeat, may be null
+        * @param repeat number of times to repeat str, negative treated as zero
+        * @return a new String consisting of the original String repeated, 
{@code null} if null String input
+        */
+       public static String repeat(final String str, final int repeat) {
+               // Performance tuned for 2.0 (JDK1.4)
+
+               if (str == null) {
+                       return null;
+               }
+               if (repeat <= 0) {
+                       return EMPTY;
+               }
+               final int inputLength = str.length();
+               if (repeat == 1 || inputLength == 0) {
+                       return str;
+               }
+               if (inputLength == 1 && repeat <= PAD_LIMIT) {
+                       return repeat(str.charAt(0), repeat);
+               }
+
+               final int outputLength = inputLength * repeat;
+               switch (inputLength) {
+                       case 1:
+                               return repeat(str.charAt(0), repeat);
+                       case 2:
+                               final char ch0 = str.charAt(0);
+                               final char ch1 = str.charAt(1);
+                               final char[] output2 = new char[outputLength];
+                               for (int i = repeat * 2 - 2; i >= 0; i--, i--) {
+                                       output2[i] = ch0;
+                                       output2[i + 1] = ch1;
+                               }
+                               return new String(output2);
+                       default:
+                               final StringBuilder buf = new 
StringBuilder(outputLength);
+                               for (int i = 0; i < repeat; i++) {
+                                       buf.append(str);
+                               }
+                               return buf.toString();
+               }
+       }
+
+       /**
+        * Returns padding using the specified delimiter repeated to a given 
length.
+        *
+        * <pre>
+        * StringUtils.repeat('e', 0)  = ""
+        * StringUtils.repeat('e', 3)  = "eee"
+        * StringUtils.repeat('e', -2) = ""
+        * </pre>
+        *
+        * <p>Note: this method does not support padding with
+        * <a href="http://www.unicode.org/glossary/#supplementary_character">Unicode 
Supplementary Characters</a>
+        * as they require a pair of {@code char}s to be represented.
+        * If you are needing to support full I18N of your applications
+        * consider using {@link #repeat(String, int)} instead.
+        *
+        * @param ch     character to repeat
+        * @param repeat number of times to repeat char, negative treated as 
zero
+        * @return String with repeated character
+        * @see #repeat(String, int)
+        */
+       public static String repeat(final char ch, final int repeat) {
+               final char[] buf = new char[repeat];
+               for (int i = repeat - 1; i >= 0; i--) {
+                       buf[i] = ch;
+               }
+               return new String(buf);
+       }
+
        // 
--------------------------------------------------------------------------------------------
        // Java String Escaping
        //
-       // copied from o.a.commons.lang.StringEscapeUtils 
(commons-lang:commons-lang:2.4)
+       // copied from o.a.commons.lang.StringEscapeUtils (commons-lang:2.4)
+       // but without escaping forward slashes.
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -197,10 +331,11 @@ private static void escapeJavaStyleString(Writer out, 
String str, boolean escape
                                                out.write('\\');
                                                out.write('\\');
                                                break;
-                                       case '/':
-                                               out.write('\\');
-                                               out.write('/');
-                                               break;
+                                       // MODIFICATION: Flink removes invalid 
escaping of forward slashes!
+                                       // case '/':
+                                       //      out.write('\\');
+                                       //      out.write('/');
+                                       //      break;
                                        default:
                                                out.write(ch);
                                                break;
diff --git 
a/flink-libraries/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java
 
b/flink-libraries/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java
new file mode 100644
index 00000000000..3bc53d828d4
--- /dev/null
+++ 
b/flink-libraries/flink-table-common/src/test/java/org/apache/flink/table/utils/EncodingUtilsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link org.apache.flink.table.utils.EncodingUtils}.
+ */
+public class EncodingUtilsTest {
+
+       @Test
+       public void testObjectStringEncoding() {
+               final MyPojo pojo = new MyPojo(33, "Hello");
+               final String base64 = EncodingUtils.encodeObjectToString(pojo);
+               assertEquals(pojo, EncodingUtils.decodeStringToObject(base64, 
Serializable.class));
+       }
+
+       @Test
+       public void testStringBase64Encoding() {
+               final String string = "Hello, this is apache flink.";
+               final String base64 = 
EncodingUtils.encodeStringToBase64(string);
+               assertEquals("SGVsbG8sIHRoaXMgaXMgYXBhY2hlIGZsaW5rLg==", 
base64);
+               assertEquals(string, 
EncodingUtils.decodeBase64ToString(base64));
+       }
+
+       @Test
+       public void testMd5Hex() {
+               final String string = "Hello, world! How are you? 高精确";
+               assertEquals("983abac84e994b4ba73be177e5cc298b", 
EncodingUtils.hex(EncodingUtils.md5(string)));
+       }
+
+       @Test
+       public void testJavaEscaping() {
+               assertEquals("\\\\hello\\\"world'space/", 
EncodingUtils.escapeJava("\\hello\"world'space/"));
+       }
+
+       @Test
+       public void testRepetition() {
+               assertEquals("wewewe", EncodingUtils.repeat("we", 3));
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static class MyPojo implements Serializable {
+
+               private int number;
+               private String string;
+
+               public MyPojo(int number, String string) {
+                       this.number = number;
+                       this.string = string;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       MyPojo myPojo = (MyPojo) o;
+                       return number == myPojo.number && 
Objects.equals(string, myPojo.string);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(number, string);
+               }
+       }
+}
diff --git a/flink-libraries/flink-table/pom.xml 
b/flink-libraries/flink-table/pom.xml
index 14653710b4e..6b2fe8c9af7 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -78,18 +78,6 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
-               <!-- Used for base64 encoding of UDFs -->
-               <dependency>
-                       <groupId>commons-codec</groupId>
-                       <artifactId>commons-codec</artifactId>
-               </dependency>
-
-               <!-- Used for code generation -->
-               <dependency>
-                       <groupId>org.apache.commons</groupId>
-                       <artifactId>commons-lang3</artifactId>
-               </dependency>
-
                <!-- Used for code generation -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -279,12 +267,9 @@ under the License.
                                                                        
<include>com.google.guava:guava</include>
                                                                        
<include>net.hydromatic:*</include>
                                                                        
<include>com.esri.geometry:*</include>
-                                                                       
<include>commons-lang:*</include>
 
                                                                        <!-- 
flink-table dependencies -->
                                                                        
<include>org.apache.flink:flink-table-common</include>
-                                                                       
<include>commons-codec:*</include>
-                                                                       
<include>org.apache.commons:commons-lang3</include>
                                                                        
<include>org.codehaus.janino:*</include>
                                                                        
<include>joda-time:*</include>
                                                                </includes>
@@ -320,12 +305,6 @@ under the License.
                                                                        
<pattern>org.codehaus</pattern>
                                                                        
<shadedPattern>org.apache.flink.table.shaded.org.codehaus</shadedPattern>
                                                                </relocation>-->
-
-                                                               <!-- 
commons-codec, commons-lang3, and commons-lang (from Calcite) -->
-                                                               <relocation>
-                                                                       
<pattern>org.apache.commons</pattern>
-                                                                       
<shadedPattern>org.apache.flink.table.shaded.org.apache.commons</shadedPattern>
-                                                               </relocation>
                                                        </relocations>
                                                </configuration>
                                        </execution>
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index 11c00080329..566e3d7cbc5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -21,7 +21,6 @@ import java.lang.reflect.Modifier
 import java.lang.{Iterable => JIterable}
 
 import org.apache.calcite.rex.RexLiteral
-import org.apache.commons.codec.binary.Base64
 import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, State, StateDescriptor}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -36,8 +35,8 @@ import 
org.apache.flink.table.functions.aggfunctions.DistinctAccumulator
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod,
 signatureToString}
 import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, 
SingleElementIterable}
+import org.apache.flink.table.utils.EncodingUtils
 import org.apache.flink.types.Row
-import org.apache.flink.util.InstantiationUtil
 
 import scala.collection.mutable
 
@@ -315,7 +314,7 @@ class AggregationCodeGenerator(
       val dataViewTypeTerm = dataViewField.getType.getCanonicalName
 
       // define the DataView variables
-      val serializedData = serializeStateDescriptor(desc)
+      val serializedData = EncodingUtils.encodeObjectToString(desc)
       val dataViewFieldTerm = createDataViewTerm(aggIndex, 
dataViewField.getName)
       val field =
         s"""
@@ -329,9 +328,10 @@ class AggregationCodeGenerator(
       val descDeserializeCode =
         s"""
            |    $descClassQualifier $descFieldTerm = ($descClassQualifier)
-           |      org.apache.flink.util.InstantiationUtil.deserializeObject(
-           |      
org.apache.commons.codec.binary.Base64.decodeBase64("$serializedData"),
-           |      $contextTerm.getUserCodeClassLoader());
+           |      
${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+           |        "$serializedData",
+           |        $descClassQualifier.class,
+           |        $contextTerm.getUserCodeClassLoader());
            |""".stripMargin
       val createDataView = if (dataViewField.getType == classOf[MapView[_, 
_]]) {
         s"""
@@ -770,10 +770,4 @@ class AggregationCodeGenerator(
 
     GeneratedAggregationsFunction(funcName, funcCode)
   }
-
-  @throws[Exception]
-  def serializeStateDescriptor(stateDescriptor: StateDescriptor[_, _]): String 
= {
-    val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
-    Base64.encodeBase64URLSafeString(byteArray)
-  }
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 5d047adbc76..13bf50a5ef2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -26,7 +26,6 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable.{ROW, _}
-import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.common.typeutils.CompositeType
@@ -40,10 +39,10 @@ import 
org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NU
 import org.apache.flink.table.codegen.calls.ScalarOperators._
 import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, 
FunctionGenerator}
 import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, 
ScalarSqlFunctions, StreamRecordTimestampSqlFunction}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.utils.EncodingUtils
 import org.joda.time.format.DateTimeFormatter
 
 import scala.collection.JavaConversions._
@@ -681,7 +680,7 @@ abstract class CodeGenerator(
         generateNonNullLiteral(resultType, decimalField)
 
       case VARCHAR | CHAR =>
-        val escapedValue = 
StringEscapeUtils.ESCAPE_JAVA.translate(value.toString)
+        val escapedValue = EncodingUtils.escapeJava(value.toString)
         generateNonNullLiteral(resultType, "\"" + escapedValue + "\"")
 
       case SYMBOL =>
@@ -1610,7 +1609,7 @@ abstract class CodeGenerator(
     */
   def addReusableFunction(function: UserDefinedFunction, contextTerm: String = 
null): String = {
     val classQualifier = function.getClass.getCanonicalName
-    val functionSerializedData = UserDefinedFunctionUtils.serialize(function)
+    val functionSerializedData = EncodingUtils.encodeObjectToString(function)
     val fieldTerm = s"function_${function.functionIdentifier}"
 
     val fieldFunction =
@@ -1622,8 +1621,9 @@ abstract class CodeGenerator(
     val functionDeserialization =
       s"""
          |$fieldTerm = ($classQualifier)
-         |${UserDefinedFunctionUtils.getClass.getName.stripSuffix("$")}
-         |.deserialize("$functionSerializedData");
+         |${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+         |  "$functionSerializedData",
+         |  ${classOf[UserDefinedFunction].getCanonicalName}.class);
        """.stripMargin
 
     reusableInitStatements.add(functionDeserialization)
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
index a68bf8e87cf..791d3883f80 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
@@ -23,7 +23,6 @@ import java.util
 
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
-import org.apache.commons.lang3.StringEscapeUtils.escapeJava
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.cep.pattern.conditions.IterativeCondition
@@ -33,6 +32,7 @@ import 
org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, ne
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
 import org.apache.flink.table.codegen.Indenter.toISC
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.utils.EncodingUtils
 import org.apache.flink.util.Collector
 import org.apache.flink.util.MathUtils.checkedDownCast
 
@@ -102,7 +102,7 @@ class MatchCodeGenerator(
   private def addReusablePatternNames() : Unit = {
     reusableMemberStatements
       .add(s"private String[] $patternNamesTerm = new String[] { ${
-        patternNames.map(p => s""""${escapeJava(p)}"""").mkString(", ")
+        patternNames.map(p => 
s""""${EncodingUtils.escapeJava(p)}"""").mkString(", ")
       } };")
   }
 
@@ -336,7 +336,7 @@ class MatchCodeGenerator(
          |}
          """.stripMargin
     } else {
-      val escapedPatternName = escapeJava(patternName)
+      val escapedPatternName = EncodingUtils.escapeJava(patternName)
       j"""
          |java.util.List $listName = new java.util.ArrayList();
          |for ($eventTypeTerm $eventNameTerm :
@@ -373,7 +373,7 @@ class MatchCodeGenerator(
          |}
          """.stripMargin
     } else {
-      val escapedPatternName = escapeJava(patternName)
+      val escapedPatternName = EncodingUtils.escapeJava(patternName)
       j"""
          |java.util.List $listName = (java.util.List) 
$input1Term.get("$escapedPatternName");
          |if ($listName == null) {
@@ -427,7 +427,7 @@ class MatchCodeGenerator(
   }
 
   private def generatePatternFieldRef(fieldRef: RexPatternFieldRef): 
GeneratedExpression = {
-    val escapedAlpha = escapeJava(fieldRef.getAlpha)
+    val escapedAlpha = EncodingUtils.escapeJava(fieldRef.getAlpha)
     val patternVariableRef = reusableInputUnboxingExprs
       .get((s"$escapedAlpha#$first", offset)) match {
       // input access and unboxing has already been generated
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
index d3be6e1b852..1ce8af8c528 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/HashCalcCallGen.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.codegen.calls
 
-import org.apache.commons.codec.Charsets
-import org.apache.commons.codec.binary.Hex
+import java.nio.charset.StandardCharsets
+
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.table.codegen.CodeGenUtils.newName
 import 
org.apache.flink.table.codegen.calls.CallGenerator.generateCallWithStmtIfArgsNotNull
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.utils.EncodingUtils
 
 class HashCalcCallGen(algName: String) extends CallGenerator {
 
@@ -68,9 +69,10 @@ class HashCalcCallGen(algName: String) extends CallGenerator 
{
         val auxiliaryStmt =
           s"""
             |${initStmt.getOrElse("")}
-            
|$md.update(${terms.head}.getBytes(${classOf[Charsets].getCanonicalName}.UTF_8));
+            |$md.update(${terms.head}
+            |  .getBytes(${classOf[StandardCharsets].getCanonicalName}.UTF_8));
             |""".stripMargin
-        val result = 
s"${classOf[Hex].getCanonicalName}.encodeHexString($md.digest())"
+        val result = 
s"${classOf[EncodingUtils].getCanonicalName}.hex($md.digest())"
         (Some(auxiliaryStmt), result)
     }
   }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index df206de6cad..b91ab5ad77f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.codegen.calls
 
-import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
@@ -68,7 +67,7 @@ class ScalarFunctionCallGen(
     val parameters = paramClasses.zip(operands).map { case (paramClass, 
operandExpr) =>
           if (paramClass.isPrimitive) {
             operandExpr
-          } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+          } else if (TypeCheckUtils.isPrimitiveWrapper(paramClass)
               && TypeCheckUtils.isTemporal(operandExpr.resultType)) {
             // we use primitives to represent temporal types internally, so no 
casting needed here
             val exprOrNull: String = if (codeGenerator.nullCheck) {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index e1ad18f988f..6bcfc6e2d61 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.codegen.calls
 
-import org.apache.commons.lang3.ClassUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.NEVER_NULL
@@ -68,7 +67,7 @@ class TableFunctionCallGen(
     val parameters = paramClasses.zip(operands).map { case (paramClass, 
operandExpr) =>
           if (paramClass.isPrimitive) {
             operandExpr
-          } else if (ClassUtils.isPrimitiveWrapper(paramClass)
+          } else if (TypeCheckUtils.isPrimitiveWrapper(paramClass)
               && TypeCheckUtils.isTemporal(operandExpr.resultType)) {
             // we use primitives to represent temporal types internally, so no 
casting needed here
             val exprOrNull: String = if (codeGenerator.nullCheck) {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 419c12567f1..24bae90fa10 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -17,6 +17,9 @@
  */
 package org.apache.flink.table.expressions
 
+import java.sql.{Date, Time, Timestamp}
+import java.util.{Calendar, TimeZone}
+
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.SqlIntervalQualifier
@@ -27,10 +30,6 @@ import org.apache.calcite.util.{DateString, TimeString, 
TimestampString}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, 
TypeInformation}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeInfo}
-import java.sql.{Date, Time, Timestamp}
-import java.util.{Calendar, TimeZone}
-
-import org.apache.commons.lang3.StringEscapeUtils
 
 object Literal {
   private[flink] val UTC = TimeZone.getTimeZone("UTC")
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index 15bcb171a1f..89ba0d4f364 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.table.functions
 
-import org.apache.commons.codec.digest.DigestUtils
-import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.serialize
+import org.apache.flink.table.utils.EncodingUtils
+
 /**
   * Base class for all user-defined functions such as scalar functions, table 
functions,
   * or aggregation functions.
@@ -49,7 +49,7 @@ abstract class UserDefinedFunction extends Serializable {
   def isDeterministic: Boolean = true
 
   final def functionIdentifier: String = {
-    val md5 = DigestUtils.md5Hex(serialize(this))
+    val md5 = 
EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
     getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
   }
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 9799e4d9f7f..c9a27036797 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -19,28 +19,27 @@
 
 package org.apache.flink.table.functions.utils
 
-import java.util
-import java.lang.{Integer => JInt, Long => JLong}
 import java.lang.reflect.{Method, Modifier}
+import java.lang.{Integer => JInt, Long => JLong}
 import java.sql.{Date, Time, Timestamp}
+import java.util
 
-import org.apache.commons.codec.binary.Base64
 import com.google.common.primitives.Primitives
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
 import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, 
SqlOperandCountRange, SqlOperator, SqlOperatorBinding}
+import org.apache.calcite.sql.{SqlCallBinding, SqlFunction, 
SqlOperandCountRange, SqlOperator}
 import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, 
TypeExtractor}
 import org.apache.flink.table.api.dataview._
-import org.apache.flink.table.dataview._
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.api.{TableEnvironment, TableException, 
ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.dataview._
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, 
TableFunction, UserDefinedFunction}
+import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
 import org.apache.flink.util.InstantiationUtil
 
@@ -734,19 +733,6 @@ object UserDefinedFunctionUtils {
       (candidate.getComponentType == expected.getComponentType ||
         expected.getComponentType == classOf[Object]))
 
-  @throws[Exception]
-  def serialize(function: UserDefinedFunction): String = {
-    val byteArray = InstantiationUtil.serializeObject(function)
-    Base64.encodeBase64URLSafeString(byteArray)
-  }
-
-  @throws[Exception]
-  def deserialize(data: String): UserDefinedFunction = {
-    val byteData = Base64.decodeBase64(data)
-    InstantiationUtil
-      .deserializeObject[UserDefinedFunction](byteData, 
Thread.currentThread.getContextClassLoader)
-  }
-
   /**
     * Creates a [[LogicalTableFunctionCall]] by parsing a String expression.
     *
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
index fdf45e79af5..097346ab613 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/TreeNode.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.plan
 
-import org.apache.commons.lang.ClassUtils
+import org.apache.flink.table.typeutils.TypeCheckUtils
 
 /**
  * Generic base class for trees that can be transformed and traversed.
@@ -88,28 +88,28 @@ abstract class TreeNode[A <: TreeNode[A]] extends Product { 
self: A =>
    * if children change.
    */
   private[flink] def makeCopy(newArgs: Array[AnyRef]): A = {
-    val ctors = getClass.getConstructors.filter(_.getParameterTypes.size > 0)
+    val ctors = getClass.getConstructors.filter(_.getParameterTypes.length > 0)
     if (ctors.isEmpty) {
       throw new RuntimeException(s"No valid constructor for 
${getClass.getSimpleName}")
     }
 
     val defaultCtor = ctors.find { ctor =>
-      if (ctor.getParameterTypes.size != newArgs.length) {
+      if (ctor.getParameterTypes.length != newArgs.length) {
         false
       } else if (newArgs.contains(null)) {
         false
       } else {
         val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
-        ClassUtils.isAssignable(argsClasses, ctor.getParameterTypes)
+        TypeCheckUtils.isAssignable(argsClasses, ctor.getParameterTypes)
       }
-    }.getOrElse(ctors.maxBy(_.getParameterTypes.size))
+    }.getOrElse(ctors.maxBy(_.getParameterTypes.length))
 
     try {
       defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
     } catch {
       case e: Throwable =>
         throw new RuntimeException(
-          s"Fail to copy treeNode ${getClass.getName}: 
${e.getStackTraceString}")
+          s"Fail to copy tree node ${getClass.getName}.", e)
     }
   }
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index da7cf64b13e..a2acb0cecc7 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -19,12 +19,9 @@ package org.apache.flink.table.runtime.functions
 
 import java.lang.{StringBuilder, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
-import java.nio.charset.StandardCharsets
-import java.util.regex.Matcher
-import java.util.regex.Pattern
+import java.util.regex.{Matcher, Pattern}
 
-import org.apache.commons.codec.binary.{Base64, Hex}
-import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.utils.EncodingUtils
 
 import scala.annotation.varargs
 
@@ -269,25 +266,25 @@ object ScalarFunctions {
   /**
     * Returns the base string decoded with base64.
     */
-  def fromBase64(str: String): String =
-    new String(Base64.decodeBase64(str), StandardCharsets.UTF_8)
+  def fromBase64(base64: String): String =
+    EncodingUtils.decodeBase64ToString(base64)
 
   /**
     * Returns the base64-encoded result of the input string.
     */
-  def toBase64(base: String): String =
-    Base64.encodeBase64String(base.getBytes(StandardCharsets.UTF_8))
+  def toBase64(string: String): String =
+    EncodingUtils.encodeStringToBase64(string)
 
   /**
     * Returns the hex string of a long argument.
     */
-  def hex(x: Long): String = JLong.toHexString(x).toUpperCase()
+  def hex(string: Long): String = JLong.toHexString(string).toUpperCase()
 
   /**
     * Returns the hex string of a string argument.
     */
-  def hex(x: String): String =
-    Hex.encodeHexString(x.getBytes(StandardCharsets.UTF_8)).toUpperCase()
+  def hex(string: String): String =
+    EncodingUtils.hex(string).toUpperCase()
 
   /**
     * Returns an UUID string using Java utilities.
@@ -297,6 +294,6 @@ object ScalarFunctions {
   /**
     * Returns a string that repeats the base string n times.
     */
-  def repeat(base: String, n: Int): String = StringUtils.repeat(base, n)
+  def repeat(base: String, n: Int): String = EncodingUtils.repeat(base, n)
 
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 02af79843a4..25367ad4dd8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -183,4 +183,95 @@ object TypeCheckUtils {
       }
     }
   }
+
+  /**
+    * Checks if a class is a Java primitive wrapper.
+    */
+  def isPrimitiveWrapper(clazz: Class[_]): Boolean = {
+    clazz == classOf[java.lang.Boolean] ||
+    clazz == classOf[java.lang.Byte] ||
+    clazz == classOf[java.lang.Character] ||
+    clazz == classOf[java.lang.Short] ||
+    clazz == classOf[java.lang.Integer] ||
+    clazz == classOf[java.lang.Long] ||
+    clazz == classOf[java.lang.Double] ||
+    clazz == classOf[java.lang.Float]
+  }
+
+  /**
+    * Checks if one class can be assigned to a variable of another class.
+    *
+    * Adopted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class[], 
java.lang.Class[])
+    * but without null checks.
+    */
+  def isAssignable(classArray: Array[Class[_]], toClassArray: 
Array[Class[_]]): Boolean = {
+    if (classArray.length != toClassArray.length) {
+      return false
+    }
+    var i = 0
+    while (i < classArray.length) {
+      if (!isAssignable(classArray(i), toClassArray(i))) {
+        return false
+      }
+      i += 1
+    }
+    true
+  }
+
+  /**
+    * Checks if one class can be assigned to a variable of another class.
+    *
+    * Adopted from o.a.commons.lang.ClassUtils#isAssignable(java.lang.Class, 
java.lang.Class) but
+    * without null checks.
+    */
+  def isAssignable(cls: Class[_], toClass: Class[_]): Boolean = {
+    if (cls.equals(toClass)) {
+      return true
+    }
+    if (cls.isPrimitive) {
+      if (!toClass.isPrimitive) {
+        return false
+      }
+      if (java.lang.Integer.TYPE.equals(cls)) {
+        return java.lang.Long.TYPE.equals(toClass) ||
+          java.lang.Float.TYPE.equals(toClass) ||
+          java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Long.TYPE.equals(cls)) {
+        return java.lang.Float.TYPE.equals(toClass) ||
+          java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Boolean.TYPE.equals(cls)) {
+        return false
+      }
+      if (java.lang.Double.TYPE.equals(cls)) {
+        return false
+      }
+      if (java.lang.Float.TYPE.equals(cls)) {
+          return java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Character.TYPE.equals(cls)) {
+          return java.lang.Integer.TYPE.equals(toClass) ||
+            java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Short.TYPE.equals(cls)) {
+          return java.lang.Integer.TYPE.equals(toClass) ||
+            java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      if (java.lang.Byte.TYPE.equals(cls)) {
+          return java.lang.Short.TYPE.equals(toClass) ||
+            java.lang.Integer.TYPE.equals(toClass) ||
+            java.lang.Long.TYPE.equals(toClass) ||
+            java.lang.Float.TYPE.equals(toClass) ||
+            java.lang.Double.TYPE.equals(toClass)
+      }
+      // should never get here
+      return false
+    }
+    toClass.isAssignableFrom(cls)
+  }
 }
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 28b7d14ac33..f70d991e50b 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -30,12 +30,12 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import 
org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, 
TestHarnessUtil}
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.GeneratedAggregationsFunction
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{AggregateFunction, 
UserDefinedFunction}
 import 
org.apache.flink.table.functions.aggfunctions.{IntSumWithRetractAggFunction, 
LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
 import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
 import 
org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator,
 RowResultSortComparatorWithWatermarks}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.utils.EncodingUtils
 import org.junit.Rule
 import org.junit.rules.ExpectedException
 
@@ -47,13 +47,13 @@ class HarnessTestBase {
   def thrown = expectedException
 
   val longMinWithRetractAggFunction: String =
-    UserDefinedFunctionUtils.serialize(new LongMinWithRetractAggFunction)
+    EncodingUtils.encodeObjectToString(new LongMinWithRetractAggFunction)
 
   val longMaxWithRetractAggFunction: String =
-    UserDefinedFunctionUtils.serialize(new LongMaxWithRetractAggFunction)
+    EncodingUtils.encodeObjectToString(new LongMaxWithRetractAggFunction)
 
   val intSumWithRetractAggFunction: String =
-    UserDefinedFunctionUtils.serialize(new IntSumWithRetractAggFunction)
+    EncodingUtils.encodeObjectToString(new IntSumWithRetractAggFunction)
 
   protected val MinMaxRowType = new RowTypeInfo(Array[TypeInformation[_]](
     LONG_TYPE_INFO,
@@ -97,12 +97,14 @@ class HarnessTestBase {
       |  public MinMaxAggregateHelper() throws Exception {
       |
       |    fmin = 
(org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
-      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-      |    .deserialize("$longMinWithRetractAggFunction");
+      |    ${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+      |      "$longMinWithRetractAggFunction",
+      |      ${classOf[UserDefinedFunction].getCanonicalName}.class);
       |
       |    fmax = 
(org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
-      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-      |    .deserialize("$longMaxWithRetractAggFunction");
+      |    ${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+      |      "$longMaxWithRetractAggFunction",
+      |      ${classOf[UserDefinedFunction].getCanonicalName}.class);
       |  }
       |
       |  public void setAggregationResults(
@@ -220,8 +222,9 @@ class HarnessTestBase {
       |  public SumAggregationHelper() throws Exception {
       |
       |sum = 
(org.apache.flink.table.functions.aggfunctions.IntSumWithRetractAggFunction)
-      |org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-      |.deserialize("$intSumWithRetractAggFunction");
+      |${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject(
+      |  "$intSumWithRetractAggFunction",
+      |  ${classOf[UserDefinedFunction].getCanonicalName}.class);
       |}
       |
       |  public final void setAggregationResults(
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
index 645e6089bd0..8f5ff545e88 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.{Types => ScalaTypes}
 import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.Test
 
 class TypeCheckUtilsTest {
@@ -51,4 +52,24 @@ class TypeCheckUtilsTest {
   def testInvalidType3(): Unit = {
     validateEqualsHashCode("", Types.OBJECT_ARRAY[Nothing](ScalaTypes.NOTHING))
   }
+
+  @Test
+  def testPrimitiveWrapper (): Unit = {
+    assertTrue(TypeCheckUtils.isPrimitiveWrapper(classOf[java.lang.Double]))
+    assertFalse(TypeCheckUtils.isPrimitiveWrapper(classOf[Double]))
+  }
+
+  @Test
+  def testAssignability(): Unit = {
+    assertTrue(TypeCheckUtils.isAssignable(classOf[Double], classOf[Double]))
+    assertFalse(TypeCheckUtils.isAssignable(classOf[Boolean], classOf[Double]))
+    assertTrue(TypeCheckUtils.isAssignable(
+      classOf[java.util.HashMap[_, _]], classOf[java.util.Map[_, _]]))
+    assertFalse(TypeCheckUtils.isAssignable(
+      classOf[java.util.Map[_, _]], classOf[java.util.HashMap[_, _]]))
+
+    assertTrue(TypeCheckUtils.isAssignable(
+      Array[Class[_]](classOf[Double], classOf[Double]),
+      Array[Class[_]](classOf[Double], classOf[Double])))
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Dependency problems when executing SQL query in sql-client
> ----------------------------------------------------------
>
>                 Key: FLINK-10166
>                 URL: https://issues.apache.org/jira/browse/FLINK-10166
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.6.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Timo Walther
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> When tried to run query:
> {code}
> select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
> {code}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to