This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a18397f8b00 [FLINK-38819][formats/json] Make Flink JSON deserialization schema log parsing errors (#27370)
a18397f8b00 is described below

commit a18397f8b00f39bbef8d416dab9b1fe65e2eefdd
Author: Santwana Verma <[email protected]>
AuthorDate: Wed Mar 25 07:00:09 2026 +0530

    [FLINK-38819][formats/json] Make Flink JSON deserialization schema log parsing errors (#27370)
---
 .../json/AbstractJsonDeserializationSchema.java    | 18 +++++++
 .../JsonParserRowDataDeserializationSchema.java    |  1 +
 .../json/JsonRowDataDeserializationSchema.java     |  2 +
 .../json/canal/CanalJsonDeserializationSchema.java | 14 +++++
 .../DebeziumJsonDeserializationSchema.java         | 15 ++++++
 .../maxwell/MaxwellJsonDeserializationSchema.java  | 15 ++++++
 .../json/ogg/OggJsonDeserializationSchema.java     | 14 +++++
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 63 ++++++++++++++++++++++
 .../json/canal/CanalJsonSerDeSchemaTest.java       | 60 +++++++++++++++++++++
 .../json/debezium/DebeziumJsonSerDeSchemaTest.java | 35 ++++++++++++
 .../json/maxwell/MaxwellJsonSerDerTest.java        | 34 ++++++++++++
 .../formats/json/ogg/OggJsonSerDeSchemaTest.java   | 34 ++++++++++++
 12 files changed, 305 insertions(+)

diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
index 88c5e9d4c1d..769e721a14d 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
@@ -34,6 +34,9 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonRead
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -53,6 +56,9 @@ public abstract class AbstractJsonDeserializationSchema 
implements Deserializati
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractJsonDeserializationSchema.class);
+
     /** Flag indicating whether to fail if a field is missing. */
     protected final boolean failOnMissingField;
 
@@ -155,4 +161,16 @@ public abstract class AbstractJsonDeserializationSchema 
implements Deserializati
     public int hashCode() {
         return Objects.hash(failOnMissingField, ignoreParseErrors, 
resultTypeInfo, timestampFormat);
     }
+
+    /**
+     * Logs a debug message for parsing errors only when debug logs are 
enabled.
+     *
+     * @param message the original JSON message that failed to parse
+     * @param t the throwable that was caught
+     */
+    protected void logParseErrorIfDebugEnabled(byte[] message, Throwable t) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to deserialize JSON '{}'.", new String(message), 
t);
+        }
+    }
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
index eea1f607f1c..8a21962ad7d 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
@@ -100,6 +100,7 @@ public class JsonParserRowDataDeserializationSchema extends 
AbstractJsonDeserial
                 throw new IOException(
                         format("Failed to deserialize JSON '%s'.", new 
String(message)), t);
             }
+            logParseErrorIfDebugEnabled(message, t);
         }
     }
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index e8f76eca52a..a34d4f8d9b6 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -84,6 +84,7 @@ public class JsonRowDataDeserializationSchema extends 
AbstractJsonDeserializatio
                             // will be caught by outer try-catch
                             throw t;
                         }
+                        logParseErrorIfDebugEnabled(message, t);
                     }
                 }
             } else {
@@ -97,6 +98,7 @@ public class JsonRowDataDeserializationSchema extends 
AbstractJsonDeserializatio
                 throw new IOException(
                         format("Failed to deserialize JSON '%s'.", new 
String(message)), t);
             }
+            logParseErrorIfDebugEnabled(message, t);
         }
     }
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
index 5f4594343a7..a4c797f7a48 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
@@ -36,6 +36,9 @@ import org.apache.flink.util.Collector;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -62,6 +65,8 @@ import static java.lang.String.format;
 public final class CanalJsonDeserializationSchema implements 
DeserializationSchema<RowData> {
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(CanalJsonDeserializationSchema.class);
+
     private static final String FIELD_OLD = "old";
     private static final String OP_INSERT = "INSERT";
     private static final String OP_UPDATE = "UPDATE";
@@ -288,6 +293,12 @@ public final class CanalJsonDeserializationSchema 
implements DeserializationSche
                                     "Unknown \"type\" value \"%s\". The Canal 
JSON message is '%s'",
                                     type, new String(message)));
                 }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Unknown \"type\" value '{}'. The Canal JSON 
