Copilot commented on code in PR #4447:
URL: https://github.com/apache/flink-cdc/pull/4447#discussion_r3451048337
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java:
##########
@@ -194,6 +194,39 @@ void testToStarRocksTableWithProperties() {
assertThat(table.getProperties()).containsEntry("replication_num",
"3");
}
+ @Test
+ void testToStarRocksTableWithUnicodeCharMaxBytes() {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.BIGINT().notNull())
+ .physicalColumn("name", DataTypes.VARCHAR(17))
+ .physicalColumn("code", DataTypes.CHAR(17))
+ .primaryKey("id")
+ .build();
+
+ TableId tableId = TableId.tableId("db", "table");
+ // unicode-char.max-bytes = 4 to fit utf8mb4 characters
+ TableCreateConfig config = new TableCreateConfig(null,
Collections.emptyMap(), 4);
+
+ StarRocksTable table = StarRocksUtils.toStarRocksTable(tableId,
schema, config);
+
+ StarRocksColumn nameColumn =
+ table.getColumns().stream()
+ .filter(c -> c.getColumnName().equals("name"))
+ .findFirst()
+ .get();
Review Comment:
Using `findFirst().get()` will fail with a generic `NoSuchElementException`,
which makes diagnosis harder when the test breaks. Prefer asserting presence
(e.g., assert the Optional is present) or using `orElseThrow(...)` with a
descriptive message so failures clearly indicate which column was missing.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/TableCreateConfig.java:
##########
@@ -44,9 +45,25 @@ public class TableCreateConfig implements Serializable {
/** Properties for the table. */
private final Map<String, String> properties;
+ /** Max bytes allocated for each upstream Unicode character during schema
mapping. */
+ private final int unicodeCharMaxBytes;
+
public TableCreateConfig(@Nullable Integer numBuckets, Map<String, String>
properties) {
+ this(
+ numBuckets,
+ properties,
+
StarRocksDataSinkOptions.UNICODE_CHAR_MAX_BYTES.defaultValue());
+ }
+
+ public TableCreateConfig(
+ @Nullable Integer numBuckets, Map<String, String> properties, int
unicodeCharMaxBytes) {
+ Preconditions.checkArgument(
+ unicodeCharMaxBytes > 0,
+ "unicode-char.max-bytes must be positive, but actually is %s",
+ unicodeCharMaxBytes);
this.numBuckets = numBuckets;
this.properties = new HashMap<>(properties);
+ this.unicodeCharMaxBytes = unicodeCharMaxBytes;
Review Comment:
The validation only enforces `unicodeCharMaxBytes > 0`. Very large values
can overflow `(long) length * unicodeCharMaxBytes` in
`StarRocksUtils.CdcDataTypeTransformer` and potentially produce
negative/invalid column sizes after casting. Please add an upper bound (ideally
restrict to realistic encoding maxima like 4, or at least a safe limit based on
`MAX_VARCHAR_SIZE / length`) and/or guard the multiplication with overflow-safe
logic (e.g., `Math.multiplyExact` with a fallback cap).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]