This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 19bc181 [FLINK-25366][table-planner][table-runtime] Implement BINARY/VARBINARY length validation for sinks 19bc181 is described below commit 19bc18100802e8e5a56c5ce08e985d589db81838 Author: Marios Trivyzas <mat...@gmail.com> AuthorDate: Fri Dec 17 15:57:05 2021 +0200 [FLINK-25366][table-planner][table-runtime] Implement BINARY/VARBINARY length validation for sinks Similar to the length validation for CHAR/VARCHAR, implement the same logic for BINARY/VARBINARY and apply any necessary trimming or padding to match the length specified in the corresponding type. This closes #18142. --- .../generated/execution_config_configuration.html | 12 +- .../table/api/config/ExecutionConfigOptions.java | 31 ++--- .../plan/nodes/exec/common/CommonExecSink.java | 75 ++++++++--- .../nodes/exec/common/CommonExecSinkITCase.java | 128 +++++++++++++++++- .../runtime/operators/sink/ConstraintEnforcer.java | 146 +++++++++++++++------ 5 files changed, 307 insertions(+), 85 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html index 099a9f9..3d3163d 100644 --- a/docs/layouts/shortcodes/generated/execution_config_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html @@ -53,12 +53,6 @@ By default no operator is disabled.</td> <td>Sets default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This config has a higher priority than parallelism of StreamExecutionEnvironment (actually, this config overrides the parallelism of StreamExecutionEnvironment). 
A value of -1 indicates that no default parallelism is set, then it will fallback to use the parallelism of StreamExecutionEnvironment.</td> </tr> <tr> - <td><h5>table.exec.sink.char-length-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> - <td style="word-wrap: break-word;">IGNORE</td> - <td><p>Enum</p></td> - <td>Determines whether string values for columns with CHAR(<length>)/VARCHAR(<length>) types will be trimmed or padded (only for CHAR(<length>)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR length directive.</li><li>"TRIM_PAD": Trim and pad string values to match the length defi [...] - </tr> - <tr> <td><h5>table.exec.sink.keyed-shuffle</h5><br> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">AUTO</td> <td><p>Enum</p></td> @@ -77,6 +71,12 @@ By default no operator is disabled.</td> <td>Determines how Flink enforces NOT NULL column constraints when inserting null values.<br /><br />Possible values:<ul><li>"ERROR": Throw a runtime exception when writing null values into NOT NULL column.</li><li>"DROP": Drop records silently if a null value would have to be inserted into a NOT NULL column.</li></ul></td> </tr> <tr> + <td><h5>table.exec.sink.type-length-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">IGNORE</td> + <td><p>Enum</p></td> + <td>Determines whether values for columns with CHAR(<length>)/VARCHAR(<length>)/BINARY(<length>)/VARBINARY(<length>) types will be trimmed or padded (only for CHAR(<length>)/BINARY(<length>)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR/BINARY/VARBINARY column type.<br /><br 
/>Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR/BINARY/ [...] + </tr> + <tr> <td><h5>table.exec.sink.upsert-materialize</h5><br> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">AUTO</td> <td><p>Enum</p></td> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java index ba9a6c3..43a9b46 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java @@ -120,15 +120,16 @@ public class ExecutionConfigOptions { "Determines how Flink enforces NOT NULL column constraints when inserting null values."); @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) - public static final ConfigOption<CharLengthEnforcer> TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER = - key("table.exec.sink.char-length-enforcer") - .enumType(CharLengthEnforcer.class) - .defaultValue(CharLengthEnforcer.IGNORE) + public static final ConfigOption<TypeLengthEnforcer> TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER = + key("table.exec.sink.type-length-enforcer") + .enumType(TypeLengthEnforcer.class) + .defaultValue(TypeLengthEnforcer.IGNORE) .withDescription( - "Determines whether string values for columns with CHAR(<length>)/VARCHAR(<length>) " - + "types will be trimmed or padded (only for CHAR(<length>)), so that their " - + "length will match the one defined by the length of their respective " - + "CHAR/VARCHAR column type."); + "Determines whether values for columns with CHAR(<length>)/VARCHAR(<length>)" + + "/BINARY(<length>)/VARBINARY(<length>) types will be trimmed or padded " + + "(only for CHAR(<length>)/BINARY(<length>)), so that their length " 
+ + "will match the one defined by the length of their respective " + + "CHAR/VARCHAR/BINARY/VARBINARY column type."); @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE = @@ -440,23 +441,23 @@ public class ExecutionConfigOptions { } /** - * The enforcer to guarantee that length of CHAR/VARCHAR columns is respected when writing data - * into sink. + * The enforcer to guarantee that length of CHAR/VARCHAR/BINARY/VARBINARY columns is respected + * when writing data into a sink. */ @PublicEvolving - public enum CharLengthEnforcer implements DescribedEnum { + public enum TypeLengthEnforcer implements DescribedEnum { IGNORE( text( "Don't apply any trimming and padding, and instead " - + "ignore the CHAR/VARCHAR length directive.")), + + "ignore the CHAR/VARCHAR/BINARY/VARBINARY length directive.")), TRIM_PAD( text( - "Trim and pad string values to match the length " - + "defined by the CHAR/VARCHAR length.")); + "Trim and pad string and binary values to match the length " + + "defined by the CHAR/VARCHAR/BINARY/VARBINARY length.")); private final InlineElement description; - CharLengthEnforcer(InlineElement description) { + TypeLengthEnforcer(InlineElement description) { this.description = description; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index cb73eaf..25a0b2b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -66,11 +66,11 @@ import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInsert import 
org.apache.flink.table.runtime.typeutils.InternalSerializers; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.StateConfigUtil; +import org.apache.flink.table.types.logical.BinaryType; import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.types.RowKind; @@ -211,20 +211,34 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> notNullEnforcer, notNullFieldIndices, notNullFieldNames, fieldNames); } + final ExecutionConfigOptions.TypeLengthEnforcer typeLengthEnforcer = + config.getConfiguration() + .get(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER); + // Build CHAR/VARCHAR length enforcer - final List<ConstraintEnforcer.CharFieldInfo> charFieldInfo = - getCharFieldInfo(physicalRowType); + final List<ConstraintEnforcer.FieldInfo> charFieldInfo = + getFieldInfoForLengthEnforcer(physicalRowType, LengthEnforcerType.CHAR); if (!charFieldInfo.isEmpty()) { - final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer = - config.getConfiguration() - .get(ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER); final List<String> charFieldNames = charFieldInfo.stream() .map(cfi -> fieldNames[cfi.fieldIdx()]) .collect(Collectors.toList()); validatorBuilder.addCharLengthConstraint( - charLengthEnforcer, charFieldInfo, charFieldNames, fieldNames); + typeLengthEnforcer, charFieldInfo, charFieldNames, fieldNames); + } + + // Build BINARY/VARBINARY length enforcer + final List<ConstraintEnforcer.FieldInfo> binaryFieldInfo = + getFieldInfoForLengthEnforcer(physicalRowType, LengthEnforcerType.BINARY); + if (!binaryFieldInfo.isEmpty()) { + final List<String> 
binaryFieldNames = + binaryFieldInfo.stream() + .map(cfi -> fieldNames[cfi.fieldIdx()]) + .collect(Collectors.toList()); + + validatorBuilder.addBinaryLengthConstraint( + typeLengthEnforcer, binaryFieldInfo, binaryFieldNames, fieldNames); } ConstraintEnforcer constraintEnforcer = validatorBuilder.build(); @@ -257,26 +271,40 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> } /** - * Returns a List of {@link ConstraintEnforcer.CharFieldInfo}, each containing the info needed - * to determine whether a string value needs trimming and/or padding. + * Returns a List of {@link ConstraintEnforcer.FieldInfo}, each containing the info needed to + * determine whether a string or binary value needs trimming and/or padding. */ - private List<ConstraintEnforcer.CharFieldInfo> getCharFieldInfo(RowType physicalType) { - final List<ConstraintEnforcer.CharFieldInfo> charFieldsAndLengths = new ArrayList<>(); + private List<ConstraintEnforcer.FieldInfo> getFieldInfoForLengthEnforcer( + RowType physicalType, LengthEnforcerType enforcerType) { + LogicalTypeRoot staticType = null; + LogicalTypeRoot variableType = null; + int maxLength = 0; + switch (enforcerType) { + case CHAR: + staticType = LogicalTypeRoot.CHAR; + variableType = LogicalTypeRoot.VARCHAR; + maxLength = CharType.MAX_LENGTH; + break; + case BINARY: + staticType = LogicalTypeRoot.BINARY; + variableType = LogicalTypeRoot.VARBINARY; + maxLength = BinaryType.MAX_LENGTH; + } + final List<ConstraintEnforcer.FieldInfo> fieldsAndLengths = new ArrayList<>(); for (int i = 0; i < physicalType.getFieldCount(); i++) { LogicalType type = physicalType.getTypeAt(i); - boolean isChar = type.is(LogicalTypeRoot.CHAR); + boolean isStatic = type.is(staticType); // Should trim and possibly pad - if ((isChar && (LogicalTypeChecks.getLength(type) < CharType.MAX_LENGTH)) - || (type.is(LogicalTypeRoot.VARCHAR) - && (LogicalTypeChecks.getLength(type) < VarCharType.MAX_LENGTH))) { - charFieldsAndLengths.add( - new 
ConstraintEnforcer.CharFieldInfo( - i, LogicalTypeChecks.getLength(type), isChar)); - } else if (isChar) { // Should pad - charFieldsAndLengths.add(new ConstraintEnforcer.CharFieldInfo(i, null, isChar)); + if ((isStatic && (LogicalTypeChecks.getLength(type) < maxLength)) + || (type.is(variableType) && (LogicalTypeChecks.getLength(type) < maxLength))) { + fieldsAndLengths.add( + new ConstraintEnforcer.FieldInfo( + i, LogicalTypeChecks.getLength(type), isStatic)); + } else if (isStatic) { // Should pad + fieldsAndLengths.add(new ConstraintEnforcer.FieldInfo(i, null, isStatic)); } } - return charFieldsAndLengths; + return fieldsAndLengths; } /** @@ -514,4 +542,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> private RowType getPhysicalRowType(ResolvedSchema schema) { return (RowType) schema.toPhysicalRowDataType().getLogicalType(); } + + private enum LengthEnforcerType { + CHAR, + BINARY + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java index 8714722..5ee26ae 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java @@ -58,8 +58,8 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.apache.flink.table.api.DataTypes.INT; -import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER; import static 
org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -218,13 +218,14 @@ public class CommonExecSinkITCase extends AbstractTestBase { result.collect().forEachRemaining(results::add); assertThat(results, containsInAnyOrder(rows.toArray())); - // Change config option to "trim", to trim the strings based on their type length + // Change config option to "trim_pad", to trim or pad the strings + // accordingly, based on their type length try { tableEnv.getConfig() .getConfiguration() .setString( - TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(), - ExecutionConfigOptions.CharLengthEnforcer.TRIM_PAD.name()); + TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), + ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name()); result = tableEnv.executeSql("SELECT * FROM T1"); result.await(); @@ -243,8 +244,112 @@ public class CommonExecSinkITCase extends AbstractTestBase { tableEnv.getConfig() .getConfiguration() .setString( - TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(), - ExecutionConfigOptions.CharLengthEnforcer.IGNORE.name()); + TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), + ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name()); + } + } + + @Test + public void testBinaryLengthEnforcer() throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final List<Row> rows = + Arrays.asList( + Row.of( + 1, + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, + new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, + 11, + 111, + new byte[] {1, 2, 3}), + Row.of( + 2, + new byte[] {1, 2, 3, 4, 5}, + new byte[] {1, 2, 3}, + 22, + 222, + new byte[] {1, 2, 3, 4, 5, 6}), + Row.of( + 3, + new byte[] {1, 2, 3, 4, 5, 6}, + new byte[] {1, 2, 3, 4, 5}, + 33, + 333, + new byte[] {1, 2, 3, 4, 5, 6, 7, 8}), + Row.of( + 4, + new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, + new byte[] {1, 2, 3, 4, 5, 6}, + 44, + 444, + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})); + + 
final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaForBinaryLengthEnforcer()) + .source(new TestSource(rows)) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + + // Default config - ignore (no trim) + TableResult result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<Row> results = new ArrayList<>(); + result.collect().forEachRemaining(results::add); + assertThat(results, containsInAnyOrder(rows.toArray())); + + // Change config option to "trim_pad", to trim or pad the binary values + // accordingly, based on their type length + try { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), + ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name()); + + result = tableEnv.executeSql("SELECT * FROM T1"); + result.await(); + + final List<Row> expected = + Arrays.asList( + Row.of( + 1, + new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, + new byte[] {1, 2, 3, 4, 5, 6}, + 11, + 111, + new byte[] {1, 2, 3}), + Row.of( + 2, + new byte[] {1, 2, 3, 4, 5, 0, 0, 0}, + new byte[] {1, 2, 3, 0, 0, 0}, + 22, + 222, + new byte[] {1, 2, 3, 4, 5, 6}), + Row.of( + 3, + new byte[] {1, 2, 3, 4, 5, 6, 0, 0}, + new byte[] {1, 2, 3, 4, 5, 0}, + 33, + 333, + new byte[] {1, 2, 3, 4, 5, 6}), + Row.of( + 4, + new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, + new byte[] {1, 2, 3, 4, 5, 6}, + 44, + 444, + new byte[] {1, 2, 3, 4, 5, 6})); + final List<Row> resultsTrimmed = new ArrayList<>(); + result.collect().forEachRemaining(resultsTrimmed::add); + assertThat(resultsTrimmed, containsInAnyOrder(expected.toArray())); + + } finally { + tableEnv.getConfig() + .getConfiguration() + .setString( + TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), + ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name()); } } @@ -389,6 +494,17 @@ public class CommonExecSinkITCase extends AbstractTestBase { .build(); } + private static Schema schemaForBinaryLengthEnforcer() { + return Schema.newBuilder() + .column("a",
"INT") + .column("b", "BINARY(8)") + .column("c", "BINARY(6)") + .column("d", "INT") + .column("e", "INT") + .column("f", "VARBINARY(6)") + .build(); + } + private static Schema schemaForNotNullEnforcer() { return Schema.newBuilder() .column("a", "INT") diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java index 7822933..35c9888 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; @@ -35,11 +34,12 @@ import org.apache.flink.table.runtime.util.StreamRecordCollector; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.List; -import static org.apache.flink.table.api.config.ExecutionConfigOptions.CharLengthEnforcer; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TypeLengthEnforcer; import static org.apache.flink.util.Preconditions.checkArgument; /** @@ -61,10 +61,13 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> private final int[] notNullFieldIndices; private final 
String[] allFieldNames; - private final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer; + private final TypeLengthEnforcer typeLengthEnforcer; private final int[] charFieldIndices; private final int[] charFieldLengths; - private final BitSet charFieldShouldPad; + private final BitSet charFieldCouldPad; + private final int[] binaryFieldIndices; + private final int[] binaryFieldLengths; + private final BitSet binaryFieldCouldPad; private final String operatorName; @@ -73,18 +76,24 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> private ConstraintEnforcer( NotNullEnforcer notNullEnforcer, int[] notNullFieldIndices, - ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer, + TypeLengthEnforcer typeLengthEnforcer, int[] charFieldIndices, int[] charFieldLengths, - BitSet charFieldShouldPad, + BitSet charFieldCouldPad, + int[] binaryFieldIndices, + int[] binaryFieldLengths, + BitSet binaryFieldCouldPad, String[] allFieldNames, String operatorName) { this.notNullEnforcer = notNullEnforcer; this.notNullFieldIndices = notNullFieldIndices; - this.charLengthEnforcer = charLengthEnforcer; + this.typeLengthEnforcer = typeLengthEnforcer; this.charFieldIndices = charFieldIndices; this.charFieldLengths = charFieldLengths; - this.charFieldShouldPad = charFieldShouldPad; + this.charFieldCouldPad = charFieldCouldPad; + this.binaryFieldIndices = binaryFieldIndices; + this.binaryFieldLengths = binaryFieldLengths; + this.binaryFieldCouldPad = binaryFieldCouldPad; this.allFieldNames = allFieldNames; this.operatorName = operatorName; } @@ -106,15 +115,17 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> /** * Helper builder, so that the {@link ConstraintEnforcer} can be instantiated with only the NOT - * NULL constraint validation, only the CHAR/VARCHAR length validation, or both. 
+ * NULL constraint validation, only the CHAR/VARCHAR length validation, only the + * BINARY/VARBINARY length validation or combinations of them, or all of them. */ public static class Builder { private NotNullEnforcer notNullEnforcer; private int[] notNullFieldIndices; - private CharLengthEnforcer charLengthEnforcer; - private List<CharFieldInfo> charFieldInfo; + private TypeLengthEnforcer typeLengthEnforcer; + private List<FieldInfo> charFieldInfo; + private List<FieldInfo> binaryFieldInfo; private String[] allFieldNames; private final List<String> operatorNames = new ArrayList<>(); @@ -142,12 +153,12 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> } public void addCharLengthConstraint( - ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer, - List<CharFieldInfo> charFieldInfo, + TypeLengthEnforcer typeLengthEnforcer, + List<FieldInfo> charFieldInfo, List<String> charFieldNames, String[] allFieldNames) { - this.charLengthEnforcer = charLengthEnforcer; - if (this.charLengthEnforcer == CharLengthEnforcer.TRIM_PAD) { + this.typeLengthEnforcer = typeLengthEnforcer; + if (this.typeLengthEnforcer == TypeLengthEnforcer.TRIM_PAD) { checkArgument( charFieldInfo.size() > 0, "ConstraintValidator requires that there are CHAR/VARCHAR fields."); @@ -156,14 +167,35 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> operatorNames.add( String.format( - "CharLengthEnforcer(fields=[%s])", - String.join(", ", charFieldNames))); + "LengthEnforcer(fields=[%s])", String.join(", ", charFieldNames))); + this.isConfigured = true; + } + } + + public void addBinaryLengthConstraint( + TypeLengthEnforcer typeLengthEnforcer, + List<FieldInfo> binaryFieldInfo, + List<String> binaryFieldNames, + String[] allFieldNames) { + this.typeLengthEnforcer = typeLengthEnforcer; + if (this.typeLengthEnforcer == TypeLengthEnforcer.TRIM_PAD) { + checkArgument( + binaryFieldInfo.size() > 0, + "ConstraintValidator requires that there are BINARY/VARBINARY 
fields."); + this.binaryFieldInfo = binaryFieldInfo; + this.allFieldNames = allFieldNames; + + operatorNames.add( + String.format( + "LengthEnforcer(fields=[%s])", + String.join(", ", binaryFieldNames))); this.isConfigured = true; } } /** - * If neither of NOT NULL or CHAR/VARCHAR length enforcers are configured, null is returned. + * If none of the NOT NULL, CHAR/VARCHAR length or BINARY/VARBINARY length enforcers is + * configured, null is returned. */ public ConstraintEnforcer build() { if (isConfigured) { @@ -172,14 +204,21 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> return new ConstraintEnforcer( notNullEnforcer, notNullFieldIndices, - charLengthEnforcer, + typeLengthEnforcer, charFieldInfo != null - ? charFieldInfo.stream().mapToInt(cfi -> cfi.fieldIdx).toArray() + ? charFieldInfo.stream().mapToInt(fi -> fi.fieldIdx).toArray() : null, charFieldInfo != null - ? charFieldInfo.stream().mapToInt(cfi -> cfi.length).toArray() + ? charFieldInfo.stream().mapToInt(fi -> fi.length).toArray() + : null, + charFieldInfo != null ? buildCouldPad(charFieldInfo) : null, + binaryFieldInfo != null + ? binaryFieldInfo.stream().mapToInt(fi -> fi.fieldIdx).toArray() : null, - charFieldInfo != null ? buildShouldPad(charFieldInfo) : null, + binaryFieldInfo != null + ? binaryFieldInfo.stream().mapToInt(fi -> fi.length).toArray() + : null, + binaryFieldInfo != null ? 
buildCouldPad(binaryFieldInfo) : null, allFieldNames, operatorName); } @@ -187,21 +226,25 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> } } - private static BitSet buildShouldPad(List<CharFieldInfo> charFieldInfo) { - BitSet shouldPad = new BitSet(charFieldInfo.size()); + private static BitSet buildCouldPad(List<FieldInfo> charFieldInfo) { + BitSet couldPad = new BitSet(charFieldInfo.size()); for (int i = 0; i < charFieldInfo.size(); i++) { - if (charFieldInfo.get(i).shouldPad) { - shouldPad.set(i); + if (charFieldInfo.get(i).couldPad) { + couldPad.set(i); } } - return shouldPad; + return couldPad; } @Override public void processElement(StreamRecord<RowData> element) throws Exception { RowData processedRowData = processNotNullConstraint(element.getValue()); + // If NOT NULL constraint is not respected don't proceed to process the other constraints, + // simply drop the record. if (processedRowData != null) { - collector.collect(processCharConstraint(processedRowData)); + processedRowData = processCharConstraint(processedRowData); + processedRowData = processBinaryConstraint(processedRowData); + collector.collect(processedRowData); } } @@ -231,8 +274,9 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> } private RowData processCharConstraint(RowData rowData) { - if (charLengthEnforcer == null - || charLengthEnforcer == ExecutionConfigOptions.CharLengthEnforcer.IGNORE) { + if (typeLengthEnforcer == null + || typeLengthEnforcer == TypeLengthEnforcer.IGNORE + || charFieldIndices == null) { return rowData; } @@ -244,7 +288,7 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> final BinaryStringData stringData = (BinaryStringData) rowData.getString(fieldIdx); final int sourceStrLength = stringData.numChars(); - if (charFieldShouldPad.get(i) && sourceStrLength < length) { + if (charFieldCouldPad.get(i) && sourceStrLength < length) { if (updatedRowData == null) { updatedRowData = new 
UpdatableRowData(rowData, allFieldNames.length); } @@ -266,20 +310,48 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> return updatedRowData != null ? updatedRowData : rowData; } + private RowData processBinaryConstraint(RowData rowData) { + if (typeLengthEnforcer == null + || typeLengthEnforcer == TypeLengthEnforcer.IGNORE + || binaryFieldIndices == null) { + return rowData; + } + + UpdatableRowData updatedRowData = null; + + for (int i = 0; i < binaryFieldLengths.length; i++) { + final int fieldIdx = binaryFieldIndices[i]; + final int length = binaryFieldLengths[i]; + final byte[] binaryData = rowData.getBinary(fieldIdx); + final int sourceLength = binaryData.length; + + // Trimming takes place because of the shorter length used in `Arrays.copyOf` and + // padding because of the longer length, as implicitly the trailing bytes are 0. + if ((sourceLength > length) || (binaryFieldCouldPad.get(i) && sourceLength < length)) { + if (updatedRowData == null) { + updatedRowData = new UpdatableRowData(rowData, allFieldNames.length); + } + updatedRowData.setField(fieldIdx, Arrays.copyOf(binaryData, length)); + } + } + + return updatedRowData != null ? updatedRowData : rowData; + } + /** - * Helper POJO to keep info about CHAR/VARCHAR Fields, used to determine if trimming or padding - * is needed. + * Helper POJO to keep info about CHAR/VARCHAR/BINARY/VARBINARY fields, used to determine if + * trimming or padding is needed. */ @Internal - public static class CharFieldInfo { + public static class FieldInfo { private final int fieldIdx; private final Integer length; - private final boolean shouldPad; + private final boolean couldPad; - public CharFieldInfo(int fieldIdx, @Nullable Integer length, boolean shouldPad) { + public FieldInfo(int fieldIdx, @Nullable Integer length, boolean couldPad) { this.fieldIdx = fieldIdx; this.length = length; - this.shouldPad = shouldPad; + this.couldPad = couldPad; } public int fieldIdx() {