This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a18397f8b00 [FLINK-38819][formats/json] Make Flink JSON deserialization schema log parsing errors (#27370)
a18397f8b00 is described below
commit a18397f8b00f39bbef8d416dab9b1fe65e2eefdd
Author: Santwana Verma <[email protected]>
AuthorDate: Wed Mar 25 07:00:09 2026 +0530
[FLINK-38819][formats/json] Make Flink JSON deserialization schema log parsing errors (#27370)
---
.../json/AbstractJsonDeserializationSchema.java | 18 +++++++
.../JsonParserRowDataDeserializationSchema.java | 1 +
.../json/JsonRowDataDeserializationSchema.java | 2 +
.../json/canal/CanalJsonDeserializationSchema.java | 14 +++++
.../DebeziumJsonDeserializationSchema.java | 15 ++++++
.../maxwell/MaxwellJsonDeserializationSchema.java | 15 ++++++
.../json/ogg/OggJsonDeserializationSchema.java | 14 +++++
.../formats/json/JsonRowDataSerDeSchemaTest.java | 63 ++++++++++++++++++++++
.../json/canal/CanalJsonSerDeSchemaTest.java | 60 +++++++++++++++++++++
.../json/debezium/DebeziumJsonSerDeSchemaTest.java | 35 ++++++++++++
.../json/maxwell/MaxwellJsonSerDerTest.java | 34 ++++++++++++
.../formats/json/ogg/OggJsonSerDeSchemaTest.java | 34 ++++++++++++
12 files changed, 305 insertions(+)
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
index 88c5e9d4c1d..769e721a14d 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
@@ -34,6 +34,9 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonRead
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -53,6 +56,9 @@ public abstract class AbstractJsonDeserializationSchema
implements Deserializati
private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractJsonDeserializationSchema.class);
+
/** Flag indicating whether to fail if a field is missing. */
protected final boolean failOnMissingField;
@@ -155,4 +161,16 @@ public abstract class AbstractJsonDeserializationSchema
implements Deserializati
public int hashCode() {
return Objects.hash(failOnMissingField, ignoreParseErrors,
resultTypeInfo, timestampFormat);
}
+
+ /**
+ * Logs a debug message for parsing errors only when debug logs are
enabled.
+ *
+ * @param message the original JSON message that failed to parse
+ * @param t the throwable that was caught
+ */
+ protected void logParseErrorIfDebugEnabled(byte[] message, Throwable t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to deserialize JSON '{}'.", new String(message),
t);
+ }
+ }
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
index eea1f607f1c..8a21962ad7d 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
@@ -100,6 +100,7 @@ public class JsonParserRowDataDeserializationSchema extends
AbstractJsonDeserial
throw new IOException(
format("Failed to deserialize JSON '%s'.", new
String(message)), t);
}
+ logParseErrorIfDebugEnabled(message, t);
}
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index e8f76eca52a..a34d4f8d9b6 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -84,6 +84,7 @@ public class JsonRowDataDeserializationSchema extends
AbstractJsonDeserializatio
// will be caught by outer try-catch
throw t;
}
+ logParseErrorIfDebugEnabled(message, t);
}
}
} else {
@@ -97,6 +98,7 @@ public class JsonRowDataDeserializationSchema extends
AbstractJsonDeserializatio
throw new IOException(
format("Failed to deserialize JSON '%s'.", new
String(message)), t);
}
+ logParseErrorIfDebugEnabled(message, t);
}
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
index 5f4594343a7..a4c797f7a48 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
@@ -36,6 +36,9 @@ import org.apache.flink.util.Collector;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -62,6 +65,8 @@ import static java.lang.String.format;
public final class CanalJsonDeserializationSchema implements
DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(CanalJsonDeserializationSchema.class);
+
private static final String FIELD_OLD = "old";
private static final String OP_INSERT = "INSERT";
private static final String OP_UPDATE = "UPDATE";
@@ -288,6 +293,12 @@ public final class CanalJsonDeserializationSchema
implements DeserializationSche
"Unknown \"type\" value \"%s\". The Canal
JSON message is '%s'",
type, new String(message)));
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Unknown \"type\" value '{}'. The Canal JSON
message is '{}'.",
+ type,
+ new String(message));
+ }
}
} catch (Throwable t) {
// a big try catch to protect the processing.
@@ -295,6 +306,9 @@ public final class CanalJsonDeserializationSchema
implements DeserializationSche
throw new IOException(
format("Corrupt Canal JSON message '%s'.", new
String(message)), t);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Corrupt Canal JSON message '{}'.", new
String(message), t);
+ }
}
for (GenericRowData genericRowData : genericRowDataList) {
out.collect(genericRowData);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
index 774e271cd0a..9db88209aeb 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -33,6 +33,9 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -57,6 +60,9 @@ import static java.lang.String.format;
public final class DebeziumJsonDeserializationSchema implements
DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DebeziumJsonDeserializationSchema.class);
+
private static final String OP_READ = "r"; // snapshot read
private static final String OP_CREATE = "c"; // insert
private static final String OP_UPDATE = "u"; // update
@@ -176,6 +182,12 @@ public final class DebeziumJsonDeserializationSchema
implements DeserializationS
"Unknown \"op\" value \"%s\". The Debezium
JSON message is '%s'",
op, new String(message)));
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Unknown \"op\" value '{}'. The Debezium JSON
message is '{}'.",
+ op,
+ new String(message));
+ }
}
} catch (Throwable t) {
// a big try catch to protect the processing.
@@ -183,6 +195,9 @@ public final class DebeziumJsonDeserializationSchema
implements DeserializationS
throw new IOException(
format("Corrupt Debezium JSON message '%s'.", new
String(message)), t);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Corrupt Debezium JSON message '{}'.", new
String(message), t);
+ }
}
for (GenericRowData genericRowData : genericRowDataList) {
out.collect(genericRowData);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
index 0a31d6a9545..5e369a380a8 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
@@ -34,6 +34,9 @@ import org.apache.flink.util.Collector;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -57,6 +60,9 @@ import static java.lang.String.format;
public class MaxwellJsonDeserializationSchema implements
DeserializationSchema<RowData> {
private static final long serialVersionUID = 2L;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MaxwellJsonDeserializationSchema.class);
+
private static final String FIELD_OLD = "old";
private static final String OP_INSERT = "insert";
private static final String OP_UPDATE = "update";
@@ -171,6 +177,12 @@ public class MaxwellJsonDeserializationSchema implements
DeserializationSchema<R
"Unknown \"type\" value \"%s\". The
Maxwell JSON message is '%s'",
type, new String(message)));
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Unknown \"type\" value '{}'. The Maxwell JSON
message is '{}'.",
+ type,
+ new String(message));
+ }
}
} catch (Throwable t) {
// a big try catch to protect the processing.
@@ -178,6 +190,9 @@ public class MaxwellJsonDeserializationSchema implements
DeserializationSchema<R
throw new IOException(
format("Corrupt Maxwell JSON message '%s'.", new
String(message)), t);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Corrupt Maxwell JSON message '{}'.", new
String(message), t);
+ }
}
for (GenericRowData genericRowData : genericRowDataList) {
out.collect(genericRowData);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
index 8fc5ca9fc39..d6351f61c39 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java
@@ -33,6 +33,9 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -57,6 +60,8 @@ import static java.lang.String.format;
public final class OggJsonDeserializationSchema implements
DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(OggJsonDeserializationSchema.class);
+
private static final String OP_CREATE = "I"; // insert
private static final String OP_UPDATE = "U"; // update
private static final String OP_DELETE = "D"; // delete
@@ -198,6 +203,12 @@ public final class OggJsonDeserializationSchema implements
DeserializationSchema
"Unknown \"op_type\" value \"%s\". The Ogg
JSON message is '%s'",
op, new String(message)));
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Unknown \"op_type\" value '{}'. The Ogg JSON
message is '{}'.",
+ op,
+ new String(message));
+ }
}
} catch (Throwable t) {
// a big try catch to protect the processing.
@@ -205,6 +216,9 @@ public final class OggJsonDeserializationSchema implements
DeserializationSchema
throw new IOException(
format("Corrupt Ogg JSON message '%s'.", new
String(message)), t);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Corrupt Ogg JSON message '{}'.", new
String(message), t);
+ }
}
for (GenericRowData genericRowData : genericRowDataList) {
out.collect(genericRowData);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 0012ede7344..fc320bc1696 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.jackson.JacksonMapperFactory;
@@ -45,7 +46,9 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
@@ -87,6 +90,7 @@ import static org.apache.flink.table.api.DataTypes.TINYINT;
import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
/**
* Tests for {@link JsonRowDataDeserializationSchema}, {@link
@@ -97,6 +101,10 @@ public class JsonRowDataSerDeSchemaTest {
private static final ObjectMapper OBJECT_MAPPER =
JacksonMapperFactory.createObjectMapper();
+ @RegisterExtension
+ public final LoggerAuditingExtension loggerExtension =
+ new
LoggerAuditingExtension(AbstractJsonDeserializationSchema.class, DEBUG);
+
@Parameter public boolean isJsonParser;
@Parameters(name = "isJsonParser={0}")
@@ -843,6 +851,61 @@ public class JsonRowDataSerDeSchemaTest {
.hasMessageContaining(spec.errorMessage);
}
+ @Test
+ void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+ RowType rowType = (RowType) ROW(FIELD("id", INT())).getLogicalType();
+ String invalidJson = "not valid json";
+
+ DeserializationSchema<RowData> schema =
+ new JsonRowDataDeserializationSchema(
+ rowType, InternalTypeInfo.of(rowType), false, true,
TimestampFormat.SQL);
+ schema.open(new DummyInitializationContext());
+
+ // Deserialize invalid JSON - should not throw but should log
+ RowData result = schema.deserialize(invalidJson.getBytes());
+
+ // Result should be null since parsing failed
+ assertThat(result).isNull();
+
+ // Verify the error was logged at DEBUG level
+ assertThat(loggerExtension.getMessages())
+ .anyMatch(
+ msg ->
+ msg.contains("Failed to deserialize JSON")
+ && msg.contains(invalidJson));
+ }
+
+ @TestTemplate
+ void testIgnoreParseErrorsFieldReturnsNull() throws Exception {
+ RowType rowType = (RowType) ROW(FIELD("id", INT())).getLogicalType();
+ String invalidFieldJson = "{\"id\":\"not_a_number\"}";
+
+ DeserializationSchema<RowData> schema =
+ createDeserializationSchema(
+ isJsonParser, rowType, false, true,
TimestampFormat.SQL);
+ schema.open(new DummyInitializationContext());
+
+ // With ignoreParseErrors=true, invalid field values should be set to
null
+ RowData result = schema.deserialize(invalidFieldJson.getBytes());
+ assertThat(result).isNotNull();
+ assertThat(result.isNullAt(0)).isTrue();
+ }
+
+ @TestTemplate
+ void testDeserializationThrowsExceptionWhenIgnoreParseErrorsIsFalse()
throws Exception {
+ RowType rowType = (RowType) ROW(FIELD("id", INT())).getLogicalType();
+ String invalidFieldJson = "{\"id\":\"not_a_number\"}";
+
+ DeserializationSchema<RowData> schema =
+ createDeserializationSchema(
+ isJsonParser, rowType, false, false,
TimestampFormat.SQL);
+ schema.open(new DummyInitializationContext());
+
+ assertThatThrownBy(() ->
schema.deserialize(invalidFieldJson.getBytes()))
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("Failed to deserialize JSON");
+ }
+
private static List<TestSpec> testData =
Arrays.asList(
TestSpec.json("{\"id\": \"trueA\"}")
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
index 74fc6648900..e2ac3398bee 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
@@ -28,9 +28,11 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.File;
import java.io.IOException;
@@ -54,10 +56,15 @@ import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
/** Tests for {@link CanalJsonSerializationSchema} and {@link
CanalJsonDeserializationSchema}. */
class CanalJsonSerDeSchemaTest {
+ @RegisterExtension
+ public final LoggerAuditingExtension loggerExtension =
+ new LoggerAuditingExtension(CanalJsonDeserializationSchema.class,
DEBUG);
+
private static final DataType PHYSICAL_DATA_TYPE =
ROW(
FIELD("id", INT().notNull()),
@@ -105,6 +112,59 @@ class CanalJsonSerDeSchemaTest {
"An error occurred while collecting data."));
}
+ @Test
+ void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+ String corruptMessage =
+
"{\"data\":null,\"old\":null,\"type\":\"UNKNOWN_TYPE\",\"database\":\"test\",\"table\":\"test\"}";
+ CanalJsonDeserializationSchema deserializationSchema =
+ CanalJsonDeserializationSchema.builder(
+ PHYSICAL_DATA_TYPE,
+ Collections.emptyList(),
+
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
+ .setIgnoreParseErrors(true)
+ .build();
+ open(deserializationSchema);
+
+ SimpleCollector collector = new SimpleCollector();
+ deserializationSchema.deserialize(
+ corruptMessage.getBytes(StandardCharsets.UTF_8), collector);
+
+ // Verify no records were collected
+ assertThat(collector.list).isEmpty();
+
+ // Verify the error was logged at DEBUG level
+ assertThat(loggerExtension.getMessages())
+ .anyMatch(
+ msg ->
+ msg.contains("Unknown \"type\" value")
+ || msg.contains("Corrupt Canal JSON"));
+ }
+
+ @Test
+ void testDeserializationThrowsExceptionWhenIgnoreParseErrorsIsFalse() {
+ String corruptMessage = "{\"type\":\"INVALID\"}";
+
+ CanalJsonDeserializationSchema deserializationSchema =
+ CanalJsonDeserializationSchema.builder(
+ PHYSICAL_DATA_TYPE,
+ Collections.emptyList(),
+
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
+ .setIgnoreParseErrors(false)
+ .build();
+ open(deserializationSchema);
+
+ SimpleCollector collector = new SimpleCollector();
+
+ assertThatThrownBy(
+ () ->
+ deserializationSchema.deserialize(
+
corruptMessage.getBytes(StandardCharsets.UTF_8), collector))
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("Corrupt Canal JSON message")
+ .cause()
+ .hasMessageContaining("Unknown \"type\" value");
+ }
+
@Test
void testDeserializeNullRow() throws Exception {
final List<ReadableMetadata> requestedMetadata =
Arrays.asList(ReadableMetadata.values());
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 6c207c78bcf..3252c090f59 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -27,9 +27,11 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.File;
import java.io.IOException;
@@ -53,12 +55,17 @@ import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
/**
* Tests for {@link DebeziumJsonSerializationSchema} and {@link
DebeziumJsonDeserializationSchema}.
*/
class DebeziumJsonSerDeSchemaTest {
+ @RegisterExtension
+ public final LoggerAuditingExtension loggerExtension =
+ new
LoggerAuditingExtension(DebeziumJsonDeserializationSchema.class, DEBUG);
+
private static final DataType PHYSICAL_DATA_TYPE =
ROW(
FIELD("id", INT().notNull()),
@@ -142,6 +149,34 @@ class DebeziumJsonSerDeSchemaTest {
"An error occurred while collecting data."));
}
+ @Test
+ void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+ String corruptMessage =
"{\"before\":null,\"after\":null,\"op\":\"UNKNOWN_OP\"}";
+ DebeziumJsonDeserializationSchema deserializationSchema =
+ new DebeziumJsonDeserializationSchema(
+ PHYSICAL_DATA_TYPE,
+ Collections.emptyList(),
+
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+ false,
+ true, // ignoreParseErrors
+ TimestampFormat.ISO_8601);
+ open(deserializationSchema);
+
+ SimpleCollector collector = new SimpleCollector();
+ deserializationSchema.deserialize(
+ corruptMessage.getBytes(StandardCharsets.UTF_8), collector);
+
+ // Verify no records were collected
+ assertThat(collector.list).isEmpty();
+
+ // Verify the error was logged at DEBUG level
+ assertThat(loggerExtension.getMessages())
+ .anyMatch(
+ msg ->
+ msg.contains("Unknown \"op\" value")
+ || msg.contains("Corrupt Debezium
JSON"));
+ }
+
@Test
void testDeserializationWithMetadata() throws Exception {
testDeserializationWithMetadata(
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
index d0fbb2d49ce..a25fc6551cb 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
@@ -27,9 +27,11 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.File;
import java.io.IOException;
@@ -52,12 +54,17 @@ import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
/**
* Tests for {@link MaxwellJsonSerializationSchema} and {@link
MaxwellJsonDeserializationSchema}.
*/
class MaxwellJsonSerDerTest {
+ @RegisterExtension
+ public final LoggerAuditingExtension loggerExtension =
+ new
LoggerAuditingExtension(MaxwellJsonDeserializationSchema.class, DEBUG);
+
private static final DataType PHYSICAL_DATA_TYPE =
ROW(
FIELD("id", INT().notNull()),
@@ -130,6 +137,33 @@ class MaxwellJsonSerDerTest {
"An error occurred while collecting data."));
}
+ @Test
+ void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+ String corruptMessage =
"{\"data\":null,\"old\":null,\"type\":\"UNKNOWN_TYPE\"}";
+ MaxwellJsonDeserializationSchema deserializationSchema =
+ new MaxwellJsonDeserializationSchema(
+ PHYSICAL_DATA_TYPE,
+ Collections.emptyList(),
+
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+ true, // ignoreParseErrors
+ TimestampFormat.ISO_8601);
+ open(deserializationSchema);
+
+ SimpleCollector collector = new SimpleCollector();
+ deserializationSchema.deserialize(
+ corruptMessage.getBytes(StandardCharsets.UTF_8), collector);
+
+ // Verify no records were collected
+ assertThat(collector.list).isEmpty();
+
+ // Verify the error was logged at DEBUG level
+ assertThat(loggerExtension.getMessages())
+ .anyMatch(
+ msg ->
+ msg.contains("Unknown \"type\" value")
+ || msg.contains("Corrupt Maxwell
JSON"));
+ }
+
@Test
void testSerializationDeserialization() throws Exception {
List<String> lines = readLines("maxwell-data.txt");
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
index 00cdf512ef6..b5a81e40d37 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
@@ -27,10 +27,12 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.Collector;
import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.File;
import java.io.IOException;
@@ -53,10 +55,15 @@ import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.slf4j.event.Level.DEBUG;
/** Tests for {@link OggJsonSerializationSchema} and {@link
OggJsonDeserializationSchema}. */
class OggJsonSerDeSchemaTest {
+ @RegisterExtension
+ public final LoggerAuditingExtension loggerExtension =
+ new LoggerAuditingExtension(OggJsonDeserializationSchema.class,
DEBUG);
+
private static final DataType PHYSICAL_DATA_TYPE =
ROW(
FIELD("id", INT().notNull()),
@@ -108,6 +115,33 @@ class OggJsonSerDeSchemaTest {
"An error occurred while collecting data."));
}
+ @Test
+ void testIgnoreParseErrorsLogsDebugMessage() throws Exception {
+ String corruptMessage =
"{\"before\":null,\"after\":null,\"op_type\":\"UNKNOWN_OP\"}";
+ OggJsonDeserializationSchema deserializationSchema =
+ new OggJsonDeserializationSchema(
+ PHYSICAL_DATA_TYPE,
+ Collections.emptyList(),
+
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()),
+ true, // ignoreParseErrors
+ TimestampFormat.ISO_8601);
+ open(deserializationSchema);
+
+ SimpleCollector collector = new SimpleCollector();
+ deserializationSchema.deserialize(
+ corruptMessage.getBytes(StandardCharsets.UTF_8), collector);
+
+ // Verify no records were collected
+ assertThat(collector.list).isEmpty();
+
+ // Verify the error was logged at DEBUG level
+ assertThat(loggerExtension.getMessages())
+ .anyMatch(
+ msg ->
+ msg.contains("Unknown \"op_type\" value")
+ || msg.contains("Corrupt Ogg JSON"));
+ }
+
@Test
void testTombstoneMessages() throws Exception {
OggJsonDeserializationSchema deserializationSchema =