This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 228257b2e Fix the processing bug of abnormal parsing method of 
kafkaSource format. (#4687)
228257b2e is described below

commit 228257b2e256a83e18d3d5a9f9c34720223d65f9
Author: lightzhao <[email protected]>
AuthorDate: Fri Apr 28 13:38:41 2023 +0800

    Fix the processing bug of abnormal parsing method of kafkaSource format. 
(#4687)
---
 .../seatunnel/api/serialization/DeserializationSchema.java     | 10 +++-------
 .../connectors/seatunnel/kafka/source/KafkaSourceReader.java   |  4 ++--
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
index 745e517a2..d7f7abb1a 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
@@ -35,13 +35,9 @@ public interface DeserializationSchema<T> extends 
Serializable {
     T deserialize(byte[] message) throws IOException;
 
     default void deserialize(byte[] message, Collector<T> out) throws 
IOException {
-        try {
-            T deserialize = deserialize(message);
-            if (deserialize != null) {
-                out.collect(deserialize);
-            }
-        } catch (IOException e) {
-            throw new IOException(e);
+        T deserialize = deserialize(message);
+        if (deserialize != null) {
+            out.collect(deserialize);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 07fe71a60..226fded24 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -152,13 +152,13 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
                                                         try {
                                                             
deserializationSchema.deserialize(
                                                                     
record.value(), output);
-                                                        } catch (IOException 
e) {
+                                                        } catch (Exception e) {
                                                             if 
(this.messageFormatErrorHandleWay
                                                                     == 
MessageFormatErrorHandleWay
                                                                             
.SKIP) {
                                                                 log.warn(
                                                                         
"Deserialize message failed, skip this message, message: {}",
-                                                                        
record.value());
+                                                                        new 
String(record.value()));
                                                                 continue;
                                                             }
                                                             throw e;

Reply via email to