EricJoy2048 commented on code in PR #3230:
URL:
https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1008710151
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,23 +209,39 @@ private void restoreState(List<KafkaSinkState> states) {
}
}
- private Function<SeaTunnelRow, String> createPartitionExtractor(Config
pluginConfig,
-
SeaTunnelRowType seaTunnelRowType) {
- if (!pluginConfig.hasPath(PARTITION_KEY)){
+ private Function<SeaTunnelRow, SeaTunnelRow>
createPartitionExtractor(SeaTunnelRowType seaTunnelRowType) {
+ if (CollectionUtils.isEmpty(this.partitionKeys)) {
return row -> null;
}
- String partitionKey = pluginConfig.getString(PARTITION_KEY);
- List<String> fieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
- if (!fieldNames.contains(partitionKey)) {
- return row -> partitionKey;
- }
- int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
return row -> {
- Object partitionFieldValue = row.getField(partitionFieldIndex);
- if (partitionFieldValue != null) {
- return partitionFieldValue.toString();
+ SeaTunnelRow keySeaTunnelRow = new
SeaTunnelRow(this.partitionKeys.size());
+ int index = 0;
+ for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+ String fieldName = seaTunnelRowType.getFieldNames()[i];
+ if (this.partitionKeys.contains(fieldName)) {
+ int partitionFieldIndex =
seaTunnelRowType.indexOf(fieldName);
+ Object partitionFieldValue =
row.getField(partitionFieldIndex);
+ keySeaTunnelRow.setField(index, partitionFieldValue);
+ ++index;
+ }
}
- return null;
+ return keySeaTunnelRow;
};
}
+
+ private List<String> createPartitionKeys(Config pluginConfig,
SeaTunnelRowType seaTunnelRowType) {
+ if (pluginConfig.hasPath(PARTITION_KEY)) {
+ return pluginConfig.getStringList(PARTITION_KEY).stream()
+ .filter(f -> {
+ if
(Arrays.asList(seaTunnelRowType.getFieldNames()).contains(f)) {
+ return true;
+ } else {
Review Comment:
`else` is redundant.
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -162,11 +170,24 @@ private Properties getKafkaProperties(Config
pluginConfig) {
// todo: parse the target field from config
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
- if (pluginConfig.hasPath(PARTITION)){
- return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), this.partition,
seaTunnelRowType);
- }
- else {
- return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
+ if (pluginConfig.hasPath(PARTITION)) {
+ return new DefaultSeaTunnelRowSerializer(this.topic,
this.partition, seaTunnelRowType);
+ } else if (CollectionUtils.isNotEmpty(this.partitionKeys)) {
+ int size = this.partitionKeys.size();
+ String[] keyFieldNames = new String[size];
+ SeaTunnelDataType<?>[] keyFieldTypes = new SeaTunnelDataType[size];
+ int index = 0;
+ for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+ if
(this.partitionKeys.contains(seaTunnelRowType.getFieldNames()[i])) {
+ keyFieldNames[index] = seaTunnelRowType.getFieldName(i);
+ keyFieldTypes[index] = seaTunnelRowType.getFieldType(i);
+ ++index;
+ }
+ }
+ SeaTunnelRowType keySeaTunnelRowType = new
SeaTunnelRowType(keyFieldNames, keyFieldTypes);
+ return new DefaultSeaTunnelRowSerializer(this.topic,
keySeaTunnelRowType, seaTunnelRowType);
+ } else {
Review Comment:
Suggest
```
if () {
return xx;
} else if () {
return xx;
} else {
return xx;
}
```
replace to
```
if () {
return xx;
}
if () {
return xx;
}
return xx;
```
##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -51,7 +51,7 @@ In AT_LEAST_ONCE, producer will wait for all outstanding
messages in the Kafka b
NON does not provide any guarantees: messages may be lost in case of issues on
the Kafka broker and messages may be duplicated.
-### partition_key [string]
+### partition_key [array]
Configure which field is used as the key of the kafka message.
Review Comment:
Please add `changed logs` reference
https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/connector-v2/source/HdfsFile.md
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]