Copilot commented on code in PR #4447:
URL: https://github.com/apache/flink-cdc/pull/4447#discussion_r3451048337


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java:
##########
@@ -194,6 +194,39 @@ void testToStarRocksTableWithProperties() {
         assertThat(table.getProperties()).containsEntry("replication_num", "3");
     }
 
+    @Test
+    void testToStarRocksTableWithUnicodeCharMaxBytes() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(17))
+                        .physicalColumn("code", DataTypes.CHAR(17))
+                        .primaryKey("id")
+                        .build();
+
+        TableId tableId = TableId.tableId("db", "table");
+        // unicode-char.max-bytes = 4 to fit utf8mb4 characters
+        TableCreateConfig config = new TableCreateConfig(null, Collections.emptyMap(), 4);
+
+        StarRocksTable table = StarRocksUtils.toStarRocksTable(tableId, schema, config);
+
+        StarRocksColumn nameColumn =
+                table.getColumns().stream()
+                        .filter(c -> c.getColumnName().equals("name"))
+                        .findFirst()
+                        .get();

Review Comment:
   Using `findFirst().get()` will fail with a generic `NoSuchElementException`, 
which makes diagnosis harder when the test breaks. Prefer asserting presence 
(e.g., assert the Optional is present) or using `orElseThrow(...)` with a 
descriptive message so failures clearly indicate which column was missing.
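
A minimal sketch of the `orElseThrow(...)` variant, written as a standalone helper so it compiles on its own. `ColumnLookup`, `Column`, and `requireColumn` are hypothetical names used only for illustration; `Column` is a stand-in for `StarRocksColumn`, not the connector's real class.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ColumnLookup {

    /** Hypothetical stand-in for StarRocksColumn, reduced to what the lookup needs. */
    static final class Column {
        final String name;
        final int size;

        Column(String name, int size) {
            this.name = name;
            this.size = size;
        }

        String getColumnName() {
            return name;
        }
    }

    /** Returns the named column, or fails with a message naming what was missing. */
    static Column requireColumn(List<Column> columns, String name) {
        return columns.stream()
                .filter(c -> c.getColumnName().equals(name))
                .findFirst()
                .orElseThrow(
                        () ->
                                new AssertionError(
                                        "Column '"
                                                + name
                                                + "' not found; available columns: "
                                                + columns.stream()
                                                        .map(Column::getColumnName)
                                                        .collect(Collectors.toList())));
    }
}
```

In the test itself, the same effect could likely be had by replacing `.get()` with `.orElseThrow(...)` in place, or by asserting on the `Optional` before unwrapping it.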



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/TableCreateConfig.java:
##########
@@ -44,9 +45,25 @@ public class TableCreateConfig implements Serializable {
     /** Properties for the table. */
     private final Map<String, String> properties;
 
+    /** Max bytes allocated for each upstream Unicode character during schema mapping. */
+    private final int unicodeCharMaxBytes;
+
     public TableCreateConfig(@Nullable Integer numBuckets, Map<String, String> properties) {
+        this(
+                numBuckets,
+                properties,
+                StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+    }
+
+    public TableCreateConfig(
+            @Nullable Integer numBuckets, Map<String, String> properties, int unicodeCharMaxBytes) {
+        Preconditions.checkArgument(
+                unicodeCharMaxBytes > 0,
+                "unicode-char.max-bytes must be positive, but actually is %s",
+                unicodeCharMaxBytes);
         this.numBuckets = numBuckets;
         this.properties = new HashMap<>(properties);
+        this.unicodeCharMaxBytes = unicodeCharMaxBytes;

Review Comment:
   The validation only enforces `unicodeCharMaxBytes > 0`. With a very large
value, `(long) length * unicodeCharMaxBytes` in
`StarRocksUtils.CdcDataTypeTransformer` can exceed the `int` range (if both
operands are `int`, the `long` product itself cannot overflow), so casting the
result back to `int` can produce negative or otherwise invalid column sizes.
Please add an upper bound (ideally restrict to realistic encoding maxima like 4,
or at least a safe limit based on `MAX_VARCHAR_SIZE / length`) and/or guard the
multiplication with overflow-safe logic (e.g., `Math.multiplyExact` with a
fallback cap).
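
One way the overflow-safe guard could look, as a rough sketch rather than the connector's actual code. `SafeColumnSize` and `scaledLength` are hypothetical names, and the `MAX_VARCHAR_SIZE` value of 1048576 is an assumption made for the example; the real constant in `StarRocksUtils` should be used instead.

```java
final class SafeColumnSize {

    /** Assumed cap for illustration only; use the connector's own MAX_VARCHAR_SIZE. */
    static final int MAX_VARCHAR_SIZE = 1_048_576;

    /**
     * Scales a character length to a byte length, capping at MAX_VARCHAR_SIZE
     * instead of wrapping around when the product does not fit in an int.
     */
    static int scaledLength(int length, int maxBytesPerChar) {
        if (length < 0 || maxBytesPerChar <= 0) {
            throw new IllegalArgumentException(
                    "length must be >= 0 and maxBytesPerChar > 0, but got "
                            + length
                            + " and "
                            + maxBytesPerChar);
        }
        try {
            // multiplyExact throws ArithmeticException on int overflow.
            return Math.min(Math.multiplyExact(length, maxBytesPerChar), MAX_VARCHAR_SIZE);
        } catch (ArithmeticException overflow) {
            // Fallback cap: an overflowing product is certainly above the limit.
            return MAX_VARCHAR_SIZE;
        }
    }
}
```

With this shape, `scaledLength(17, 4)` yields 68, while inputs whose product exceeds the cap (or the `int` range) are clamped to `MAX_VARCHAR_SIZE`. Whether clamping or rejecting the configuration is the better behavior is a design choice for the PR author.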


