matriv commented on a change in pull request #17658:
URL: https://github.com/apache/flink/pull/17658#discussion_r741950725
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java
##########
@@ -148,9 +147,10 @@ protected AbstractCodeGeneratorCastRule(CastRulePredicate
predicate) {
classCode)
.getConstructors()[0]
.newInstance(constructorArgs);
- } catch (InstantiationException | IllegalAccessException |
InvocationTargetException e) {
+ } catch (Throwable e) {
Review comment:
If possible, it would be better to keep listing all the possible exceptions explicitly.
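For example, something along these lines (just a sketch; the rethrown exception type and the `compiledClass` variable name are placeholders for whatever the surrounding code already uses):

```java
try {
    castExecutor = compiledClass
            .getConstructors()[0]
            .newInstance(constructorArgs);
} catch (InstantiationException
        | IllegalAccessException
        | InvocationTargetException e) {
    // Wrap and rethrow the listed reflective exceptions instead of catching Throwable
    throw new FlinkRuntimeException("Could not instantiate the generated cast executor", e);
}
```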
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java
##########
@@ -57,41 +57,37 @@ public CastCodeBlock generateCodeBlock(
String inputIsNullTerm,
LogicalType inputType,
LogicalType targetType) {
- StringBuilder resultCode = new StringBuilder();
+ CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();
Review comment:
This could also be `final`.
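i.e. something like:

```java
final CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();
```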
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,6 +94,51 @@
.fromCase(SMALLINT(), (short) 10, 10L)
.fromCase(TINYINT(), (byte) 10, 10L),
CastTestSpecBuilder.testCastTo(STRING())
+ .fromCase(STRING(), null, null)
+ .fromCase(
+ CHAR(3), StringData.fromString("foo"),
StringData.fromString("foo"))
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ VARCHAR(10),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache Flink"),
+ StringData.fromString("Apache Flink"))
+ .fromCase(STRING(), null, null)
+ .fromCase(BOOLEAN(), true,
StringData.fromString("true"))
Review comment:
Since this is now a unit test, maybe we could test both boolean values.
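Something like this, for instance (a sketch based on the existing case):

```java
.fromCase(BOOLEAN(), true, StringData.fromString("true"))
.fromCase(BOOLEAN(), false, StringData.fromString("false"))
```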
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -76,7 +155,77 @@
Thread.currentThread().getContextClassLoader()),
TimestampData.fromLocalDateTime(
LocalDateTime.parse("2021-09-24T12:34:56.123456")),
- StringData.fromString("2021-09-24
14:34:56.123456")),
+ StringData.fromString("2021-09-24
14:34:56.123456"))
Review comment:
Maybe you could extract those date/time/timestamp values to static vars, similar to `CastFunctionITCase`, so we always have those default tests, and then use other date/time values where needed to cover additional or edge cases.
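A rough sketch of what I mean (names are just suggestions, values taken from the existing timestamp case):

```java
private static final LocalDateTime DEFAULT_TIMESTAMP =
        LocalDateTime.parse("2021-09-24T12:34:56.123456");
private static final StringData DEFAULT_TIMESTAMP_STRING =
        StringData.fromString("2021-09-24 14:34:56.123456");
```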
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,6 +94,51 @@
.fromCase(SMALLINT(), (short) 10, 10L)
.fromCase(TINYINT(), (byte) 10, 10L),
CastTestSpecBuilder.testCastTo(STRING())
+ .fromCase(STRING(), null, null)
+ .fromCase(
+ CHAR(3), StringData.fromString("foo"),
StringData.fromString("foo"))
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ VARCHAR(10),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache Flink"),
+ StringData.fromString("Apache Flink"))
+ .fromCase(STRING(), null, null)
Review comment:
This is a duplicate of the case above.
##########
File path:
flink-table/flink-table-planner/src/test/resources/log4j2-test.properties
##########
@@ -26,3 +26,9 @@ appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+# Uncomment to enable codegen logging
Review comment:
nice!
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -143,9 +308,20 @@ public void run() throws Exception {
this.castContext,
this.inputType.getLogicalType(),
this.targetType.getLogicalType());
- assertNotNull(executor);
+ assertNotNull(
+ executor,
+ "Cannot resolve an executor for input "
+ + this.inputType
+ + " and target "
+ + this.targetType);
assertEquals(this.expectedData, executor.cast(this.inputData));
+
+ // Run twice to make sure rules are reusable without causing issues
+ assertEquals(
Review comment:
nice!
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/MapToStringCastRule.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#MAP} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class MapToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final MapToStringCastRule INSTANCE = new
MapToStringCastRule();
+
+ private MapToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.MAP)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && CastRuleProvider.exists(
+ ((MapType)
input).getKeyType(), target)
+ && CastRuleProvider.exists(
+ ((MapType)
input).getValueType(), target))
+ .build());
+ }
+
+ @Override
+ protected String generateCodeBlockInternal(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ String returnVariable,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ final LogicalType keyType = ((MapType) inputLogicalType).getKeyType();
+ final LogicalType valueType = ((MapType)
inputLogicalType).getValueType();
+
+ final String builderTerm = newName("builder");
+ context.declareClassField(
+ className(StringBuilder.class), builderTerm,
constructorCall(StringBuilder.class));
+
+ final String keyArrayTerm = newName("keys");
+ final String valueArrayTerm = newName("values");
+
+ return new CastRuleUtils.CodeWriter()
Review comment:
Could you maybe add a sample of the generated code for this?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class RowToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final RowToStringCastRule INSTANCE = new
RowToStringCastRule();
+
+ private RowToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ROW)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && ((RowType) input)
+ .getFields().stream()
+ .allMatch(
+ field
->
+
CastRuleProvider
+
.exists(
+
field
+
.getType(),
+
target)))
+ .build());
+ }
+
+ @Override
+ protected String generateCodeBlockInternal(
Review comment:
Also here, a generated code example would be nice.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,6 +94,51 @@
.fromCase(SMALLINT(), (short) 10, 10L)
.fromCase(TINYINT(), (byte) 10, 10L),
CastTestSpecBuilder.testCastTo(STRING())
+ .fromCase(STRING(), null, null)
+ .fromCase(
+ CHAR(3), StringData.fromString("foo"),
StringData.fromString("foo"))
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ VARCHAR(10),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache Flink"),
+ StringData.fromString("Apache Flink"))
+ .fromCase(STRING(), null, null)
+ .fromCase(BOOLEAN(), true,
StringData.fromString("true"))
+ .fromCase(
+ BINARY(2), new byte[] {0, 1},
StringData.fromString("\u0000\u0001"))
+ .fromCase(
+ VARBINARY(3),
+ new byte[] {0, 1, 2},
+ StringData.fromString("\u0000\u0001\u0002"))
+ .fromCase(
+ VARBINARY(5),
+ new byte[] {0, 1, 2},
+ StringData.fromString("\u0000\u0001\u0002"))
+ .fromCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+
StringData.fromString("\u0000\u0001\u0002\u0003\u0004"))
+ .fromCase(
+ DECIMAL(4, 3),
+ DecimalData.fromBigDecimal(new
BigDecimal("9.87"), 4, 3),
+ StringData.fromString("9.870"))
+ .fromCase(
+ DECIMAL(5, 3),
+ DecimalData.fromBigDecimal(new
BigDecimal("9.87"), 5, 3),
+ StringData.fromString("9.870"))
+ .fromCase(TINYINT(), (byte) -125,
StringData.fromString("-125"))
+ .fromCase(SMALLINT(), (short) 32767,
StringData.fromString("32767"))
+ .fromCase(INT(), -12345678,
StringData.fromString("-12345678"))
+ .fromCase(BIGINT(), 1234567891234L,
StringData.fromString("1234567891234"))
+ .fromCase(FLOAT(), -123.456f,
StringData.fromString("-123.456"))
Review comment:
For float and double, maybe also check some longer values that end up with a scientific representation.
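For example (sketch; the expected strings are assumptions and depend on how the cast rule formats floating-point values):

```java
.fromCase(FLOAT(), -3.123456E36f, StringData.fromString("-3.123456E36"))
.fromCase(
        DOUBLE(),
        1.7976931348623157E308d,
        StringData.fromString("1.7976931348623157E308"))
```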
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+ extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final ArrayToStringCastRule INSTANCE = new
ArrayToStringCastRule();
+
+ private ArrayToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ARRAY)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && CastRuleProvider.exists(
+ ((ArrayType)
input).getElementType(),
+ target))
+ .build());
+ }
+
+ @Override
+ protected String generateCodeBlockInternal(
Review comment:
Also a sample of the generated code here, if you could, please.
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
##########
@@ -967,10 +969,20 @@ object ScalarOperatorGens {
inputType,
targetType
)
+
+ val castCode = if (castCodeBlock.getCode.isEmpty) {
+ ""
+ } else {
+ s"\n" +
Review comment:
Maybe it's worth keeping the comments anyway, to make it even clearer that no code is generated?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+ extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final ArrayToStringCastRule INSTANCE = new
ArrayToStringCastRule();
+
+ private ArrayToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ARRAY)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && CastRuleProvider.exists(
+ ((ArrayType)
input).getElementType(),
+ target))
+ .build());
+ }
+
+ /* Example generated code for ARRAY<INT>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
+ builder$1.delete(0, builder$1.length());
+ builder$1.append("[");
+ for (int i$2 = 0; i$2 < _myInput.size(); i$2++) {
+ if (i$2 != 0) {
+ builder$1.append(", ");
+ }
+ int element$3 = -1;
+ boolean elementIsNull$4 = _myInput.isNullAt(i$2);
+ if (!elementIsNull$4) {
+ element$3 = _myInput.getInt(i$2);
+ result$2 =
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + element$3);
+ builder$1.append(result$2);
+ } else {
+ builder$1.append("NULL");
+ }
+ }
+ builder$1.append("]");
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+ } else {
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+ }
+
+ */
+ @Override
+ protected String generateCodeBlockInternal(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ String returnVariable,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ final LogicalType innerInputType = ((ArrayType)
inputLogicalType).getElementType();
+
+ final String builderTerm = newName("builder");
+ context.declareClassField(
+ className(StringBuilder.class), builderTerm,
constructorCall(StringBuilder.class));
+
+ return new CastRuleUtils.CodeWriter()
+ .stmt(methodCall(builderTerm, "delete", 0,
methodCall(builderTerm, "length")))
Review comment:
`setLength(0)` could be used instead.
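i.e. (sketch):

```java
.stmt(methodCall(builderTerm, "setLength", 0))
```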
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+ extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final ArrayToStringCastRule INSTANCE = new
ArrayToStringCastRule();
+
+ private ArrayToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ARRAY)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && CastRuleProvider.exists(
+ ((ArrayType)
input).getElementType(),
+ target))
+ .build());
+ }
+
+ /* Example generated code for ARRAY<INT>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
+ builder$1.delete(0, builder$1.length());
+ builder$1.append("[");
+ for (int i$2 = 0; i$2 < _myInput.size(); i$2++) {
+ if (i$2 != 0) {
+ builder$1.append(", ");
+ }
+ int element$3 = -1;
+ boolean elementIsNull$4 = _myInput.isNullAt(i$2);
+ if (!elementIsNull$4) {
+ element$3 = _myInput.getInt(i$2);
+ result$2 =
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + element$3);
+ builder$1.append(result$2);
+ } else {
+ builder$1.append("NULL");
+ }
+ }
+ builder$1.append("]");
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+ } else {
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+ }
+
+ */
+ @Override
+ protected String generateCodeBlockInternal(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ String returnVariable,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ final LogicalType innerInputType = ((ArrayType)
inputLogicalType).getElementType();
+
+ final String builderTerm = newName("builder");
+ context.declareClassField(
+ className(StringBuilder.class), builderTerm,
constructorCall(StringBuilder.class));
+
+ return new CastRuleUtils.CodeWriter()
+ .stmt(methodCall(builderTerm, "delete", 0,
methodCall(builderTerm, "length")))
Review comment:
`setLength(0)` could be used instead. I see that `delete` makes an array copy, whereas `setLength(0)` just "truncates" the buffer.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,6 +94,51 @@
.fromCase(SMALLINT(), (short) 10, 10L)
.fromCase(TINYINT(), (byte) 10, 10L),
CastTestSpecBuilder.testCastTo(STRING())
+ .fromCase(STRING(), null, null)
+ .fromCase(
+ CHAR(3), StringData.fromString("foo"),
StringData.fromString("foo"))
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ VARCHAR(10),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache Flink"),
+ StringData.fromString("Apache Flink"))
+ .fromCase(STRING(), null, null)
+ .fromCase(BOOLEAN(), true,
StringData.fromString("true"))
+ .fromCase(
+ BINARY(2), new byte[] {0, 1},
StringData.fromString("\u0000\u0001"))
+ .fromCase(
+ VARBINARY(3),
+ new byte[] {0, 1, 2},
+ StringData.fromString("\u0000\u0001\u0002"))
+ .fromCase(
+ VARBINARY(5),
+ new byte[] {0, 1, 2},
+ StringData.fromString("\u0000\u0001\u0002"))
+ .fromCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+
StringData.fromString("\u0000\u0001\u0002\u0003\u0004"))
+ .fromCase(
+ DECIMAL(4, 3),
+ DecimalData.fromBigDecimal(new
BigDecimal("9.87"), 4, 3),
+ StringData.fromString("9.870"))
+ .fromCase(
+ DECIMAL(5, 3),
+ DecimalData.fromBigDecimal(new
BigDecimal("9.87"), 5, 3),
+ StringData.fromString("9.870"))
+ .fromCase(TINYINT(), (byte) -125,
StringData.fromString("-125"))
+ .fromCase(SMALLINT(), (short) 32767,
StringData.fromString("32767"))
+ .fromCase(INT(), -12345678,
StringData.fromString("-12345678"))
+ .fromCase(BIGINT(), 1234567891234L,
StringData.fromString("1234567891234"))
+ .fromCase(FLOAT(), -123.456f,
StringData.fromString("-123.456"))
Review comment:
Not addressed? Or am I missing something?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class RowToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final RowToStringCastRule INSTANCE = new
RowToStringCastRule();
+
+ private RowToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ROW)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && ((RowType) input)
+ .getFields().stream()
+ .allMatch(
+ field
->
+
CastRuleProvider
+
.exists(
+
field
+
.getType(),
+
target)))
+ .build());
+ }
+
+ /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
Review comment:
Is there something missing here? I don't see a for loop.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class RowToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final RowToStringCastRule INSTANCE = new
RowToStringCastRule();
+
+ private RowToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ROW)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && ((RowType) input)
+ .getFields().stream()
+ .allMatch(
+ field
->
+
CastRuleProvider
+
.exists(
+
field
+
.getType(),
+
target)))
+ .build());
+ }
+
+ /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
Review comment:
Ah, the loop is happening in `generateCodeBlockInternal`, please ignore.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/MapToStringCastRule.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#MAP} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class MapToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final MapToStringCastRule INSTANCE = new
MapToStringCastRule();
+
+ private MapToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.MAP)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && CastRuleProvider.exists(
+ ((MapType)
input).getKeyType(), target)
+ && CastRuleProvider.exists(
+ ((MapType)
input).getValueType(), target))
+ .build());
+ }
+
+ /* Example generated code for MAP<STRING, INTERVAL MONTH>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
+ org.apache.flink.table.data.ArrayData keys$2 = _myInput.keyArray();
+ org.apache.flink.table.data.ArrayData values$3 = _myInput.valueArray();
+ builder$1.delete(0, builder$1.length());
+ builder$1.append("{");
+ for (int i$4 = 0; i$4 < _myInput.size(); i$4++) {
+ if (i$4 != 0) {
+ builder$1.append(", ");
+ }
+ org.apache.flink.table.data.binary.BinaryStringData key$5 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+ boolean keyIsNull$6 = keys$2.isNullAt(i$4);
+ int value$7 = -1;
+ boolean valueIsNull$8 = values$3.isNullAt(i$4);
+ if (!keyIsNull$6) {
+ key$5 = ((org.apache.flink.table.data.binary.BinaryStringData)
keys$2.getString(i$4));
+ builder$1.append(key$5);
+ } else {
+ builder$1.append("NULL");
+ }
+ builder$1.append("=");
+ if (!valueIsNull$8) {
+ value$7 = values$3.getInt(i$4);
+ result$2 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.utils.DateTimeUtils.intervalYearMonthToString(value$7));
+ builder$1.append(result$2);
+ } else {
+ builder$1.append("NULL");
+ }
+ }
+ builder$1.append("}");
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+ } else {
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+ }
+
+ */
+ @Override
+ protected String generateCodeBlockInternal(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ String returnVariable,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ final LogicalType keyType = ((MapType) inputLogicalType).getKeyType();
+ final LogicalType valueType = ((MapType)
inputLogicalType).getValueType();
+
+ final String builderTerm = newName("builder");
+ context.declareClassField(
+ className(StringBuilder.class), builderTerm,
constructorCall(StringBuilder.class));
+
+ final String keyArrayTerm = newName("keys");
+ final String valueArrayTerm = newName("values");
+
+ return new CastRuleUtils.CodeWriter()
+ .declStmt(ArrayData.class, keyArrayTerm, methodCall(inputTerm,
"keyArray"))
+ .declStmt(ArrayData.class, valueArrayTerm,
methodCall(inputTerm, "valueArray"))
+ .stmt(methodCall(builderTerm, "delete", 0,
methodCall(builderTerm, "length")))
+ .stmt(methodCall(builderTerm, "append", strLiteral("{")))
+ .forStmt(
+ methodCall(inputTerm, "size"),
+ (indexTerm, loopBodyWriter) -> {
+ String keyTerm = newName("key");
+ String keyIsNullTerm = newName("keyIsNull");
+ String valueTerm = newName("value");
+ String valueIsNullTerm = newName("valueIsNull");
+
+ CastCodeBlock keyCast =
+ CastRuleProvider.generateCodeBlock(
+ context,
+ keyTerm,
+ keyIsNullTerm,
+ // Null check is done at the key
array access level
+ keyType.copy(false),
+ targetLogicalType);
+ CastCodeBlock valueCast =
+ CastRuleProvider.generateCodeBlock(
+ context,
+ valueTerm,
+ valueIsNullTerm,
+ // Null check is done at the value
array access level
+ valueType.copy(false),
+ targetLogicalType);
+
+ loopBodyWriter
+ // Write the comma
+ .ifStmt(
+ indexTerm + " != 0",
+ thenBodyWriter ->
+ thenBodyWriter.stmt(
+ methodCall(
+
builderTerm,
+ "append",
+
strLiteral(", "))))
+ // Declare key and values variables
+ .declPrimitiveStmt(keyType, keyTerm)
+ .declStmt(
+ boolean.class,
+ keyIsNullTerm,
+ methodCall(keyArrayTerm,
"isNullAt", indexTerm))
+ .declPrimitiveStmt(valueType, valueTerm)
+ .declStmt(
+ boolean.class,
+ valueIsNullTerm,
+ methodCall(valueArrayTerm,
"isNullAt", indexTerm))
+ // Execute casting if inner key/value not
null
+ .ifStmt(
+ "!" + keyIsNullTerm,
+ thenBodyWriter ->
+ thenBodyWriter
+ // If key not
null, extract it and
+ // execute the cast
+ .assignStmt(
+ keyTerm,
+
rowFieldReadAccess(
+
indexTerm,
+
keyArrayTerm,
+
keyType))
+ .append(keyCast)
+ .stmt(
+ methodCall(
+
builderTerm,
+
"append",
+
keyCast
+
.getReturnTerm())),
+ elseBodyWriter ->
+ elseBodyWriter.stmt(
+ methodCall(
+
builderTerm,
+ "append",
+
strLiteral("NULL"))))
Review comment:
Could you please extract all usages of `NULL`, `<NULL>`, etc. into constants in the superclass?
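For example (just a sketch of the idea; the name is a suggestion):

```java
// In the shared superclass, e.g. AbstractNullAwareCodeGeneratorCastRule
protected static final String NULL_STR_LITERAL = "NULL";
```

and then use `strLiteral(NULL_STR_LITERAL)` at the call sites.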
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -76,7 +155,77 @@
Thread.currentThread().getContextClassLoader()),
TimestampData.fromLocalDateTime(
LocalDateTime.parse("2021-09-24T12:34:56.123456")),
- StringData.fromString("2021-09-24
14:34:56.123456")),
+ StringData.fromString("2021-09-24
14:34:56.123456"))
Review comment:
thank you!
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractCodeGeneratorCastRule.java
##########
@@ -148,9 +147,10 @@ protected AbstractCodeGeneratorCastRule(CastRulePredicate
predicate) {
classCode)
.getConstructors()[0]
.newInstance(constructorArgs);
- } catch (InstantiationException | IllegalAccessException |
InvocationTargetException e) {
+ } catch (Throwable e) {
Review comment:
If possible, would be better to keep listing all possible exceptions.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/AbstractNullAwareCodeGeneratorCastRule.java
##########
@@ -57,41 +57,37 @@ public CastCodeBlock generateCodeBlock(
String inputIsNullTerm,
LogicalType inputType,
LogicalType targetType) {
- StringBuilder resultCode = new StringBuilder();
+ CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();
Review comment:
could also be final
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,6 +94,51 @@
.fromCase(SMALLINT(), (short) 10, 10L)
.fromCase(TINYINT(), (byte) 10, 10L),
CastTestSpecBuilder.testCastTo(STRING())
+ .fromCase(STRING(), null, null)
+ .fromCase(
+ CHAR(3), StringData.fromString("foo"),
StringData.fromString("foo"))
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ VARCHAR(10),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache Flink"),
+ StringData.fromString("Apache Flink"))
+ .fromCase(STRING(), null, null)
+ .fromCase(BOOLEAN(), true,
StringData.fromString("true"))
Review comment:
since this is now unit testing, maybe we could test both boolean values.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -76,7 +155,77 @@
Thread.currentThread().getContextClassLoader()),
TimestampData.fromLocalDateTime(
LocalDateTime.parse("2021-09-24T12:34:56.123456")),
- StringData.fromString("2021-09-24
14:34:56.123456")),
+ StringData.fromString("2021-09-24
14:34:56.123456"))
Review comment:
Maybe you could extract those dates/time/timestamps to static vars,
similar to `CastFunctionITCase` so we can always have those default tests, and
then use some other date/time values if needed to test some other cases, or
edge cases.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,6 +94,51 @@
.fromCase(SMALLINT(), (short) 10, 10L)
.fromCase(TINYINT(), (byte) 10, 10L),
CastTestSpecBuilder.testCastTo(STRING())
+ .fromCase(STRING(), null, null)
+ .fromCase(
+ CHAR(3), StringData.fromString("foo"),
StringData.fromString("foo"))
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ VARCHAR(10),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache Flink"),
+ StringData.fromString("Apache Flink"))
+ .fromCase(STRING(), null, null)
Review comment:
this is duplicate
##########
File path:
flink-table/flink-table-planner/src/test/resources/log4j2-test.properties
##########
@@ -26,3 +26,9 @@ appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+# Uncomment to enable codegen logging
Review comment:
nice!
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -143,9 +308,20 @@ public void run() throws Exception {
this.castContext,
this.inputType.getLogicalType(),
this.targetType.getLogicalType());
- assertNotNull(executor);
+ assertNotNull(
+ executor,
+ "Cannot resolve an executor for input "
+ + this.inputType
+ + " and target "
+ + this.targetType);
assertEquals(this.expectedData, executor.cast(this.inputData));
+
+ // Run twice to make sure rules are reusable without causing issues
+ assertEquals(
Review comment:
nice!
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/MapToStringCastRule.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#MAP} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class MapToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final MapToStringCastRule INSTANCE = new
MapToStringCastRule();
+
+ private MapToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.MAP)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && CastRuleProvider.exists(
+ ((MapType)
input).getKeyType(), target)
+ && CastRuleProvider.exists(
+ ((MapType)
input).getValueType(), target))
+ .build());
+ }
+
+ @Override
+ protected String generateCodeBlockInternal(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ String returnVariable,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ final LogicalType keyType = ((MapType) inputLogicalType).getKeyType();
+ final LogicalType valueType = ((MapType)
inputLogicalType).getValueType();
+
+ final String builderTerm = newName("builder");
+ context.declareClassField(
+ className(StringBuilder.class), builderTerm,
constructorCall(StringBuilder.class));
+
+ final String keyArrayTerm = newName("keys");
+ final String valueArrayTerm = newName("values");
+
+ return new CastRuleUtils.CodeWriter()
Review comment:
Could you maybe add a sample generated code for this?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class RowToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final RowToStringCastRule INSTANCE = new
RowToStringCastRule();
+
+ private RowToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ROW)
+ &&
target.is(LogicalTypeFamily.CHARACTER_STRING)
+ && ((RowType) input)
+ .getFields().stream()
+ .allMatch(
+ field
->
+
CastRuleProvider
+
.exists(
+
field
+
.getType(),
+
target)))
+ .build());
+ }
+
+ @Override
+ protected String generateCodeBlockInternal(
Review comment:
Also here, some generated code example would be nice.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,6 +94,51 @@
.fromCase(SMALLINT(), (short) 10, 10L)
.fromCase(TINYINT(), (byte) 10, 10L),
CastTestSpecBuilder.testCastTo(STRING())
+ .fromCase(STRING(), null, null)
+ .fromCase(
+ CHAR(3), StringData.fromString("foo"),
StringData.fromString("foo"))
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ VARCHAR(10),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache Flink"),
+ StringData.fromString("Apache Flink"))
+ .fromCase(STRING(), null, null)
+ .fromCase(BOOLEAN(), true,
StringData.fromString("true"))
+ .fromCase(
+ BINARY(2), new byte[] {0, 1},
StringData.fromString("\u0000\u0001"))
+ .fromCase(
+ VARBINARY(3),
+ new byte[] {0, 1, 2},
+ StringData.fromString("\u0000\u0001\u0002"))
+ .fromCase(
+ VARBINARY(5),
+ new byte[] {0, 1, 2},
+ StringData.fromString("\u0000\u0001\u0002"))
+ .fromCase(
+ BYTES(),
+ new byte[] {0, 1, 2, 3, 4},
+
StringData.fromString("\u0000\u0001\u0002\u0003\u0004"))
+ .fromCase(
+ DECIMAL(4, 3),
+ DecimalData.fromBigDecimal(new
BigDecimal("9.87"), 4, 3),
+ StringData.fromString("9.870"))
+ .fromCase(
+ DECIMAL(5, 3),
+ DecimalData.fromBigDecimal(new
BigDecimal("9.87"), 5, 3),
+ StringData.fromString("9.870"))
+ .fromCase(TINYINT(), (byte) -125,
StringData.fromString("-125"))
+ .fromCase(SMALLINT(), (short) 32767,
StringData.fromString("32767"))
+ .fromCase(INT(), -12345678,
StringData.fromString("-12345678"))
+ .fromCase(BIGINT(), 1234567891234L,
StringData.fromString("1234567891234"))
+ .fromCase(FLOAT(), -123.456f,
StringData.fromString("-123.456"))
Review comment:
For float and double maybe check some longer values that will end up
with the scientific representation.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+ extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final ArrayToStringCastRule INSTANCE = new
ArrayToStringCastRule();
+
+ private ArrayToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ARRAY)
+                                                && target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && CastRuleProvider.exists(
+                                                        ((ArrayType) input).getElementType(),
+ target))
+ .build());
+ }
+
+ @Override
+ protected String generateCodeBlockInternal(
Review comment:
Could you also include a sample of the generated code, please?
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
##########
@@ -967,10 +969,20 @@ object ScalarOperatorGens {
inputType,
targetType
)
+
+ val castCode = if (castCodeBlock.getCode.isEmpty) {
+ ""
+ } else {
+ s"\n" +
Review comment:
Maybe it's worth keeping the comments anyway, to make it even clearer that there is no code generated?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+ extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final ArrayToStringCastRule INSTANCE = new
ArrayToStringCastRule();
+
+ private ArrayToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ARRAY)
+                                                && target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && CastRuleProvider.exists(
+                                                        ((ArrayType) input).getElementType(),
+ target))
+ .build());
+ }
+
+ /* Example generated code for ARRAY<INT>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
+ builder$1.delete(0, builder$1.length());
+ builder$1.append("[");
+ for (int i$2 = 0; i$2 < _myInput.size(); i$2++) {
+ if (i$2 != 0) {
+ builder$1.append(", ");
+ }
+ int element$3 = -1;
+ boolean elementIsNull$4 = _myInput.isNullAt(i$2);
+ if (!elementIsNull$4) {
+ element$3 = _myInput.getInt(i$2);
+ result$2 =
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + element$3);
+ builder$1.append(result$2);
+ } else {
+ builder$1.append("NULL");
+ }
+ }
+ builder$1.append("]");
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+ } else {
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+ }
+
+ */
+ @Override
+ protected String generateCodeBlockInternal(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ String returnVariable,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ final LogicalType innerInputType = ((ArrayType)
inputLogicalType).getElementType();
+
+ final String builderTerm = newName("builder");
+ context.declareClassField(
+ className(StringBuilder.class), builderTerm,
constructorCall(StringBuilder.class));
+
+ return new CastRuleUtils.CodeWriter()
+ .stmt(methodCall(builderTerm, "delete", 0,
methodCall(builderTerm, "length")))
Review comment:
`setLength(0)` could be used instead.
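For example (sketch only, reusing the existing `methodCall` helper from `CastRuleUtils`):

```java
// builder.setLength(0) instead of builder.delete(0, builder.length())
.stmt(methodCall(builderTerm, "setLength", 0))
```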
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/ArrayToStringCastRule.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ARRAY} to {@link
LogicalTypeFamily#CHARACTER_STRING} cast rule. */
+public class ArrayToStringCastRule
+ extends AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final ArrayToStringCastRule INSTANCE = new
ArrayToStringCastRule();
+
+ private ArrayToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ARRAY)
+                                                && target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && CastRuleProvider.exists(
+                                                        ((ArrayType) input).getElementType(),
+ target))
+ .build());
+ }
+
+ /* Example generated code for ARRAY<INT>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
+ builder$1.delete(0, builder$1.length());
+ builder$1.append("[");
+ for (int i$2 = 0; i$2 < _myInput.size(); i$2++) {
+ if (i$2 != 0) {
+ builder$1.append(", ");
+ }
+ int element$3 = -1;
+ boolean elementIsNull$4 = _myInput.isNullAt(i$2);
+ if (!elementIsNull$4) {
+ element$3 = _myInput.getInt(i$2);
+ result$2 =
org.apache.flink.table.data.binary.BinaryStringData.fromString("" + element$3);
+ builder$1.append(result$2);
+ } else {
+ builder$1.append("NULL");
+ }
+ }
+ builder$1.append("]");
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+ } else {
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+ }
+
+ */
+ @Override
+ protected String generateCodeBlockInternal(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ String returnVariable,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ final LogicalType innerInputType = ((ArrayType)
inputLogicalType).getElementType();
+
+ final String builderTerm = newName("builder");
+ context.declareClassField(
+ className(StringBuilder.class), builderTerm,
constructorCall(StringBuilder.class));
+
+ return new CastRuleUtils.CodeWriter()
+ .stmt(methodCall(builderTerm, "delete", 0,
methodCall(builderTerm, "length")))
Review comment:
`setLength(0)` could be used instead. I see that `delete` makes an array copy, whereas `setLength(0)` just "truncates" the buffer.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -60,6 +94,51 @@
.fromCase(SMALLINT(), (short) 10, 10L)
.fromCase(TINYINT(), (byte) 10, 10L),
CastTestSpecBuilder.testCastTo(STRING())
+ .fromCase(STRING(), null, null)
+ .fromCase(
+                                CHAR(3), StringData.fromString("foo"), StringData.fromString("foo"))
+ .fromCase(
+ VARCHAR(5),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ VARCHAR(10),
+ StringData.fromString("Flink"),
+ StringData.fromString("Flink"))
+ .fromCase(
+ STRING(),
+ StringData.fromString("Apache Flink"),
+ StringData.fromString("Apache Flink"))
+ .fromCase(STRING(), null, null)
+                        .fromCase(BOOLEAN(), true, StringData.fromString("true"))
+                        .fromCase(
+                                BINARY(2), new byte[] {0, 1}, StringData.fromString("\u0000\u0001"))
+ .fromCase(
+ VARBINARY(3),
+ new byte[] {0, 1, 2},
+ StringData.fromString("\u0000\u0001\u0002"))
+ .fromCase(
+ VARBINARY(5),
+ new byte[] {0, 1, 2},
+ StringData.fromString("\u0000\u0001\u0002"))
+                        .fromCase(
+                                BYTES(),
+                                new byte[] {0, 1, 2, 3, 4},
+                                StringData.fromString("\u0000\u0001\u0002\u0003\u0004"))
+                        .fromCase(
+                                DECIMAL(4, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 4, 3),
+                                StringData.fromString("9.870"))
+                        .fromCase(
+                                DECIMAL(5, 3),
+                                DecimalData.fromBigDecimal(new BigDecimal("9.87"), 5, 3),
+                                StringData.fromString("9.870"))
+                        .fromCase(TINYINT(), (byte) -125, StringData.fromString("-125"))
+                        .fromCase(SMALLINT(), (short) 32767, StringData.fromString("32767"))
+                        .fromCase(INT(), -12345678, StringData.fromString("-12345678"))
+                        .fromCase(BIGINT(), 1234567891234L, StringData.fromString("1234567891234"))
+                        .fromCase(FLOAT(), -123.456f, StringData.fromString("-123.456"))
Review comment:
Not addressed? Or am I missing something?
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class RowToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final RowToStringCastRule INSTANCE = new
RowToStringCastRule();
+
+ private RowToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ROW)
+                                                && target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && ((RowType) input)
+                                                        .getFields().stream()
+                                                        .allMatch(
+                                                                field ->
+                                                                        CastRuleProvider.exists(
+                                                                                field.getType(),
+                                                                                target)))
+ .build());
+ }
+
+ /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
Review comment:
Is there something missing here? I don't see a for loop.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/RowToStringCastRule.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#ROW} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class RowToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final RowToStringCastRule INSTANCE = new
RowToStringCastRule();
+
+ private RowToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.ROW)
+                                                && target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && ((RowType) input)
+                                                        .getFields().stream()
+                                                        .allMatch(
+                                                                field ->
+                                                                        CastRuleProvider.exists(
+                                                                                field.getType(),
+                                                                                target)))
+ .build());
+ }
+
+ /* Example generated code for ROW<`f0` INT, `f1` STRING>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
Review comment:
Ah, the loop happens in `generateCodeBlockInternal`; please ignore.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/rules/MapToStringCastRule.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting.rules;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.planner.functions.casting.CastCodeBlock;
+import org.apache.flink.table.planner.functions.casting.CastRulePredicate;
+import org.apache.flink.table.planner.functions.casting.CastRuleProvider;
+import org.apache.flink.table.planner.functions.casting.CodeGeneratorCastRule;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static
org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.constructorCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.functionCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.methodCall;
+import static
org.apache.flink.table.planner.functions.casting.rules.CastRuleUtils.strLiteral;
+
+/** {@link LogicalTypeRoot#MAP} to {@link LogicalTypeFamily#CHARACTER_STRING}
cast rule. */
+public class MapToStringCastRule extends
AbstractNullAwareCodeGeneratorCastRule<ArrayData, String> {
+
+ public static final MapToStringCastRule INSTANCE = new
MapToStringCastRule();
+
+ private MapToStringCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .predicate(
+ (input, target) ->
+ input.is(LogicalTypeRoot.MAP)
+                                                && target.is(LogicalTypeFamily.CHARACTER_STRING)
+                                                && CastRuleProvider.exists(
+                                                        ((MapType) input).getKeyType(), target)
+                                                && CastRuleProvider.exists(
+                                                        ((MapType) input).getValueType(), target))
+ .build());
+ }
+
+ /* Example generated code for MAP<STRING, INTERVAL MONTH>:
+
+ isNull$0 = _myInputIsNull;
+ if (!isNull$0) {
+ org.apache.flink.table.data.ArrayData keys$2 = _myInput.keyArray();
+ org.apache.flink.table.data.ArrayData values$3 = _myInput.valueArray();
+ builder$1.delete(0, builder$1.length());
+ builder$1.append("{");
+ for (int i$4 = 0; i$4 < _myInput.size(); i$4++) {
+ if (i$4 != 0) {
+ builder$1.append(", ");
+ }
+ org.apache.flink.table.data.binary.BinaryStringData key$5 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+ boolean keyIsNull$6 = keys$2.isNullAt(i$4);
+ int value$7 = -1;
+ boolean valueIsNull$8 = values$3.isNullAt(i$4);
+ if (!keyIsNull$6) {
+ key$5 = ((org.apache.flink.table.data.binary.BinaryStringData)
keys$2.getString(i$4));
+ builder$1.append(key$5);
+ } else {
+ builder$1.append("NULL");
+ }
+ builder$1.append("=");
+ if (!valueIsNull$8) {
+ value$7 = values$3.getInt(i$4);
+ result$2 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.utils.DateTimeUtils.intervalYearMonthToString(value$7));
+ builder$1.append(result$2);
+ } else {
+ builder$1.append("NULL");
+ }
+ }
+ builder$1.append("}");
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(builder$1.toString());
+ } else {
+ result$1 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+ }
+
+ */
+ @Override
+ protected String generateCodeBlockInternal(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ String returnVariable,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ final LogicalType keyType = ((MapType) inputLogicalType).getKeyType();
+ final LogicalType valueType = ((MapType)
inputLogicalType).getValueType();
+
+ final String builderTerm = newName("builder");
+ context.declareClassField(
+ className(StringBuilder.class), builderTerm,
constructorCall(StringBuilder.class));
+
+ final String keyArrayTerm = newName("keys");
+ final String valueArrayTerm = newName("values");
+
+ return new CastRuleUtils.CodeWriter()
+                .declStmt(ArrayData.class, keyArrayTerm, methodCall(inputTerm, "keyArray"))
+                .declStmt(ArrayData.class, valueArrayTerm, methodCall(inputTerm, "valueArray"))
+                .stmt(methodCall(builderTerm, "delete", 0, methodCall(builderTerm, "length")))
+                .stmt(methodCall(builderTerm, "append", strLiteral("{")))
+                .forStmt(
+                        methodCall(inputTerm, "size"),
+                        (indexTerm, loopBodyWriter) -> {
+                            String keyTerm = newName("key");
+                            String keyIsNullTerm = newName("keyIsNull");
+                            String valueTerm = newName("value");
+                            String valueIsNullTerm = newName("valueIsNull");
+
+                            CastCodeBlock keyCast =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            keyTerm,
+                                            keyIsNullTerm,
+                                            // Null check is done at the key array access level
+                                            keyType.copy(false),
+                                            targetLogicalType);
+                            CastCodeBlock valueCast =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            valueTerm,
+                                            valueIsNullTerm,
+                                            // Null check is done at the value array access level
+                                            valueType.copy(false),
+                                            targetLogicalType);
+
+                            loopBodyWriter
+                                    // Write the comma
+                                    .ifStmt(
+                                            indexTerm + " != 0",
+                                            thenBodyWriter ->
+                                                    thenBodyWriter.stmt(
+                                                            methodCall(
+                                                                    builderTerm,
+                                                                    "append",
+                                                                    strLiteral(", "))))
+                                    // Declare key and values variables
+                                    .declPrimitiveStmt(keyType, keyTerm)
+                                    .declStmt(
+                                            boolean.class,
+                                            keyIsNullTerm,
+                                            methodCall(keyArrayTerm, "isNullAt", indexTerm))
+                                    .declPrimitiveStmt(valueType, valueTerm)
+                                    .declStmt(
+                                            boolean.class,
+                                            valueIsNullTerm,
+                                            methodCall(valueArrayTerm, "isNullAt", indexTerm))
+                                    // Execute casting if inner key/value not null
+                                    .ifStmt(
+                                            "!" + keyIsNullTerm,
+                                            thenBodyWriter ->
+                                                    thenBodyWriter
+                                                            // If key not null, extract it and
+                                                            // execute the cast
+                                                            .assignStmt(
+                                                                    keyTerm,
+                                                                    rowFieldReadAccess(
+                                                                            indexTerm,
+                                                                            keyArrayTerm,
+                                                                            keyType))
+                                                            .append(keyCast)
+                                                            .stmt(
+                                                                    methodCall(
+                                                                            builderTerm,
+                                                                            "append",
+                                                                            keyCast.getReturnTerm())),
+                                            elseBodyWriter ->
+                                                    elseBodyWriter.stmt(
+                                                            methodCall(
+                                                                    builderTerm,
+                                                                    "append",
+                                                                    strLiteral("NULL"))))
Review comment:
Could you please extract all usages of `NULL`, `<NULL>`, etc. into constants in the superclass?
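For example, something like this in the shared base class (the class placement and constant name are just a suggestion):

```java
// e.g. in AbstractNullAwareCodeGeneratorCastRule, or another common base class
protected static final String NULL_STR = "NULL";

// usage in the collection/row-to-string rules:
.stmt(methodCall(builderTerm, "append", strLiteral(NULL_STR)))
```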
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
##########
@@ -76,7 +155,77 @@
Thread.currentThread().getContextClassLoader()),
TimestampData.fromLocalDateTime(
LocalDateTime.parse("2021-09-24T12:34:56.123456")),
- StringData.fromString("2021-09-24
14:34:56.123456")),
+ StringData.fromString("2021-09-24
14:34:56.123456"))
Review comment:
thank you!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]