This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ab1d1dfb2ad872748833896d552e2d56a26a9a92 Author: xueyu <278006...@qq.com> AuthorDate: Sun Jul 15 20:01:15 2018 +0800 [FLINK-9853] [table] Add HEX support for Table API & SQL This closes #6337. --- docs/dev/table/functions.md | 39 +++++++++ .../flink/table/api/scala/expressionDsl.scala | 9 +++ .../flink/table/codegen/calls/BuiltInMethods.scala | 4 + .../table/codegen/calls/FunctionGenerator.scala | 12 +++ .../flink/table/expressions/mathExpressions.scala | 18 +++++ .../table/functions/sql/ScalarSqlFunctions.scala | 9 +++ .../table/runtime/functions/ScalarFunctions.scala | 16 +++- .../flink/table/validate/FunctionCatalog.scala | 2 + .../table/expressions/ScalarFunctionsTest.scala | 93 ++++++++++++++++++++++ 9 files changed, 199 insertions(+), 3 deletions(-) diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md index 00a9605..f637b4d 100644 --- a/docs/dev/table/functions.md +++ b/docs/dev/table/functions.md @@ -1393,6 +1393,19 @@ BIN(integer) <p>E.g. <code>BIN(4)</code> returns '100' and <code>BIN(12)</code> returns '1100'.</p> </td> </tr> + + <tr> + <td> +{% highlight text %} +HEX(numeric) +HEX(string) + {% endhighlight %} + </td> + <td> + <p>Returns a string representation of an integer <i>numeric</i> value or a <i>string</i> in hex format. Returns NULL if the argument is NULL.</p> + <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".</p> + </td> + </tr> </tbody> </table> </div> @@ -1805,6 +1818,19 @@ INTEGER.bin() <p>E.g., <code>4.bin()</code> returns "100" and <code>12.bin()</code> returns "1100".</p> </td> </tr> + + <tr> + <td> + {% highlight java %} +NUMERIC.hex() +STRING.hex() +{% endhighlight %} + </td> + <td> + <p>Returns a string representation of an integer <i>NUMERIC</i> value or a <i>STRING</i> in hex format. Returns NULL if the argument is NULL.</p> + <p>E.g. 
a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".</p> + </td> + </tr> </tbody> </table> </div> @@ -2217,6 +2243,19 @@ INTEGER.bin() <p>E.g., <code>4.bin()</code> returns "100" and <code>12.bin()</code> returns "1100".</p> </td> </tr> + + <tr> + <td> + {% highlight scala %} +NUMERIC.hex() +STRING.hex() +{% endhighlight %} + </td> + <td> + <p>Returns a string representation of an integer <i>NUMERIC</i> value or a <i>STRING</i> in hex format. Returns NULL if the argument is NULL.</p> + <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".</p> + </td> + </tr> </tbody> </table> </div> diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index d1bb06c..66e7544 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -406,6 +406,15 @@ trait ImplicitExpressionOperations { */ def bin() = Bin(expr) + /** + * Returns a string representation of an integer numeric value or a string in hex format. Returns + * null if numeric or string is null. + * + * E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads + * to "68656C6C6F2C776F726C64". 
+ */ + def hex() = Hex(expr) + // String operations /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index f5ed9b3..942666a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.table.codegen.calls +import java.lang.reflect.Method import java.lang.{Long => JLong} import java.math.{BigDecimal => JBigDecimal} @@ -135,4 +136,7 @@ object BuiltInMethods { val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String]) val TOBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "toBase64", classOf[String]) + + val HEX_LONG: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", classOf[Long]) + val HEX_STRING: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", classOf[String]) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 74b69d6..fd71126 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -455,6 +455,18 @@ object FunctionGenerator { STRING_TYPE_INFO, BuiltInMethods.BIN) + addSqlFunctionMethod( + ScalarSqlFunctions.HEX, + Seq(LONG_TYPE_INFO), + STRING_TYPE_INFO, + BuiltInMethods.HEX_LONG) + + addSqlFunctionMethod( + ScalarSqlFunctions.HEX, + Seq(STRING_TYPE_INFO), + STRING_TYPE_INFO, + BuiltInMethods.HEX_STRING) + // 
---------------------------------------------------------------------------------------------- // Temporal functions // ---------------------------------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala index cf3efa9..13e005e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala @@ -422,3 +422,21 @@ case class Bin(child: Expression) extends UnaryExpression { relBuilder.call(ScalarSqlFunctions.BIN, child.toRexNode) } } + +case class Hex(child: Expression) extends UnaryExpression { + override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO + + override private[flink] def validateInput(): ValidationResult = { + if (TypeCheckUtils.isIntegerFamily(child.resultType) || + TypeCheckUtils.isString(child.resultType)) { + ValidationSuccess + } else { + ValidationFailure(s"hex() requires an integer or string input but was '${child.resultType}'.") + } + } + override def toString: String = s"hex($child)" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.call(ScalarSqlFunctions.HEX, child.toRexNode) + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala index 21793e3..a0b6c9c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala @@ -42,6 +42,15 @@ object 
ScalarSqlFunctions { OperandTypes.family(SqlTypeFamily.INTEGER), SqlFunctionCategory.NUMERIC) + val HEX = new SqlFunction( + "HEX", + SqlKind.OTHER_FUNCTION, + ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE), + InferTypes.RETURN_TYPE, + OperandTypes.or(OperandTypes.family(SqlTypeFamily.INTEGER), + OperandTypes.family(SqlTypeFamily.STRING)), + SqlFunctionCategory.NUMERIC) + val CONCAT = new SqlFunction( "CONCAT", SqlKind.OTHER_FUNCTION, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala index 03ee62c..1881874 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala @@ -17,11 +17,12 @@ */ package org.apache.flink.table.runtime.functions -import scala.annotation.varargs +import java.lang.{StringBuilder, Long => JLong} import java.math.{BigDecimal => JBigDecimal} -import java.lang.StringBuilder -import org.apache.commons.codec.binary.Base64 +import org.apache.commons.codec.binary.{Base64, Hex} + +import scala.annotation.varargs /** * Built-in scalar runtime functions. @@ -212,4 +213,13 @@ object ScalarFunctions { */ def toBase64(base: String): String = Base64.encodeBase64String(base.getBytes()) + /** + * Returns the hex string of a long argument. + */ + def hex(x: Long): String = JLong.toHexString(x).toUpperCase() + + /** + * Returns the hex string of a string argument. 
+ */ + def hex(x: String): String = Hex.encodeHexString(x.getBytes).toUpperCase() } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index bca156c..a446401 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -239,6 +239,7 @@ object FunctionCatalog { "rand" -> classOf[Rand], "randInteger" -> classOf[RandInteger], "bin" -> classOf[Bin], + "hex" -> classOf[Hex], // temporal functions "extract" -> classOf[Extract], @@ -438,6 +439,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { ScalarSqlFunctions.CONCAT, ScalarSqlFunctions.CONCAT_WS, ScalarSqlFunctions.BIN, + ScalarSqlFunctions.HEX, SqlStdOperatorTable.TIMESTAMP_ADD, ScalarSqlFunctions.LOG, ScalarSqlFunctions.LPAD, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index 6f9a9ae..8e85b34 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -393,6 +393,99 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { } @Test + def testHex(): Unit = { + testAllApis( + 100.hex(), + "100.hex()", + "HEX(100)", + "64") + + testAllApis( + 'f2.hex(), + "f2.hex()", + "HEX(f2)", + "2A") + + testAllApis( + Null(Types.BYTE).hex(), + "hex(Null(BYTE))", + "HEX(CAST(NULL AS TINYINT))", + "null") + + testAllApis( + 'f3.hex(), + "f3.hex()", + "HEX(f3)", + "2B") + + testAllApis( + 'f4.hex(), + "f4.hex()", + "HEX(f4)", + "2C") + + testAllApis( + 'f7.hex(), + 
"f7.hex()", + "HEX(f7)", + "3") + + testAllApis( + 12.hex(), + "12.hex()", + "HEX(12)", + "C") + + testAllApis( + 10.hex(), + "10.hex()", + "HEX(10)", + "A") + + testAllApis( + 0.hex(), + "0.hex()", + "HEX(0)", + "0") + + testAllApis( + "ö".hex(), + "'ö'.hex()", + "HEX('ö')", + "C3B6") + + testAllApis( + 'f32.hex(), + "f32.hex()", + "HEX(f32)", + "FFFFFFFFFFFFFFFF") + + testAllApis( + 'f0.hex(), + "f0.hex()", + "HEX(f0)", + "546869732069732061207465737420537472696E672E") + + testAllApis( + 'f8.hex(), + "f8.hex()", + "HEX(f8)", + "20546869732069732061207465737420537472696E672E20") + + testAllApis( + 'f23.hex(), + "f23.hex()", + "HEX(f23)", + "25546869732069732061207465737420537472696E672E") + + testAllApis( + 'f24.hex(), + "f24.hex()", + "HEX(f24)", + "2A5F546869732069732061207465737420537472696E672E") + } + + @Test def testBin(): Unit = { testAllApis(