message is '{}'.",
+                            type,
+                            new String(message));
+                }
             }
         } catch (Throwable t) {
             // a big try catch to protect the processing.
@@ -295,6 +306,9 @@ public final class CanalJsonDeserializationSchema 
implements DeserializationSche
                 throw new IOException(
                         format("Corrupt Canal JSON message '%s'.", new 
String(message)), t);
             }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Corrupt Canal JSON message '{}'.", new 
String(message), t);
+            }
         }
         for (GenericRowData genericRowData : genericRowDataList) {
             out.collect(genericRowData);
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
index 774e271cd0a..9db88209aeb 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -33,6 +33,9 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -57,6 +60,9 @@ import static java.lang.String.format;
 public final class DebeziumJsonDeserializationSchema implements 
DeserializationSchema<RowData> {
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DebeziumJsonDeserializationSchema.class);
+
     private static final String OP_READ = "r"; // snapshot read
     private static final String OP_CREATE = "c"; // insert
     private static final String OP_UPDATE = "u"; // update
@@ -176,6 +182,12 @@ public final class DebeziumJsonDeserializationSchema 
implements DeserializationS
                                     "Unknown \"op\" value \"%s\". The Debezium 
JSON message is '%s'",
                                     op, new String(message)));
                 }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Unknown \"op\" value '{}'. The Debezium JSON 
message is '{}'.",
+                            op,
+                            new String(message));
+                }
             }
         } catch (Throwable t) {
             // a big try catch to protect the processing.
@@ -183,6 +195,9 @@ public final class DebeziumJsonDeserializationSchema 
implements DeserializationS
                 throw new IOException(
                         format("Corrupt Debezium JSON message '%s'.", new 
String(message)), t);
             }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Corrupt Debezium JSON message '{}'.", new 
String(message), t);
+            }
         }
         for (GenericRowData genericRowData : genericRowDataList) {
             out.collect(genericRowData);
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
index 0a31d6a9545..5e369a380a8 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
@@ -34,6 +34,9 @@ import org.apache.flink.util.Collector;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -57,6 +60,9 @@ import static java.lang.String.format;
 public class MaxwellJsonDeserializationSchema implements 
DeserializationSchema<RowData> {
     private static final long serialVersionUID = 2L;
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MaxwellJsonDeserializationSchema.class);
+
     private static final String FIELD_OLD = "old";
     private static final String OP_INSERT = "insert";
     private static final String OP_UPDATE = "update";
@@ -171,6 +177,12 @@ public class MaxwellJsonDeserializationSchema implements 
DeserializationSchema<R
                                     "Unknown \"type\" value \"%s\". The 
Maxwell JSON message is '%s'",
                                     type, new String(message)));
                 }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Unknown \"type\" value '{}'. The Maxwell JSON 
message is '{}'.",
+                            type,
+                            new String(message));
+                }
             }
         } catch (Throwable t) {
             // a big try catch to protect the processing.
@@ -178,6 +190,9 @@ public class MaxwellJsonDeserializationSchema implements 
DeserializationSchema<R
                 throw new IOException(
                         format("Corrupt Maxwell JSON message '%s'.", new 
String(message)), t);
             }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Corrupt Maxwell JSON message '{}'.", new 
String(message), t);
+            }
         }
         for (GenericRowData genericRowData : genericRowDataList) {
             out.collect(genericRowData);
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
index 8fc5ca9fc39..d6351f61c39 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
@@ -33,6 +33,9 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -57,6 +60,8 @@ import static java.lang.String.format;
 public final class OggJsonDeserializationSchema implements 
DeserializationSchema<RowData> {
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(OggJsonDeserializationSchema.class);
+
     private static final String OP_CREATE = "I"; // insert
     private static final String OP_UPDATE = "U"; // update
     private static final String OP_DELETE = "D"; // delete
@@ -198,6 +203,12 @@ public final class OggJsonDeserializationSchema implements 
DeserializationSchema
                                     "Unknown \"op_type\" value \"%s\". The Ogg 
JSON message is '%s'",
                                     op, new String(message)));
                 }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Unknown \"op_type\" value '{}'. The Ogg JSON 
