EricJoy2048 commented on code in PR #3230:
URL: 
https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1008710151


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,23 +209,39 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
-    private Function<SeaTunnelRow, String> createPartitionExtractor(Config 
pluginConfig,
-                                                                    
SeaTunnelRowType seaTunnelRowType) {
-        if (!pluginConfig.hasPath(PARTITION_KEY)){
+    private Function<SeaTunnelRow, SeaTunnelRow> 
createPartitionExtractor(SeaTunnelRowType seaTunnelRowType) {
+        if (CollectionUtils.isEmpty(this.partitionKeys)) {
             return row -> null;
         }
-        String partitionKey = pluginConfig.getString(PARTITION_KEY);
-        List<String> fieldNames = 
Arrays.asList(seaTunnelRowType.getFieldNames());
-        if (!fieldNames.contains(partitionKey)) {
-            return row -> partitionKey;
-        }
-        int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
         return row -> {
-            Object partitionFieldValue = row.getField(partitionFieldIndex);
-            if (partitionFieldValue != null) {
-                return partitionFieldValue.toString();
+            SeaTunnelRow keySeaTunnelRow = new 
SeaTunnelRow(this.partitionKeys.size());
+            int index = 0;
+            for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+                String fieldName = seaTunnelRowType.getFieldNames()[i];
+                if (this.partitionKeys.contains(fieldName)) {
+                    int partitionFieldIndex = 
seaTunnelRowType.indexOf(fieldName);
+                    Object partitionFieldValue = 
row.getField(partitionFieldIndex);
+                    keySeaTunnelRow.setField(index, partitionFieldValue);
+                    ++index;
+                }
             }
-            return null;
+            return keySeaTunnelRow;
         };
     }
+
+    private List<String> createPartitionKeys(Config pluginConfig, 
SeaTunnelRowType seaTunnelRowType) {
+        if (pluginConfig.hasPath(PARTITION_KEY)) {
+            return pluginConfig.getStringList(PARTITION_KEY).stream()
+                    .filter(f -> {
+                        if 
(Arrays.asList(seaTunnelRowType.getFieldNames()).contains(f)) {
+                            return true;
+                        } else {

Review Comment:
   `else` is redundant.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -162,11 +170,24 @@ private Properties getKafkaProperties(Config 
pluginConfig) {
 
     // todo: parse the target field from config
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config 
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        if (pluginConfig.hasPath(PARTITION)){
-            return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), this.partition, 
seaTunnelRowType);
-        }
-        else {
-            return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
+        if (pluginConfig.hasPath(PARTITION)) {
+            return new DefaultSeaTunnelRowSerializer(this.topic, 
this.partition, seaTunnelRowType);
+        } else if (CollectionUtils.isNotEmpty(this.partitionKeys)) {
+            int size = this.partitionKeys.size();
+            String[] keyFieldNames = new String[size];
+            SeaTunnelDataType<?>[] keyFieldTypes = new SeaTunnelDataType[size];
+            int index = 0;
+            for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+                if 
(this.partitionKeys.contains(seaTunnelRowType.getFieldNames()[i])) {
+                    keyFieldNames[index] = seaTunnelRowType.getFieldName(i);
+                    keyFieldTypes[index] = seaTunnelRowType.getFieldType(i);
+                    ++index;
+                }
+            }
+            SeaTunnelRowType keySeaTunnelRowType = new 
SeaTunnelRowType(keyFieldNames, keyFieldTypes);
+            return new DefaultSeaTunnelRowSerializer(this.topic, 
keySeaTunnelRowType, seaTunnelRowType);
+        } else {

Review Comment:
   Suggest 
   ```
   if () {
      return xx;
   } else if () {
     return xx;
   } else {
     return xx;
   }
   ```
   
   replace to
   
   ```
   if () {
       return xx;
   }
   
   if () {
     return xx;
   }
   
   return xx;
   ```



##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -51,7 +51,7 @@ In AT_LEAST_ONCE, producer will wait for all outstanding 
messages in the Kafka b
 
 NON does not provide any guarantees: messages may be lost in case of issues on 
the Kafka broker and messages may be duplicated.
 
-### partition_key [string]
+### partition_key [array]
 
 Configure which field is used as the key of the kafka message.
 

Review Comment:
   Please add `changed logs` reference 
https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/connector-v2/source/HdfsFile.md



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to