This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 26c4d6d KAFKA-10675: Add schema name to ConnectSchema.validateValue()
error message (#9541)
26c4d6d is described below
commit 26c4d6d78a44348b0ee828368cd71afe7205b2dc
Author: Alexander Iskuskov <[email protected]>
AuthorDate: Sat Jul 10 08:35:02 2021 +0300
KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message
(#9541)
The following error message
`org.apache.kafka.connect.errors.DataException: Invalid Java object for
schema type INT64: class java.lang.Long for field: "moderate_time"`
can be confusing because java.lang.Long is an acceptable type for schema INT64.
In fact, in this case the `org.apache.kafka.connect.data.Timestamp` schema is used, but
this information is not included in the error message.
Reviewers: Randall Hauch <[email protected]>, Chris Egerton
<[email protected]>, Konstantine Karantasis <[email protected]>
---
.../apache/kafka/connect/data/ConnectSchema.java | 23 +++++++++++-----------
.../org/apache/kafka/connect/data/StructTest.java | 21 ++++++++++++++++++--
2 files changed, 31 insertions(+), 13 deletions(-)
diff --git
a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
index a465b12..0c18b0a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
@@ -222,12 +222,6 @@ public class ConnectSchema implements Schema {
}
List<Class> expectedClasses = expectedClassesFor(schema);
-
- if (expectedClasses == null)
- throw new DataException("Invalid Java object for schema type " +
schema.type()
- + ": " + value.getClass()
- + " for field: \"" + name + "\"");
-
boolean foundMatch = false;
if (expectedClasses.size() == 1) {
foundMatch = expectedClasses.get(0).isInstance(value);
@@ -240,10 +234,17 @@ public class ConnectSchema implements Schema {
}
}
- if (!foundMatch)
- throw new DataException("Invalid Java object for schema type " +
schema.type()
- + ": " + value.getClass()
- + " for field: \"" + name + "\"");
+ if (!foundMatch) {
+ StringBuilder exceptionMessage = new StringBuilder("Invalid Java
object for schema");
+ if (schema.name() != null) {
+ exceptionMessage.append("
\"").append(schema.name()).append("\"");
+ }
+ exceptionMessage.append(" with type
").append(schema.type()).append(": ").append(value.getClass());
+ if (name != null) {
+ exceptionMessage.append(" for field:
\"").append(name).append("\"");
+ }
+ throw new DataException(exceptionMessage.toString());
+ }
switch (schema.type()) {
case STRUCT:
@@ -270,7 +271,7 @@ public class ConnectSchema implements Schema {
private static List<Class> expectedClassesFor(Schema schema) {
List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
if (expectedClasses == null)
- expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+ expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(),
Collections.emptyList());
return expectedClasses;
}
diff --git
a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
index 415b295..65cbfd8 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
@@ -305,13 +305,30 @@ public class StructTest {
Exception e = assertThrows(DataException.class, () ->
ConnectSchema.validateValue(fieldName,
fakeSchema, new Object()));
- assertEquals("Invalid Java object for schema type null: class
java.lang.Object for field: \"field\"",
+ assertEquals("Invalid Java object for schema \"fake\" with type null:
class java.lang.Object for field: \"field\"",
e.getMessage());
e = assertThrows(DataException.class, () ->
ConnectSchema.validateValue(fieldName,
Schema.INT8_SCHEMA, new Object()));
- assertEquals("Invalid Java object for schema type INT8: class
java.lang.Object for field: \"field\"",
+ assertEquals("Invalid Java object for schema with type INT8: class
java.lang.Object for field: \"field\"",
e.getMessage());
+
+ e = assertThrows(DataException.class, () ->
ConnectSchema.validateValue(Schema.INT8_SCHEMA, new Object()));
+ assertEquals("Invalid Java object for schema with type INT8: class
java.lang.Object", e.getMessage());
+ }
+
+ @Test
+ public void testValidateFieldWithInvalidValueMismatchTimestamp() {
+ String fieldName = "field";
+ long longValue = 1000L;
+
+ // Does not throw
+ ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue);
+
+ Exception e = assertThrows(DataException.class, () ->
ConnectSchema.validateValue(fieldName,
+ Timestamp.SCHEMA, longValue));
+ assertEquals("Invalid Java object for schema
\"org.apache.kafka.connect.data.Timestamp\" " +
+ "with type INT64: class java.lang.Long for field: \"field\"",
e.getMessage());
}
@Test