This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new efe6029 KAFKA-10675: Add schema name to ConnectSchema.validateValue()
error message (#9541)
efe6029 is described below
commit efe6029f9c2b70cebc4359913ff809b7efa31daa
Author: Alexander Iskuskov <[email protected]>
AuthorDate: Sat Jul 10 08:35:02 2021 +0300
KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message
(#9541)
The following error message
`org.apache.kafka.connect.errors.DataException: Invalid Java object for
schema type INT64: class java.lang.Long for field: "moderate_time"`
can be confusing because java.lang.Long is an acceptable type for schema INT64.
In fact, in this case `org.apache.kafka.connect.data.Timestamp` is used, but
this information is not included in the error message.
Reviewers: Randall Hauch <[email protected]>, Chris Egerton
<[email protected]>, Konstantine Karantasis <[email protected]>
---
.../apache/kafka/connect/data/ConnectSchema.java | 23 +++++++++++-----------
.../org/apache/kafka/connect/data/StructTest.java | 21 ++++++++++++++++++--
2 files changed, 31 insertions(+), 13 deletions(-)
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
index 5e99a0a..6892bfc 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
@@ -222,12 +222,6 @@ public class ConnectSchema implements Schema {
}
List<Class<?>> expectedClasses = expectedClassesFor(schema);
-
- if (expectedClasses == null)
- throw new DataException("Invalid Java object for schema type " +
schema.type()
- + ": " + value.getClass()
- + " for field: \"" + name + "\"");
-
boolean foundMatch = false;
for (Class<?> expectedClass : expectedClasses) {
if (expectedClass.isInstance(value)) {
@@ -236,10 +230,17 @@ public class ConnectSchema implements Schema {
}
}
- if (!foundMatch)
- throw new DataException("Invalid Java object for schema type " +
schema.type()
- + ": " + value.getClass()
- + " for field: \"" + name + "\"");
+ if (!foundMatch) {
+ StringBuilder exceptionMessage = new StringBuilder("Invalid Java
object for schema");
+ if (schema.name() != null) {
+ exceptionMessage.append("
\"").append(schema.name()).append("\"");
+ }
+ exceptionMessage.append(" with type
").append(schema.type()).append(": ").append(value.getClass());
+ if (name != null) {
+ exceptionMessage.append(" for field:
\"").append(name).append("\"");
+ }
+ throw new DataException(exceptionMessage.toString());
+ }
switch (schema.type()) {
case STRUCT:
@@ -266,7 +267,7 @@ public class ConnectSchema implements Schema {
private static List<Class<?>> expectedClassesFor(Schema schema) {
List<Class<?>> expectedClasses =
LOGICAL_TYPE_CLASSES.get(schema.name());
if (expectedClasses == null)
- expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+ expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(),
Collections.emptyList());
return expectedClasses;
}
diff --git
a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
index 79cdfe0..55ccc81 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
@@ -311,13 +311,30 @@ public class StructTest {
Exception e = assertThrows(DataException.class, () ->
ConnectSchema.validateValue(fieldName,
fakeSchema, new Object()));
- assertEquals("Invalid Java object for schema type null: class
java.lang.Object for field: \"field\"",
+ assertEquals("Invalid Java object for schema \"fake\" with type null:
class java.lang.Object for field: \"field\"",
e.getMessage());
e = assertThrows(DataException.class, () ->
ConnectSchema.validateValue(fieldName,
Schema.INT8_SCHEMA, new Object()));
- assertEquals("Invalid Java object for schema type INT8: class
java.lang.Object for field: \"field\"",
+ assertEquals("Invalid Java object for schema with type INT8: class
java.lang.Object for field: \"field\"",
e.getMessage());
+
+ e = assertThrows(DataException.class, () ->
ConnectSchema.validateValue(Schema.INT8_SCHEMA, new Object()));
+ assertEquals("Invalid Java object for schema with type INT8: class
java.lang.Object", e.getMessage());
+ }
+
+ @Test
+ public void testValidateFieldWithInvalidValueMismatchTimestamp() {
+ String fieldName = "field";
+ long longValue = 1000L;
+
+ // Does not throw
+ ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue);
+
+ Exception e = assertThrows(DataException.class, () ->
ConnectSchema.validateValue(fieldName,
+ Timestamp.SCHEMA, longValue));
+ assertEquals("Invalid Java object for schema
\"org.apache.kafka.connect.data.Timestamp\" " +
+ "with type INT64: class java.lang.Long for field: \"field\"",
e.getMessage());
}
@Test