This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4b1df4945141907022a3c5ddae21723a4d5a42f4 Author: Marios Trivyzas <mat...@gmail.com> AuthorDate: Mon Dec 13 14:58:13 2021 +0200 [hotfix][table] Rename precision to length for CHAR/VARCHAR sink enforcer Rename all `precision` references in code and docs to `length` which were introduced with: https://github.com/apache/flink/commit/1151071b67b866bc18225fc7f522d29e819a6238 --- .../generated/execution_config_configuration.html | 4 +- .../table/api/config/ExecutionConfigOptions.java | 31 ++++++----- .../plan/nodes/exec/common/CommonExecSink.java | 10 ++-- .../nodes/exec/common/CommonExecSinkITCase.java | 18 +++---- .../runtime/operators/sink/ConstraintEnforcer.java | 60 +++++++++++----------- 5 files changed, 61 insertions(+), 62 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html index 2a35fc8..099a9f9 100644 --- a/docs/layouts/shortcodes/generated/execution_config_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html @@ -53,10 +53,10 @@ By default no operator is disabled.</td> <td>Sets default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This config has a higher priority than parallelism of StreamExecutionEnvironment (actually, this config overrides the parallelism of StreamExecutionEnvironment). A value of -1 indicates that no default parallelism is set, then it will fallback to use the parallelism of StreamExecutionEnvironment.</td> </tr> <tr> - <td><h5>table.exec.sink.char-precision-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td><h5>table.exec.sink.char-length-enforcer</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">IGNORE</td> <td><p>Enum</p></td> - <td>Determines whether string values for columns with CHAR(<precision>)/VARCHAR(<precision>) types will be trimmed or padded (only for CHAR(<precision>)), so that their length will match the one defined by the precision of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR precision directive.</li><li>"TRIM_PAD": Trim and pad string values to match [...] + <td>Determines whether string values for columns with CHAR(<length>)/VARCHAR(<length>) types will be trimmed or padded (only for CHAR(<length>)), so that their length will match the one defined by the length of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR length directive.</li><li>"TRIM_PAD": Trim and pad string values to match the length defi [...] </tr> <tr> <td><h5>table.exec.sink.keyed-shuffle</h5><br> <span class="label label-primary">Streaming</span></td> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java index 6f655b2..ba9a6c3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java @@ -120,16 +120,15 @@ public class ExecutionConfigOptions { "Determines how Flink enforces NOT NULL column constraints when inserting null values."); @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) - public static final ConfigOption<CharPrecisionEnforcer> - TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER = - key("table.exec.sink.char-precision-enforcer") - .enumType(CharPrecisionEnforcer.class) - .defaultValue(CharPrecisionEnforcer.IGNORE) - .withDescription( - "Determines whether string values for columns with CHAR(<precision>)/VARCHAR(<precision>) " - + "types will be trimmed or padded (only for CHAR(<precision>)), so that their " - + "length will match the one defined by the precision of their respective " - + "CHAR/VARCHAR column type."); + public static final ConfigOption<CharLengthEnforcer> TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER = + key("table.exec.sink.char-length-enforcer") + .enumType(CharLengthEnforcer.class) + .defaultValue(CharLengthEnforcer.IGNORE) + .withDescription( + "Determines whether string values for columns with CHAR(<length>)/VARCHAR(<length>) " + + "types will be trimmed or padded (only for CHAR(<length>)), so that their " + + "length will match the one defined by the length of their respective " + + "CHAR/VARCHAR column type."); @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE = @@ -441,23 +440,23 @@ public class ExecutionConfigOptions { } /** - * The enforcer to guarantee that precision of CHAR/VARCHAR columns is respected when writing - * data into sink. + * The enforcer to guarantee that length of CHAR/VARCHAR columns is respected when writing data + * into sink. */ @PublicEvolving - public enum CharPrecisionEnforcer implements DescribedEnum { + public enum CharLengthEnforcer implements DescribedEnum { IGNORE( text( "Don't apply any trimming and padding, and instead " - + "ignore the CHAR/VARCHAR precision directive.")), + + "ignore the CHAR/VARCHAR length directive.")), TRIM_PAD( text( "Trim and pad string values to match the length " - + "defined by the CHAR/VARCHAR precision.")); + + "defined by the CHAR/VARCHAR length.")); private final InlineElement description; - CharPrecisionEnforcer(InlineElement description) { + CharLengthEnforcer(InlineElement description) { this.description = description; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 65500b9..091c09a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -203,20 +203,20 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> notNullEnforcer, notNullFieldIndices, notNullFieldNames, fieldNames); } - // Build CHAR/VARCHAR precision enforcer + // Build CHAR/VARCHAR length enforcer final List<ConstraintEnforcer.CharFieldInfo> charFieldInfo = getCharFieldInfo(physicalRowType); if (!charFieldInfo.isEmpty()) { - final ExecutionConfigOptions.CharPrecisionEnforcer charPrecisionEnforcer = + final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer = config.getConfiguration() - .get(ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER); + .get(ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER); final List<String> charFieldNames = charFieldInfo.stream() .map(cfi -> fieldNames[cfi.fieldIdx()]) .collect(Collectors.toList()); - validatorBuilder.addCharPrecisionConstraint( - charPrecisionEnforcer, charFieldInfo, charFieldNames, fieldNames); + validatorBuilder.addCharLengthConstraint( + charLengthEnforcer, charFieldInfo, charFieldNames, fieldNames); } ConstraintEnforcer constraintEnforcer = validatorBuilder.build(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java index dde54b7..8714722 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java @@ -58,7 +58,7 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.apache.flink.table.api.DataTypes.INT; -import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -194,7 +194,7 @@ public class CommonExecSinkITCase extends AbstractTestBase { } @Test - public void testCharPrecisionEnforcer() throws ExecutionException, InterruptedException { + public void testCharLengthEnforcer() throws ExecutionException, InterruptedException { final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); final List<Row> rows = Arrays.asList( @@ -205,7 +205,7 @@ public class CommonExecSinkITCase extends AbstractTestBase { final TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder() - .schema(schemaForCharPrecisionEnforcer()) + .schema(schemaForCharLengthEnforcer()) .source(new TestSource(rows)) .build(); tableEnv.createTable("T1", sourceDescriptor); @@ -218,13 +218,13 @@ public class CommonExecSinkITCase extends AbstractTestBase { result.collect().forEachRemaining(results::add); assertThat(results, containsInAnyOrder(rows.toArray())); - // Change config option to "trim", to trim the strings based on their type precision + // Change config option to "trim", to trim the strings based on their type length try { tableEnv.getConfig() .getConfiguration() .setString( - TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), - ExecutionConfigOptions.CharPrecisionEnforcer.TRIM_PAD.name()); + TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(), + ExecutionConfigOptions.CharLengthEnforcer.TRIM_PAD.name()); result = tableEnv.executeSql("SELECT * FROM T1"); result.await(); @@ -243,8 +243,8 @@ public class CommonExecSinkITCase extends AbstractTestBase { tableEnv.getConfig() .getConfiguration() .setString( - TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(), - ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name()); + TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(), + ExecutionConfigOptions.CharLengthEnforcer.IGNORE.name()); } } @@ -378,7 +378,7 @@ public class CommonExecSinkITCase extends AbstractTestBase { return builder.build(); } - private static Schema schemaForCharPrecisionEnforcer() { + private static Schema schemaForCharLengthEnforcer() { return Schema.newBuilder() .column("a", "INT") .column("b", "CHAR(8)") diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java index 4755007..7822933 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; @@ -37,7 +38,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.List; -import static org.apache.flink.table.api.config.ExecutionConfigOptions.CharPrecisionEnforcer; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.CharLengthEnforcer; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER; import static org.apache.flink.util.Preconditions.checkArgument; @@ -46,8 +47,8 @@ import static org.apache.flink.util.Preconditions.checkArgument; * * <ul> * <li>{@code NOT NULL} column constraint of a sink table - * <li>{@code CHAR(precision)}/@{code VARCHAR(precision)}: trim string values to comply with the - * {@code precision} defined in their corresponding types. + * <li>{@code CHAR(length)}/@{code VARCHAR(length)}: trim string values to comply with the {@code + * length} defined in their corresponding types. * </ul> */ @Internal @@ -60,9 +61,9 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> private final int[] notNullFieldIndices; private final String[] allFieldNames; - private final CharPrecisionEnforcer charPrecisionEnforcer; + private final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer; private final int[] charFieldIndices; - private final int[] charFieldPrecisions; + private final int[] charFieldLengths; private final BitSet charFieldShouldPad; private final String operatorName; @@ -72,17 +73,17 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> private ConstraintEnforcer( NotNullEnforcer notNullEnforcer, int[] notNullFieldIndices, - CharPrecisionEnforcer charPrecisionEnforcer, + ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer, int[] charFieldIndices, - int[] charFieldPrecisions, + int[] charFieldLengths, BitSet charFieldShouldPad, String[] allFieldNames, String operatorName) { this.notNullEnforcer = notNullEnforcer; this.notNullFieldIndices = notNullFieldIndices; - this.charPrecisionEnforcer = charPrecisionEnforcer; + this.charLengthEnforcer = charLengthEnforcer; this.charFieldIndices = charFieldIndices; - this.charFieldPrecisions = charFieldPrecisions; + this.charFieldLengths = charFieldLengths; this.charFieldShouldPad = charFieldShouldPad; this.allFieldNames = allFieldNames; this.operatorName = operatorName; @@ -105,14 +106,14 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> /** * Helper builder, so that the {@link ConstraintEnforcer} can be instantiated with only the NOT - * NULL constraint validation, only the CHAR/VARCHAR precision validation, or both. + * NULL constraint validation, only the CHAR/VARCHAR length validation, or both. */ public static class Builder { private NotNullEnforcer notNullEnforcer; private int[] notNullFieldIndices; - private CharPrecisionEnforcer charPrecisionEnforcer; + private CharLengthEnforcer charLengthEnforcer; private List<CharFieldInfo> charFieldInfo; private String[] allFieldNames; @@ -140,13 +141,13 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> } } - public void addCharPrecisionConstraint( - CharPrecisionEnforcer charPrecisionEnforcer, + public void addCharLengthConstraint( + ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer, List<CharFieldInfo> charFieldInfo, List<String> charFieldNames, String[] allFieldNames) { - this.charPrecisionEnforcer = charPrecisionEnforcer; - if (this.charPrecisionEnforcer == CharPrecisionEnforcer.TRIM_PAD) { + this.charLengthEnforcer = charLengthEnforcer; + if (this.charLengthEnforcer == CharLengthEnforcer.TRIM_PAD) { checkArgument( charFieldInfo.size() > 0, "ConstraintValidator requires that there are CHAR/VARCHAR fields."); @@ -155,15 +156,14 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> operatorNames.add( String.format( - "CharPrecisionEnforcer(fields=[%s])", + "CharLengthEnforcer(fields=[%s])", String.join(", ", charFieldNames))); this.isConfigured = true; } } /** - * If neither of NOT NULL or CHAR/VARCHAR precision enforcers are configured, null is - * returned. + * If neither of NOT NULL or CHAR/VARCHAR length enforcers are configured, null is returned. */ public ConstraintEnforcer build() { if (isConfigured) { @@ -172,12 +172,12 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> return new ConstraintEnforcer( notNullEnforcer, notNullFieldIndices, - charPrecisionEnforcer, + charLengthEnforcer, charFieldInfo != null ? charFieldInfo.stream().mapToInt(cfi -> cfi.fieldIdx).toArray() : null, charFieldInfo != null - ? charFieldInfo.stream().mapToInt(cfi -> cfi.precision).toArray() + ? charFieldInfo.stream().mapToInt(cfi -> cfi.length).toArray() : null, charFieldInfo != null ? buildShouldPad(charFieldInfo) : null, allFieldNames, @@ -231,8 +231,8 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> } private RowData processCharConstraint(RowData rowData) { - if (charPrecisionEnforcer == null - || charPrecisionEnforcer == CharPrecisionEnforcer.IGNORE) { + if (charLengthEnforcer == null + || charLengthEnforcer == ExecutionConfigOptions.CharLengthEnforcer.IGNORE) { return rowData; } @@ -240,26 +240,26 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> for (int i = 0; i < charFieldIndices.length; i++) { final int fieldIdx = charFieldIndices[i]; - final int precision = charFieldPrecisions[i]; + final int length = charFieldLengths[i]; final BinaryStringData stringData = (BinaryStringData) rowData.getString(fieldIdx); final int sourceStrLength = stringData.numChars(); - if (charFieldShouldPad.get(i) && sourceStrLength < precision) { + if (charFieldShouldPad.get(i) && sourceStrLength < length) { if (updatedRowData == null) { updatedRowData = new UpdatableRowData(rowData, allFieldNames.length); } final int srcSizeInBytes = stringData.getSizeInBytes(); - final byte[] newString = new byte[srcSizeInBytes + precision - sourceStrLength]; + final byte[] newString = new byte[srcSizeInBytes + length - sourceStrLength]; for (int j = srcSizeInBytes; j < newString.length; j++) { newString[j] = (byte) 32; // space } SegmentsUtil.copyToBytes(stringData.getSegments(), 0, newString, 0, srcSizeInBytes); updatedRowData.setField(fieldIdx, StringData.fromBytes(newString)); - } else if (sourceStrLength > precision) { + } else if (sourceStrLength > length) { if (updatedRowData == null) { updatedRowData = new UpdatableRowData(rowData, allFieldNames.length); } - updatedRowData.setField(fieldIdx, stringData.substring(0, precision)); + updatedRowData.setField(fieldIdx, stringData.substring(0, length)); } } @@ -273,12 +273,12 @@ public class ConstraintEnforcer extends TableStreamOperator<RowData> @Internal public static class CharFieldInfo { private final int fieldIdx; - private final Integer precision; + private final Integer length; private final boolean shouldPad; - public CharFieldInfo(int fieldIdx, @Nullable Integer precision, boolean shouldPad) { + public CharFieldInfo(int fieldIdx, @Nullable Integer length, boolean shouldPad) { this.fieldIdx = fieldIdx; - this.precision = precision; + this.length = length; this.shouldPad = shouldPad; }