message is '{}'.",
+                            op,
+                            new String(message));
+                }
             }
         } catch (Throwable t) {
             // a big try catch to protect the processing.
@@ -205,6 +216,9 @@ public final class OggJsonDeserializationSchema implements 
DeserializationSchema
                 throw new IOException(
                         format("Corrupt Ogg JSON message '%s'.", new 
String(message)), t);
             }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Corrupt Ogg JSON message '{}'.", new 
String(message), t);
+            }
         }
         for (GenericRowData genericRowData : genericRowDataList) {
             out.collect(genericRowData);
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 0012ede7344..fc320bc1696 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
@@ -45,7 +46,9 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.Instant;
@@ -87,6 +90,7 @@ import static org.apache.flink.table.api.DataTypes.TINYINT;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
 
 /**
  * Tests for {@link JsonRowDataDeserializationSchema}, {@link
@@ -97,6 +101,10 @@ public class JsonRowDataSerDeSchemaTest {
 
     private static final ObjectMapper OBJECT_MAPPER = 
JacksonMapperFactory.createObjectMapper();
 
+    @RegisterExtension
+    public final LoggerAuditingExtension loggerExtension =
+            new 
LoggerAuditingExtension(AbstractJsonDeserializationSchema.class, DEBUG);
+
     @Parameter public boolean isJsonParser;
 
     @Parameters(name = "isJsonParser={0}")
@@ -843,6 +851,61 @@ public class JsonRowDataSerDeSchemaTest {
                 .hasMessageContaining(spec.errorMessage);
     }
 
+    @Test
+    void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+        RowType rowType = (RowType) ROW(FIELD("id", INT())).getLogicalType();
+        String invalidJson = "not valid json";
+
+        DeserializationSchema<RowData> schema =
+                new JsonRowDataDeserializationSchema(
+                        rowType, InternalTypeInfo.of(rowType), false, true, 
TimestampFormat.SQL);
+        schema.open(new DummyInitializationContext());
+
+        // Deserialize invalid JSON - should not throw but should log
+        RowData result = schema.deserialize(invalidJson.getBytes());
+
+        // Result should be null since parsing failed
+        assertThat(result).isNull();
+
+        // Verify the error was logged at DEBUG level
+        assertThat(loggerExtension.getMessages())
+                .anyMatch(
+                        msg ->
+                                msg.contains("Failed to deserialize JSON")
+                                        && msg.contains(invalidJson));
+    }
+
+    @TestTemplate
+    void testIgnoreParseErrorsFieldReturnsNull() throws Exception {
+        RowType rowType = (RowType) ROW(FIELD("id", INT())).getLogicalType();
+        String invalidFieldJson = "{\"id\":\"not_a_number\"}";
+
+        DeserializationSchema<RowData> schema =
+                createDeserializationSchema(
+                        isJsonParser, rowType, false, true, 
TimestampFormat.SQL);
+        schema.open(new DummyInitializationContext());
+
+        // With ignoreParseErrors=true, invalid field values should be set to 
null
+        RowData result = schema.deserialize(invalidFieldJson.getBytes());
+        assertThat(result).isNotNull();
+        assertThat(result.isNullAt(0)).isTrue();
+    }
+
+    @TestTemplate
+    void testDeserializationThrowsExceptionWhenIgnoreParseErrorsIsFalse() 
throws Exception {
+        RowType rowType = (RowType) ROW(FIELD("id", INT())).getLogicalType();
+        String invalidFieldJson = "{\"id\":\"not_a_number\"}";
+
+        DeserializationSchema<RowData> schema =
+                createDeserializationSchema(
+                        isJsonParser, rowType, false, false, 
TimestampFormat.SQL);
+        schema.open(new DummyInitializationContext());
+
+        assertThatThrownBy(() -> 
schema.deserialize(invalidFieldJson.getBytes()))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Failed to deserialize JSON");
+    }
+
     private static List<TestSpec> testData =
             Arrays.asList(
                     TestSpec.json("{\"id\": \"trueA\"}")
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
index 74fc6648900..e2ac3398bee 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
@@ -28,9 +28,11 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
 import org.apache.flink.util.Collector;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.File;
 import java.io.IOException;
@@ -54,10 +56,15 @@ import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
 
 /** Tests for {@link CanalJsonSerializationSchema} and {@link 
CanalJsonDeserializationSchema}. */
 class CanalJsonSerDeSchemaTest {
 
+    @RegisterExtension
+    public final LoggerAuditingExtension loggerExtension =
+            new LoggerAuditingExtension(CanalJsonDeserializationSchema.class, 
DEBUG);
+
     private static final DataType PHYSICAL_DATA_TYPE =
             ROW(
                     FIELD("id", INT().notNull()),
@@ -105,6 +112,59 @@ class CanalJsonSerDeSchemaTest {
                                 "An error occurred while collecting data."));
     }
 
+    @Test
+    void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+        String corruptMessage =
+                
"{\"data\":null,\"old\":null,\"type\":\"UNKNOWN_TYPE\",\"database\":\"test\",\"table\":\"test\"}";
+        CanalJsonDeserializationSchema deserializationSchema =
+                CanalJsonDeserializationSchema.builder(
+                                PHYSICAL_DATA_TYPE,
+                                Collections.emptyList(),
+                                
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
+                        .setIgnoreParseErrors(true)
+                        .build();
+        open(deserializationSchema);
+
+        SimpleCollector collector = new SimpleCollector();
+        deserializationSchema.deserialize(
+                corruptMessage.getBytes(StandardCharsets.UTF_8), collector);
+
+        // Verify no records were collected
+        assertThat(collector.list).isEmpty();
+
+        // Verify the error was logged at DEBUG level
+        assertThat(loggerExtension.getMessages())
+                .anyMatch(
+                        msg ->
+                                msg.contains("Unknown \"type\" value")
+                                        || msg.contains("Corrupt Canal JSON"));
+    }
+
+    @Test
+    void testDeserializationThrowsExceptionWhenIgnoreParseErrorsIsFalse() {
+        String corruptMessage = "{\"type\":\"INVALID\"}";
+
+        CanalJsonDeserializationSchema deserializationSchema =
+                CanalJsonDeserializationSchema.builder(
+                                PHYSICAL_DATA_TYPE,
+                                Collections.emptyList(),
+                                
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
+                        .setIgnoreParseErrors(false)
+                        .build();
+        open(deserializationSchema);
+
+        SimpleCollector collector = new SimpleCollector();
+
+        assertThatThrownBy(
+                        () ->
+                                deserializationSchema.deserialize(
+                                        
corruptMessage.getBytes(StandardCharsets.UTF_8), collector))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Corrupt Canal JSON message")
+                .cause()
+                .hasMessageContaining("Unknown \"type\" value");
+    }
+
     @Test
     void testDeserializeNullRow() throws Exception {
         final List<ReadableMetadata> requestedMetadata = 
Arrays.asList(ReadableMetadata.values());
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 6c207c78bcf..3252c090f59 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -27,9 +27,11 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
 import org.apache.flink.util.Collector;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.File;
 import java.io.IOException;
@@ -53,12 +55,17 @@ import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
 
 /**
  * Tests for {@link DebeziumJsonSerializationSchema} and {@link 
DebeziumJsonDeserializationSchema}.
  */
 class DebeziumJsonSerDeSchemaTest {
 
+    @RegisterExtension
+    public final LoggerAuditingExtension loggerExtension =
+            new 
LoggerAuditingExtension(DebeziumJsonDeserializationSchema.class, DEBUG);
+
     private static final DataType PHYSICAL_DATA_TYPE =
             ROW(
                     FIELD("id", INT().notNull()),
@@ -142,6 +149,34 @@ class DebeziumJsonSerDeSchemaTest {
                                 "An error occurred while collecting data."));
     }
 
+    @Test
+    void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+        String corruptMessage = 
"{\"before\":null,\"after\":null,\"op\":\"UNKNOWN_OP\"}";
+        DebeziumJsonDeserializationSchema deserializationSchema =
+                new DebeziumJsonDeserializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        Collections.emptyList(),
+                        
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+                        false,
+                        true, // ignoreParseErrors
+                        TimestampFormat.ISO_8601);
+        open(deserializationSchema);
+
+        SimpleCollector collector = new SimpleCollector();
+        deserializationSchema.deserialize(
+                corruptMessage.getBytes(StandardCharsets.UTF_8), collector);
+
+        // Verify no records were collected
+        assertThat(collector.list).isEmpty();
+
+        // Verify the error was logged at DEBUG level
+        assertThat(loggerExtension.getMessages())
+                .anyMatch(
+                        msg ->
+                                msg.contains("Unknown \"op\" value")
+                                        || msg.contains("Corrupt Debezium 
JSON"));
+    }
+
     @Test
     void testDeserializationWithMetadata() throws Exception {
         testDeserializationWithMetadata(
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
index d0fbb2d49ce..a25fc6551cb 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
@@ -27,9 +27,11 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
 import org.apache.flink.util.Collector;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.File;
 import java.io.IOException;
@@ -52,12 +54,17 @@ import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
 
 /**
  * Tests for {@link MaxwellJsonSerializationSchema} and {@link 
MaxwellJsonDeserializationSchema}.
  */
 class MaxwellJsonSerDerTest {
 
+    @RegisterExtension
+    public final LoggerAuditingExtension loggerExtension =
+            new 
LoggerAuditingExtension(MaxwellJsonDeserializationSchema.class, DEBUG);
+
     private static final DataType PHYSICAL_DATA_TYPE =
             ROW(
                     FIELD("id", INT().notNull()),
@@ -130,6 +137,33 @@ class MaxwellJsonSerDerTest {
                                 "An error occurred while collecting data."));
     }
 
+    @Test
+    void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+        String corruptMessage = 
"{\"data\":null,\"old\":null,\"type\":\"UNKNOWN_TYPE\"}";
+        MaxwellJsonDeserializationSchema deserializationSchema =
+                new MaxwellJsonDeserializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        Collections.emptyList(),
+                        
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+                        true, // ignoreParseErrors
+                        TimestampFormat.ISO_8601);
+        open(deserializationSchema);
+
+        SimpleCollector collector = new SimpleCollector();
+        deserializationSchema.deserialize(
+                corruptMessage.getBytes(StandardCharsets.UTF_8), collector);
+
+        // Verify no records were collected
+        assertThat(collector.list).isEmpty();
+
+        // Verify the error was logged at DEBUG level
+        assertThat(loggerExtension.getMessages())
+                .anyMatch(
+                        msg ->
+                                msg.contains("Unknown \"type\" value")
+                                        || msg.contains("Corrupt Maxwell 
JSON"));
+    }
+
     @Test
     void testSerializationDeserialization() throws Exception {
         List<String> lines = readLines("maxwell-data.txt");
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
index 00cdf512ef6..b5a81e40d37 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
@@ -27,10 +27,12 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
 import org.apache.flink.util.Collector;
 
 import org.assertj.core.data.Percentage;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.File;
 import java.io.IOException;
@@ -53,10 +55,15 @@ import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
 
 /** Tests for {@link OggJsonSerializationSchema} and {@link 
OggJsonDeserializationSchema}. */
 class OggJsonSerDeSchemaTest {
 
+    @RegisterExtension
+    public final LoggerAuditingExtension loggerExtension =
+            new LoggerAuditingExtension(OggJsonDeserializationSchema.class, 
DEBUG);
+
     private static final DataType PHYSICAL_DATA_TYPE =
             ROW(
                     FIELD("id", INT().notNull()),
@@ -108,6 +115,33 @@ class OggJsonSerDeSchemaTest {
                                 "An error occurred while collecting data."));
     }
 
+    @Test
+    void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+        String corruptMessage = 
"{\"before\":null,\"after\":null,\"op_type\":\"UNKNOWN_OP\"}";
+        OggJsonDeserializationSchema deserializationSchema =
+                new OggJsonDeserializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        Collections.emptyList(),
+                        
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+                        true, // ignoreParseErrors
+                        TimestampFormat.ISO_8601);
+        open(deserializationSchema);
+
+        SimpleCollector collector = new SimpleCollector();
+        deserializationSchema.deserialize(
+                corruptMessage.getBytes(StandardCharsets.UTF_8), collector);
+
+        // Verify no records were collected
+        assertThat(collector.list).isEmpty();
+
+        // Verify the error was logged at DEBUG level
+        assertThat(loggerExtension.getMessages())
+                .anyMatch(
+                        msg ->
+                                msg.contains("Unknown \"op_type\" value")
+                                        || msg.contains("Corrupt Ogg JSON"));
+    }
+
     @Test
     void testTombstoneMessages() throws Exception {
         OggJsonDeserializationSchema deserializationSchema =

Reply via email to