This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 26c4d6d  KAFKA-10675: Add schema name to ConnectSchema.validateValue() 
error message (#9541)
26c4d6d is described below

commit 26c4d6d78a44348b0ee828368cd71afe7205b2dc
Author: Alexander Iskuskov <[email protected]>
AuthorDate: Sat Jul 10 08:35:02 2021 +0300

    KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message 
(#9541)
    
    The following error message
    `org.apache.kafka.connect.errors.DataException: Invalid Java object for 
schema type INT64: class java.lang.Long for field: "moderate_time"`
    can be confusing because java.lang.Long is acceptable type for schema INT64.
    
    In fact, in this case `org.apache.kafka.connect.data.Timestamp` is used but 
this info is not logged.
    
    Reviewers: Randall Hauch <[email protected]>, Chris Egerton 
<[email protected]>, Konstantine Karantasis <[email protected]>
---
 .../apache/kafka/connect/data/ConnectSchema.java   | 23 +++++++++++-----------
 .../org/apache/kafka/connect/data/StructTest.java  | 21 ++++++++++++++++++--
 2 files changed, 31 insertions(+), 13 deletions(-)

diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java 
b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
index a465b12..0c18b0a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
@@ -222,12 +222,6 @@ public class ConnectSchema implements Schema {
         }
 
         List<Class> expectedClasses = expectedClassesFor(schema);
-
-        if (expectedClasses == null)
-            throw new DataException("Invalid Java object for schema type " + 
schema.type()
-                    + ": " + value.getClass()
-                    + " for field: \"" + name + "\"");
-
         boolean foundMatch = false;
         if (expectedClasses.size() == 1) {
             foundMatch = expectedClasses.get(0).isInstance(value);
@@ -240,10 +234,17 @@ public class ConnectSchema implements Schema {
             }
         }
 
-        if (!foundMatch)
-            throw new DataException("Invalid Java object for schema type " + 
schema.type()
-                    + ": " + value.getClass()
-                    + " for field: \"" + name + "\"");
+        if (!foundMatch) {
+            StringBuilder exceptionMessage = new StringBuilder("Invalid Java 
object for schema");
+            if (schema.name() != null) {
+                exceptionMessage.append(" 
\"").append(schema.name()).append("\"");
+            }
+            exceptionMessage.append(" with type 
").append(schema.type()).append(": ").append(value.getClass());
+            if (name != null) {
+                exceptionMessage.append(" for field: 
\"").append(name).append("\"");
+            }
+            throw new DataException(exceptionMessage.toString());
+        }
 
         switch (schema.type()) {
             case STRUCT:
@@ -270,7 +271,7 @@ public class ConnectSchema implements Schema {
     private static List<Class> expectedClassesFor(Schema schema) {
         List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
         if (expectedClasses == null)
-            expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+            expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(), 
Collections.emptyList());
         return expectedClasses;
     }
 
diff --git 
a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java 
b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
index 415b295..65cbfd8 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
@@ -305,13 +305,30 @@ public class StructTest {
 
         Exception e = assertThrows(DataException.class, () -> 
ConnectSchema.validateValue(fieldName,
             fakeSchema, new Object()));
-        assertEquals("Invalid Java object for schema type null: class 
java.lang.Object for field: \"field\"",
+        assertEquals("Invalid Java object for schema \"fake\" with type null: 
class java.lang.Object for field: \"field\"",
             e.getMessage());
 
         e = assertThrows(DataException.class, () -> 
ConnectSchema.validateValue(fieldName,
             Schema.INT8_SCHEMA, new Object()));
-        assertEquals("Invalid Java object for schema type INT8: class 
java.lang.Object for field: \"field\"",
+        assertEquals("Invalid Java object for schema with type INT8: class 
java.lang.Object for field: \"field\"",
             e.getMessage());
+
+        e = assertThrows(DataException.class, () -> 
ConnectSchema.validateValue(Schema.INT8_SCHEMA, new Object()));
+        assertEquals("Invalid Java object for schema with type INT8: class 
java.lang.Object", e.getMessage());
+    }
+
+    @Test
+    public void testValidateFieldWithInvalidValueMismatchTimestamp() {
+        String fieldName = "field";
+        long longValue = 1000L;
+
+        // Does not throw
+        ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue);
+
+        Exception e = assertThrows(DataException.class, () -> 
ConnectSchema.validateValue(fieldName,
+            Timestamp.SCHEMA, longValue));
+        assertEquals("Invalid Java object for schema 
\"org.apache.kafka.connect.data.Timestamp\" " +
+                "with type INT64: class java.lang.Long for field: \"field\"", 
e.getMessage());
     }
 
     @Test

Reply via email to