RocMarshal commented on code in PR #27370:
URL: https://github.com/apache/flink/pull/27370#discussion_r2651262835
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java:
##########
@@ -155,4 +161,14 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(failOnMissingField, ignoreParseErrors,
resultTypeInfo, timestampFormat);
}
+
+ /**
+ * Logs a debug message when a JSON parse error is ignored.
+ *
+ * @param message the original JSON message that failed to parse
+ * @param t the throwable that was caught
+ */
+ protected void logParseError(byte[] message, Throwable t) {
+ LOG.debug("Failed to deserialize JSON '{}'.", new String(message), t);
Review Comment:
```suggestion
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to deserialize JSON '{}'.", new
String(message), t);
}
```
What about using pre-judge for logger level ?
Because we could reduce one redundant bytes array allocation if the logger
level is not enabled debug or lower levels
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -176,13 +182,18 @@ public void deserialize(byte[] message,
Collector<RowData> out) throws IOExcepti
"Unknown \"op\" value \"%s\". The Debezium
JSON message is '%s'",
op, new String(message)));
}
+ LOG.debug(
+ "Unknown \"op\" value '{}'. The Debezium JSON message
is '{}'.",
+ op,
+ new String(message));
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(
format("Corrupt Debezium JSON message '%s'.", new
String(message)), t);
}
+ LOG.debug("Corrupt Debezium JSON message '{}'.", new
String(message), t);
Review Comment:
ditto
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java:
##########
@@ -198,13 +203,18 @@ public void deserialize(byte[] message,
Collector<RowData> out) throws IOExcepti
"Unknown \"op_type\" value \"%s\". The Ogg
JSON message is '%s'",
op, new String(message)));
}
+ LOG.debug(
+ "Unknown \"op_type\" value '{}'. The Ogg JSON message
is '{}'.",
+ op,
+ new String(message));
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(
format("Corrupt Ogg JSON message '%s'.", new
String(message)), t);
}
+ LOG.debug("Corrupt Ogg JSON message '{}'.", new String(message),
t);
Review Comment:
ditto
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java:
##########
@@ -155,4 +161,14 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(failOnMissingField, ignoreParseErrors,
resultTypeInfo, timestampFormat);
}
+
+ /**
+ * Logs a debug message when a JSON parse error is ignored.
+ *
+ * @param message the original JSON message that failed to parse
+ * @param t the throwable that was caught
+ */
+ protected void logParseError(byte[] message, Throwable t) {
Review Comment:
what about naming to `logIgnoredParseErrorIfDebugEnabled` ?
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java:
##########
@@ -171,13 +177,18 @@ public void deserialize(byte[] message,
Collector<RowData> out) throws IOExcepti
"Unknown \"type\" value \"%s\". The
Maxwell JSON message is '%s'",
type, new String(message)));
}
+ LOG.debug(
Review Comment:
ditto
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -288,13 +293,18 @@ public void deserialize(@Nullable byte[] message,
Collector<RowData> out) throws
"Unknown \"type\" value \"%s\". The Canal
JSON message is '%s'",
type, new String(message)));
}
+ LOG.debug(
+ "Unknown \"type\" value '{}'. The Canal JSON message
is '{}'.",
+ type,
+ new String(message));
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(
format("Corrupt Canal JSON message '%s'.", new
String(message)), t);
}
+ LOG.debug("Corrupt Canal JSON message '{}'.", new String(message),
t);
Review Comment:
ditto
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java:
##########
@@ -176,13 +182,18 @@ public void deserialize(byte[] message,
Collector<RowData> out) throws IOExcepti
"Unknown \"op\" value \"%s\". The Debezium
JSON message is '%s'",
op, new String(message)));
}
+ LOG.debug(
Review Comment:
ditto
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonDeserializationSchema.java:
##########
@@ -198,13 +203,18 @@ public void deserialize(byte[] message,
Collector<RowData> out) throws IOExcepti
"Unknown \"op_type\" value \"%s\". The Ogg
JSON message is '%s'",
op, new String(message)));
}
+ LOG.debug(
Review Comment:
ditto
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java:
##########
@@ -171,13 +177,18 @@ public void deserialize(byte[] message,
Collector<RowData> out) throws IOExcepti
"Unknown \"type\" value \"%s\". The
Maxwell JSON message is '%s'",
type, new String(message)));
}
+ LOG.debug(
+ "Unknown \"type\" value '{}'. The Maxwell JSON message
is '{}'.",
+ type,
+ new String(message));
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(
format("Corrupt Maxwell JSON message '%s'.", new
String(message)), t);
}
+ LOG.debug("Corrupt Maxwell JSON message '{}'.", new
String(message), t);
Review Comment:
ditto
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -288,13 +293,18 @@ public void deserialize(@Nullable byte[] message,
Collector<RowData> out) throws
"Unknown \"type\" value \"%s\". The Canal
JSON message is '%s'",
type, new String(message)));
}
+ LOG.debug(
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]