This is an automated email from the ASF dual-hosted git repository. xccui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 724857c [FLINK-9916] Add FROM_BASE64 function for table/sql API 724857c is described below commit 724857c3636e923037ea02131b05e03a8c7ca520 Author: yanghua <yanghua1...@gmail.com> AuthorDate: Mon Jul 23 23:38:23 2018 +0800 [FLINK-9916] Add FROM_BASE64 function for table/sql API This closes #6397. --- docs/dev/table/sql.md | 10 ++++++++ docs/dev/table/tableApi.md | 11 +++++++++ .../flink/table/api/scala/expressionDsl.scala | 5 ++++ .../flink/table/codegen/calls/BuiltInMethods.scala | 2 ++ .../table/codegen/calls/FunctionGenerator.scala | 6 +++++ .../table/expressions/stringExpressions.scala | 28 +++++++++++++++++++++- .../table/functions/sql/ScalarSqlFunctions.scala | 11 +++++++++ .../table/runtime/functions/ScalarFunctions.scala | 8 +++++++ .../flink/table/validate/FunctionCatalog.scala | 2 ++ .../table/expressions/ScalarFunctionsTest.scala | 22 +++++++++++++++++ .../expressions/utils/ScalarTypesTestBase.scala | 6 +++-- 11 files changed, 108 insertions(+), 3 deletions(-) diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 5b83e9d..1ed06f0 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -1828,6 +1828,16 @@ RPAD(text string, len integer, pad string) <p>Returns the string text right-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>RPAD('hi',4,'??')</code> returns <code>hi??</code>, <code>RPAD('hi',1,'??')</code> returns <code>h</code>.</p> </td> </tr> + <tr> + <td> + {% highlight text %} +FROM_BASE64(text string) +{% endhighlight %} + </td> + <td> + <p>Returns the base string decoded with base64, if text is NULL, returns NULL. E.g. <code>FROM_BASE64('aGVsbG8gd29ybGQ=')</code> returns <code>hello world</code>.</p> + </td> + </tr> </tbody> </table> diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index 2f65145..b1b8f60 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -2463,6 +2463,17 @@ STRING.rpad(len INT, pad STRING) <p>Returns a string right-padded with the given pad string to a length of len characters. If the string is longer than len, the return value is shortened to len characters. E.g. "hi".rpad(4, '??') returns "hi??", "hi".rpad(1, '??') returns "h".</p> </td> </tr> + <tr> + <td> + {% highlight java %} +STRING.fromBase64() +{% endhighlight %} + </td> + + <td> + <p>Returns the base string decoded with base64, if string is null, returns null. E.g. "aGVsbG8gd29ybGQ=".fromBase64() returns "hello world".</p> + </td> + </tr> <tr> <td> diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index b0bc5b6..62c62b1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -539,6 +539,11 @@ trait ImplicitExpressionOperations { def overlay(newString: Expression, starting: Expression, length: Expression) = Overlay(expr, newString, starting, length) + /** + * Returns the base string decoded with base64. + */ + def fromBase64() = FromBase64(expr) + // Temporal operations /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala index ea4f0fd..22298da 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala @@ -110,4 +110,6 @@ object BuiltInMethods { classOf[String]) val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long]) + + val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String]) } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index bd75617..d264cce 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -146,6 +146,12 @@ object FunctionGenerator { STRING_TYPE_INFO, BuiltInMethod.OVERLAY.method) + addSqlFunctionMethod( + FROM_BASE64, + Seq(STRING_TYPE_INFO), + STRING_TYPE_INFO, + BuiltInMethods.FROMBASE64) + // ---------------------------------------------------------------------------------------------- // Arithmetic functions // ---------------------------------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala index e69485f..87d251d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala @@ -61,7 +61,7 @@ case class InitCap(child: Expression) extends UnaryExpression { if (child.resultType == STRING_TYPE_INFO) { ValidationSuccess } else { - ValidationFailure(s"InitCap operator requires String input, " + + ValidationFailure(s"InitCap operator requires String input, " + s"but $child is of type ${child.resultType}") } } @@ -357,3 +357,29 @@ case class Rpad(text: Expression, len: Expression, pad: Expression) relBuilder.call(ScalarSqlFunctions.RPAD, children.map(_.toRexNode)) } } + +/** + * Returns the base string decoded with base64. + * Returns NULL If the input string is NULL. + */ +case class FromBase64(child: Expression) extends UnaryExpression with InputTypeSpec { + + override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO) + + override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO + + override private[flink] def validateInput(): ValidationResult = { + if (child.resultType == STRING_TYPE_INFO) { + ValidationSuccess + } else { + ValidationFailure(s"FromBase64 operator requires String input, " + + s"but $child is of type ${child.resultType}") + } + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.call(ScalarSqlFunctions.FROM_BASE64, child.toRexNode) + } + + override def toString: String = s"($child).fromBase64" +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala index 69f430b..1af1e68 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala @@ -157,4 +157,15 @@ object ScalarSqlFunctions { OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", OperandTypes.DATETIME, OperandTypes.STRING), SqlFunctionCategory.TIMEDATE ) + + val FROM_BASE64 = new SqlFunction( + "FROM_BASE64", + SqlKind.OTHER_FUNCTION, + ReturnTypes.cascade( + ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE), + InferTypes.RETURN_TYPE, + OperandTypes.family(SqlTypeFamily.STRING), + SqlFunctionCategory.STRING + ) + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala index 2e7c9f6..40f1ec3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala @@ -21,6 +21,8 @@ import scala.annotation.varargs import java.math.{BigDecimal => JBigDecimal} import java.lang.StringBuilder +import org.apache.commons.codec.binary.Base64 + /** * Built-in scalar runtime functions. */ @@ -182,4 +184,10 @@ object ScalarFunctions { new String(data) } + + /** + * Returns the base string decoded with base64. + */ + def fromBase64(str: String): String = new String(Base64.decodeBase64(str)) + } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 3184e00..b4f0424 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -202,6 +202,7 @@ object FunctionCatalog { "concat_ws" -> classOf[ConcatWs], "lpad" -> classOf[Lpad], "rpad" -> classOf[Rpad], + "fromBase64" -> classOf[FromBase64], // math functions "plus" -> classOf[Plus], @@ -443,6 +444,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { ScalarSqlFunctions.SHA384, ScalarSqlFunctions.SHA512, ScalarSqlFunctions.SHA2, + ScalarSqlFunctions.FROM_BASE64, // EXTENSIONS BasicOperatorTable.TUMBLE, BasicOperatorTable.HOP, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index 8b0c380..995762a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -450,6 +450,28 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "1111111111111111111111111111111111111111111111111111111111111111") } + @Test + def testFromBase64(): Unit = { + testAllApis( + 'f35.fromBase64(), + "f35.fromBase64()", + "from_base64(f35)", + "hello world") + + testAllApis( + 'f35.fromBase64(), + "f35.fromBase64()", + "FROM_BASE64(f35)", + "hello world") + + //null test + testAllApis( + 'f33.fromBase64(), + "f33.fromBase64()", + "FROM_BASE64(f33)", + "null") + } + // ---------------------------------------------------------------------------------------------- // Math functions // ---------------------------------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala index 2a90a90..6ad59b1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala @@ -28,7 +28,7 @@ import org.apache.flink.types.Row class ScalarTypesTestBase extends ExpressionTestBase { def testData: Row = { - val testData = new Row(35) + val testData = new Row(36) testData.setField(0, "This is a test String.") testData.setField(1, true) testData.setField(2, 42.toByte) @@ -64,6 +64,7 @@ class ScalarTypesTestBase extends ExpressionTestBase { testData.setField(32, -1) testData.setField(33, null) testData.setField(34, 256) + testData.setField(35, "aGVsbG8gd29ybGQ=") testData } @@ -103,6 +104,7 @@ class ScalarTypesTestBase extends ExpressionTestBase { Types.DECIMAL, Types.INT, Types.STRING, - Types.INT).asInstanceOf[TypeInformation[Any]] + Types.INT, + Types.STRING).asInstanceOf[TypeInformation[Any]] } }