DongLiang-0 commented on code in PR #35:
URL:
https://github.com/apache/doris-kafka-connector/pull/35#discussion_r1669663850
##########
src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java:
##########
@@ -113,25 +115,33 @@ public DorisOptions(Map<String, String> config) {
this.requestReadTimeoutMs =
Integer.parseInt(config.get(DorisSinkConnectorConfig.REQUEST_READ_TIMEOUT_MS));
}
- getStreamLoadPropFromConfig(config);
+ this.streamLoadProp = getStreamLoadPropFromConfig(config);
+ if (getStreamLoadProp().containsKey(LoadConstants.GROUP_COMMIT)) {
Review Comment:
Can the if judgment here be deleted? I see that this judgment logic already
exists in `ConfigCheckUtils.validateGroupCommitMode`
##########
src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java:
##########
@@ -293,4 +298,24 @@ private static boolean validateEnumInstances(String value,
String[] instances) {
}
return false;
}
+
+ public static boolean validateGroupCommitMode(Properties streamLoadProp,
boolean enable2PC) {
+ if (!streamLoadProp.containsKey(LoadConstants.GROUP_COMMIT)) {
+ return false;
+ }
+
+ Object value = streamLoadProp.get(LoadConstants.GROUP_COMMIT);
+ String normalizedValue = value.toString().trim().toLowerCase();
+ if (!GroupCommitMode.instances().contains(normalizedValue)) {
+ throw new DorisException("The value of group commit mode is an
illegal parameter.");
Review Comment:
```suggestion
throw new DorisException("The value of group commit mode is an
illegal parameter, illegal value=" + value);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]