This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b1df4945141907022a3c5ddae21723a4d5a42f4
Author: Marios Trivyzas <mat...@gmail.com>
AuthorDate: Mon Dec 13 14:58:13 2021 +0200

    [hotfix][table] Rename precision to length for CHAR/VARCHAR sink enforcer
    
    Rename all `precision` references in code and docs to `length`.
    These `precision` references were introduced with:
https://github.com/apache/flink/commit/1151071b67b866bc18225fc7f522d29e819a6238
---
 .../generated/execution_config_configuration.html  |  4 +-
 .../table/api/config/ExecutionConfigOptions.java   | 31 ++++++-----
 .../plan/nodes/exec/common/CommonExecSink.java     | 10 ++--
 .../nodes/exec/common/CommonExecSinkITCase.java    | 18 +++----
 .../runtime/operators/sink/ConstraintEnforcer.java | 60 +++++++++++-----------
 5 files changed, 61 insertions(+), 62 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/execution_config_configuration.html 
b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 2a35fc8..099a9f9 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -53,10 +53,10 @@ By default no operator is disabled.</td>
             <td>Sets default parallelism for all operators (such as aggregate, 
join, filter) to run with parallel instances. This config has a higher priority 
than parallelism of StreamExecutionEnvironment (actually, this config overrides 
the parallelism of StreamExecutionEnvironment). A value of -1 indicates that no 
default parallelism is set, then it will fallback to use the parallelism of 
StreamExecutionEnvironment.</td>
         </tr>
         <tr>
-            <td><h5>table.exec.sink.char-precision-enforcer</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td><h5>table.exec.sink.char-length-enforcer</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">IGNORE</td>
             <td><p>Enum</p></td>
-            <td>Determines whether string values for columns with 
CHAR(&lt;precision&gt;)/VARCHAR(&lt;precision&gt;) types will be trimmed or 
padded (only for CHAR(&lt;precision&gt;)), so that their length will match the 
one defined by the precision of their respective CHAR/VARCHAR column type.<br 
/><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, 
and instead ignore the CHAR/VARCHAR precision directive.</li><li>"TRIM_PAD": 
Trim and pad string values to match  [...]
+            <td>Determines whether string values for columns with 
CHAR(&lt;length&gt;)/VARCHAR(&lt;length&gt;) types will be trimmed or padded 
(only for CHAR(&lt;length&gt;)), so that their length will match the one 
defined by the length of their respective CHAR/VARCHAR column type.<br /><br 
/>Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and 
instead ignore the CHAR/VARCHAR length directive.</li><li>"TRIM_PAD": Trim and 
pad string values to match the length defi [...]
         </tr>
         <tr>
             <td><h5>table.exec.sink.keyed-shuffle</h5><br> <span class="label 
label-primary">Streaming</span></td>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 6f655b2..ba9a6c3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -120,16 +120,15 @@ public class ExecutionConfigOptions {
                             "Determines how Flink enforces NOT NULL column 
constraints when inserting null values.");
 
     @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
-    public static final ConfigOption<CharPrecisionEnforcer>
-            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER =
-                    key("table.exec.sink.char-precision-enforcer")
-                            .enumType(CharPrecisionEnforcer.class)
-                            .defaultValue(CharPrecisionEnforcer.IGNORE)
-                            .withDescription(
-                                    "Determines whether string values for 
columns with CHAR(<precision>)/VARCHAR(<precision>) "
-                                            + "types will be trimmed or padded 
(only for CHAR(<precision>)), so that their "
-                                            + "length will match the one 
defined by the precision of their respective "
-                                            + "CHAR/VARCHAR column type.");
+    public static final ConfigOption<CharLengthEnforcer> 
TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER =
+            key("table.exec.sink.char-length-enforcer")
+                    .enumType(CharLengthEnforcer.class)
+                    .defaultValue(CharLengthEnforcer.IGNORE)
+                    .withDescription(
+                            "Determines whether string values for columns with 
CHAR(<length>)/VARCHAR(<length>) "
+                                    + "types will be trimmed or padded (only 
for CHAR(<length>)), so that their "
+                                    + "length will match the one defined by 
the length of their respective "
+                                    + "CHAR/VARCHAR column type.");
 
     @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
     public static final ConfigOption<UpsertMaterialize> 
TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
@@ -441,23 +440,23 @@ public class ExecutionConfigOptions {
     }
 
     /**
-     * The enforcer to guarantee that precision of CHAR/VARCHAR columns is 
respected when writing
-     * data into sink.
+     * The enforcer to guarantee that length of CHAR/VARCHAR columns is 
respected when writing data
+     * into sink.
      */
     @PublicEvolving
-    public enum CharPrecisionEnforcer implements DescribedEnum {
+    public enum CharLengthEnforcer implements DescribedEnum {
         IGNORE(
                 text(
                         "Don't apply any trimming and padding, and instead "
-                                + "ignore the CHAR/VARCHAR precision 
directive.")),
+                                + "ignore the CHAR/VARCHAR length 
directive.")),
         TRIM_PAD(
                 text(
                         "Trim and pad string values to match the length "
-                                + "defined by the CHAR/VARCHAR precision."));
+                                + "defined by the CHAR/VARCHAR length."));
 
         private final InlineElement description;
 
-        CharPrecisionEnforcer(InlineElement description) {
+        CharLengthEnforcer(InlineElement description) {
             this.description = description;
         }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 65500b9..091c09a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -203,20 +203,20 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
                     notNullEnforcer, notNullFieldIndices, notNullFieldNames, 
fieldNames);
         }
 
-        // Build CHAR/VARCHAR precision enforcer
+        // Build CHAR/VARCHAR length enforcer
         final List<ConstraintEnforcer.CharFieldInfo> charFieldInfo =
                 getCharFieldInfo(physicalRowType);
         if (!charFieldInfo.isEmpty()) {
-            final ExecutionConfigOptions.CharPrecisionEnforcer 
charPrecisionEnforcer =
+            final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer 
=
                     config.getConfiguration()
-                            
.get(ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER);
+                            
.get(ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER);
             final List<String> charFieldNames =
                     charFieldInfo.stream()
                             .map(cfi -> fieldNames[cfi.fieldIdx()])
                             .collect(Collectors.toList());
 
-            validatorBuilder.addCharPrecisionConstraint(
-                    charPrecisionEnforcer, charFieldInfo, charFieldNames, 
fieldNames);
+            validatorBuilder.addCharLengthConstraint(
+                    charLengthEnforcer, charFieldInfo, charFieldNames, 
fieldNames);
         }
 
         ConstraintEnforcer constraintEnforcer = validatorBuilder.build();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index dde54b7..8714722 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -58,7 +58,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.DataTypes.INT;
-import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER;
 import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -194,7 +194,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testCharPrecisionEnforcer() throws ExecutionException, 
InterruptedException {
+    public void testCharLengthEnforcer() throws ExecutionException, 
InterruptedException {
         final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
         final List<Row> rows =
                 Arrays.asList(
@@ -205,7 +205,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
 
         final TableDescriptor sourceDescriptor =
                 TableFactoryHarness.newBuilder()
-                        .schema(schemaForCharPrecisionEnforcer())
+                        .schema(schemaForCharLengthEnforcer())
                         .source(new TestSource(rows))
                         .build();
         tableEnv.createTable("T1", sourceDescriptor);
@@ -218,13 +218,13 @@ public class CommonExecSinkITCase extends 
AbstractTestBase {
         result.collect().forEachRemaining(results::add);
         assertThat(results, containsInAnyOrder(rows.toArray()));
 
-        // Change config option to "trim", to trim the strings based on their 
type precision
+        // Change config option to "trim", to trim the strings based on their 
type length
         try {
             tableEnv.getConfig()
                     .getConfiguration()
                     .setString(
-                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
-                            
ExecutionConfigOptions.CharPrecisionEnforcer.TRIM_PAD.name());
+                            TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharLengthEnforcer.TRIM_PAD.name());
 
             result = tableEnv.executeSql("SELECT * FROM T1");
             result.await();
@@ -243,8 +243,8 @@ public class CommonExecSinkITCase extends AbstractTestBase {
             tableEnv.getConfig()
                     .getConfiguration()
                     .setString(
-                            TABLE_EXEC_SINK_CHAR_PRECISION_ENFORCER.key(),
-                            
ExecutionConfigOptions.CharPrecisionEnforcer.IGNORE.name());
+                            TABLE_EXEC_SINK_CHAR_LENGTH_ENFORCER.key(),
+                            
ExecutionConfigOptions.CharLengthEnforcer.IGNORE.name());
         }
     }
 
@@ -378,7 +378,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         return builder.build();
     }
 
-    private static Schema schemaForCharPrecisionEnforcer() {
+    private static Schema schemaForCharLengthEnforcer() {
         return Schema.newBuilder()
                 .column("a", "INT")
                 .column("b", "CHAR(8)")
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
index 4755007..7822933 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/ConstraintEnforcer.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import 
org.apache.flink.table.api.config.ExecutionConfigOptions.NotNullEnforcer;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -37,7 +38,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
-import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.CharPrecisionEnforcer;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.CharLengthEnforcer;
 import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -46,8 +47,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  *
  * <ul>
  *   <li>{@code NOT NULL} column constraint of a sink table
- *   <li>{@code CHAR(precision)}/@{code VARCHAR(precision)}: trim string 
values to comply with the
- *       {@code precision} defined in their corresponding types.
+ *   <li>{@code CHAR(length)}/@{code VARCHAR(length)}: trim string values to 
comply with the {@code
+ *       length} defined in their corresponding types.
  * </ul>
  */
 @Internal
@@ -60,9 +61,9 @@ public class ConstraintEnforcer extends 
TableStreamOperator<RowData>
     private final int[] notNullFieldIndices;
     private final String[] allFieldNames;
 
-    private final CharPrecisionEnforcer charPrecisionEnforcer;
+    private final ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer;
     private final int[] charFieldIndices;
-    private final int[] charFieldPrecisions;
+    private final int[] charFieldLengths;
     private final BitSet charFieldShouldPad;
 
     private final String operatorName;
@@ -72,17 +73,17 @@ public class ConstraintEnforcer extends 
TableStreamOperator<RowData>
     private ConstraintEnforcer(
             NotNullEnforcer notNullEnforcer,
             int[] notNullFieldIndices,
-            CharPrecisionEnforcer charPrecisionEnforcer,
+            ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer,
             int[] charFieldIndices,
-            int[] charFieldPrecisions,
+            int[] charFieldLengths,
             BitSet charFieldShouldPad,
             String[] allFieldNames,
             String operatorName) {
         this.notNullEnforcer = notNullEnforcer;
         this.notNullFieldIndices = notNullFieldIndices;
-        this.charPrecisionEnforcer = charPrecisionEnforcer;
+        this.charLengthEnforcer = charLengthEnforcer;
         this.charFieldIndices = charFieldIndices;
-        this.charFieldPrecisions = charFieldPrecisions;
+        this.charFieldLengths = charFieldLengths;
         this.charFieldShouldPad = charFieldShouldPad;
         this.allFieldNames = allFieldNames;
         this.operatorName = operatorName;
@@ -105,14 +106,14 @@ public class ConstraintEnforcer extends 
TableStreamOperator<RowData>
 
     /**
      * Helper builder, so that the {@link ConstraintEnforcer} can be 
instantiated with only the NOT
-     * NULL constraint validation, only the CHAR/VARCHAR precision validation, 
or both.
+     * NULL constraint validation, only the CHAR/VARCHAR length validation, or 
both.
      */
     public static class Builder {
 
         private NotNullEnforcer notNullEnforcer;
         private int[] notNullFieldIndices;
 
-        private CharPrecisionEnforcer charPrecisionEnforcer;
+        private CharLengthEnforcer charLengthEnforcer;
         private List<CharFieldInfo> charFieldInfo;
         private String[] allFieldNames;
 
@@ -140,13 +141,13 @@ public class ConstraintEnforcer extends 
TableStreamOperator<RowData>
             }
         }
 
-        public void addCharPrecisionConstraint(
-                CharPrecisionEnforcer charPrecisionEnforcer,
+        public void addCharLengthConstraint(
+                ExecutionConfigOptions.CharLengthEnforcer charLengthEnforcer,
                 List<CharFieldInfo> charFieldInfo,
                 List<String> charFieldNames,
                 String[] allFieldNames) {
-            this.charPrecisionEnforcer = charPrecisionEnforcer;
-            if (this.charPrecisionEnforcer == CharPrecisionEnforcer.TRIM_PAD) {
+            this.charLengthEnforcer = charLengthEnforcer;
+            if (this.charLengthEnforcer == CharLengthEnforcer.TRIM_PAD) {
                 checkArgument(
                         charFieldInfo.size() > 0,
                         "ConstraintValidator requires that there are 
CHAR/VARCHAR fields.");
@@ -155,15 +156,14 @@ public class ConstraintEnforcer extends 
TableStreamOperator<RowData>
 
                 operatorNames.add(
                         String.format(
-                                "CharPrecisionEnforcer(fields=[%s])",
+                                "CharLengthEnforcer(fields=[%s])",
                                 String.join(", ", charFieldNames)));
                 this.isConfigured = true;
             }
         }
 
         /**
-         * If neither of NOT NULL or CHAR/VARCHAR precision enforcers are 
configured, null is
-         * returned.
+         * If neither of NOT NULL or CHAR/VARCHAR length enforcers are 
configured, null is returned.
          */
         public ConstraintEnforcer build() {
             if (isConfigured) {
@@ -172,12 +172,12 @@ public class ConstraintEnforcer extends 
TableStreamOperator<RowData>
                 return new ConstraintEnforcer(
                         notNullEnforcer,
                         notNullFieldIndices,
-                        charPrecisionEnforcer,
+                        charLengthEnforcer,
                         charFieldInfo != null
                                 ? charFieldInfo.stream().mapToInt(cfi -> 
cfi.fieldIdx).toArray()
                                 : null,
                         charFieldInfo != null
-                                ? charFieldInfo.stream().mapToInt(cfi -> 
cfi.precision).toArray()
+                                ? charFieldInfo.stream().mapToInt(cfi -> 
cfi.length).toArray()
                                 : null,
                         charFieldInfo != null ? buildShouldPad(charFieldInfo) 
: null,
                         allFieldNames,
@@ -231,8 +231,8 @@ public class ConstraintEnforcer extends 
TableStreamOperator<RowData>
     }
 
     private RowData processCharConstraint(RowData rowData) {
-        if (charPrecisionEnforcer == null
-                || charPrecisionEnforcer == CharPrecisionEnforcer.IGNORE) {
+        if (charLengthEnforcer == null
+                || charLengthEnforcer == 
ExecutionConfigOptions.CharLengthEnforcer.IGNORE) {
             return rowData;
         }
 
@@ -240,26 +240,26 @@ public class ConstraintEnforcer extends 
TableStreamOperator<RowData>
 
         for (int i = 0; i < charFieldIndices.length; i++) {
             final int fieldIdx = charFieldIndices[i];
-            final int precision = charFieldPrecisions[i];
+            final int length = charFieldLengths[i];
             final BinaryStringData stringData = (BinaryStringData) 
rowData.getString(fieldIdx);
             final int sourceStrLength = stringData.numChars();
 
-            if (charFieldShouldPad.get(i) && sourceStrLength < precision) {
+            if (charFieldShouldPad.get(i) && sourceStrLength < length) {
                 if (updatedRowData == null) {
                     updatedRowData = new UpdatableRowData(rowData, 
allFieldNames.length);
                 }
                 final int srcSizeInBytes = stringData.getSizeInBytes();
-                final byte[] newString = new byte[srcSizeInBytes + precision - 
sourceStrLength];
+                final byte[] newString = new byte[srcSizeInBytes + length - 
sourceStrLength];
                 for (int j = srcSizeInBytes; j < newString.length; j++) {
                     newString[j] = (byte) 32; // space
                 }
                 SegmentsUtil.copyToBytes(stringData.getSegments(), 0, 
newString, 0, srcSizeInBytes);
                 updatedRowData.setField(fieldIdx, 
StringData.fromBytes(newString));
-            } else if (sourceStrLength > precision) {
+            } else if (sourceStrLength > length) {
                 if (updatedRowData == null) {
                     updatedRowData = new UpdatableRowData(rowData, 
allFieldNames.length);
                 }
-                updatedRowData.setField(fieldIdx, stringData.substring(0, 
precision));
+                updatedRowData.setField(fieldIdx, stringData.substring(0, 
length));
             }
         }
 
@@ -273,12 +273,12 @@ public class ConstraintEnforcer extends 
TableStreamOperator<RowData>
     @Internal
     public static class CharFieldInfo {
         private final int fieldIdx;
-        private final Integer precision;
+        private final Integer length;
         private final boolean shouldPad;
 
-        public CharFieldInfo(int fieldIdx, @Nullable Integer precision, 
boolean shouldPad) {
+        public CharFieldInfo(int fieldIdx, @Nullable Integer length, boolean 
shouldPad) {
             this.fieldIdx = fieldIdx;
-            this.precision = precision;
+            this.length = length;
             this.shouldPad = shouldPad;
         }
 

Reply via email to