This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eb9f5189f5b KAFKA-16913: Support external schemas in JSONConverter (#19449)
eb9f5189f5b is described below

commit eb9f5189f5bf40b9f55a2550078b6a32024712cc
Author: Priyanka K U <[email protected]>
AuthorDate: Tue Aug 5 13:30:14 2025 +0530

    KAFKA-16913: Support external schemas in JSONConverter (#19449)
    
    When using a connector that requires a schema, such as JDBC connectors,
    with JSON messages, the current JSONConverter necessitates including the
    schema within every message. To address this, we are introducing a new
    parameter, schema.content, which allows you to provide the schema
    externally. This approach not only reduces the size of the messages but
    also facilitates the use of more complex schemas.
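
    As a rough sketch of the intended usage (illustrative only, not part of
    this patch; it mirrors the style of the new unit tests below):

        JsonConverter converter = new JsonConverter();
        // schema.content is only consulted when schemas.enable is true (the
        // default); every record is then read against this one schema.
        converter.configure(Map.of(
                "schemas.enable", "true",
                "schema.content", "{ \"type\": \"string\" }"),
            false); // false = configure as a value converter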
    
    KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter
    
    Reviewers: Mickael Maison <[email protected]>, TengYao Chi <[email protected]>, Edoardo Comar <[email protected]>
---
 .../apache/kafka/connect/json/JsonConverter.java   | 26 ++++++++---
 .../kafka/connect/json/JsonConverterConfig.java    | 24 ++++++++++
 .../kafka/connect/json/JsonConverterTest.java      | 54 ++++++++++++++++++++++
 3 files changed, 98 insertions(+), 6 deletions(-)

diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index 8b3d60133a9..7fa5358f1c3 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -224,6 +224,7 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned {
     private JsonConverterConfig config;
     private Cache<Schema, ObjectNode> fromConnectSchemaCache;
     private Cache<JsonNode, Schema> toConnectSchemaCache;
+    private Schema schema = null;     // if a schema is provided in config, this schema will be used for all messages
 
     private final JsonSerializer serializer;
     private final JsonDeserializer deserializer;
@@ -286,6 +287,16 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned {
 
         fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize()));
         toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize()));
+
+        try {
+            final byte[] schemaContent = config.schemaContent();
+            if (schemaContent != null) {
+                final JsonNode schemaNode = deserializer.deserialize("", schemaContent);
+                this.schema = asConnectSchema(schemaNode);
+            }
+        } catch (SerializationException e) {
+            throw new DataException("Failed to parse schema in converter config due to serialization error: ", e);
+        }
     }
 
     @Override
@@ -340,13 +351,16 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned {
             throw new DataException("Converting byte[] to Kafka Connect data 
failed due to serialization error: ", e);
         }
 
-        if (config.schemasEnabled() && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)))
-            throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
+        if (config.schemasEnabled()) {
+            if (schema != null) {
+                return new SchemaAndValue(schema, convertToConnect(schema, jsonValue, config));
+            } else if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) {
+                throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
                     " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");
-
-        // The deserialized data should either be an envelope object containing the schema and the payload or the schema
-        // was stripped during serialization and we need to fill in an all-encompassing schema.
-        if (!config.schemasEnabled()) {
+            }
+        } else {
+            // The deserialized data should either be an envelope object containing the schema and the payload or the schema
+            // was stripped during serialization and we need to fill in an all-encompassing schema.
             ObjectNode envelope = JSON_NODE_FACTORY.objectNode();
             envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
             envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
index f02d54ac263..4d148250114 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
 import org.apache.kafka.connect.storage.ConverterConfig;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Locale;
 import java.util.Map;
 
@@ -35,6 +36,11 @@ public final class JsonConverterConfig extends ConverterConfig {
     private static final String SCHEMAS_ENABLE_DOC = "Include schemas within each of the serialized values and keys.";
     private static final String SCHEMAS_ENABLE_DISPLAY = "Enable Schemas";
 
+    public static final String SCHEMA_CONTENT_CONFIG = "schema.content";
+    public static final String SCHEMA_CONTENT_DEFAULT = null;
+    private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. Otherwise, the schema will be included in the content of each message.";
+    private static final String SCHEMA_CONTENT_DISPLAY = "Schema Content";
+
     public static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size";
     public static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
     private static final String SCHEMAS_CACHE_SIZE_DOC = "The maximum number of schemas that can be cached in this converter instance.";
@@ -61,6 +67,8 @@ public final class JsonConverterConfig extends ConverterConfig {
                       orderInGroup++, Width.MEDIUM, SCHEMAS_ENABLE_DISPLAY);
         CONFIG.define(SCHEMAS_CACHE_SIZE_CONFIG, Type.INT, SCHEMAS_CACHE_SIZE_DEFAULT, Importance.HIGH, SCHEMAS_CACHE_SIZE_DOC, group,
                       orderInGroup++, Width.MEDIUM, SCHEMAS_CACHE_SIZE_DISPLAY);
+        CONFIG.define(SCHEMA_CONTENT_CONFIG, Type.STRING, SCHEMA_CONTENT_DEFAULT, Importance.HIGH, SCHEMA_CONTENT_DOC, group,
+                      orderInGroup++, Width.MEDIUM, SCHEMA_CONTENT_DISPLAY);
 
         group = "Serialization";
         orderInGroup = 0;
@@ -86,6 +94,7 @@ public final class JsonConverterConfig extends ConverterConfig {
     private final int schemaCacheSize;
     private final DecimalFormat decimalFormat;
     private final boolean replaceNullWithDefault;
+    private final byte[] schemaContent;
 
     public JsonConverterConfig(Map<String, ?> props) {
         super(CONFIG, props);
@@ -93,6 +102,10 @@ public final class JsonConverterConfig extends ConverterConfig {
         this.schemaCacheSize = getInt(SCHEMAS_CACHE_SIZE_CONFIG);
         this.decimalFormat = DecimalFormat.valueOf(getString(DECIMAL_FORMAT_CONFIG).toUpperCase(Locale.ROOT));
         this.replaceNullWithDefault = getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG);
+        String schemaContentStr = getString(SCHEMA_CONTENT_CONFIG);
+        this.schemaContent = (schemaContentStr == null || schemaContentStr.isEmpty())
+                ? null
+                : schemaContentStr.getBytes(StandardCharsets.UTF_8);
     }
 
     /**
@@ -130,4 +143,15 @@ public final class JsonConverterConfig extends ConverterConfig {
         return replaceNullWithDefault;
     }
 
+    /**
+     * If a default schema is provided in the converter config, this will be
+     * used for all messages.
+     * 
+     * This is only relevant if schemas are enabled.
+     *
+     * @return Schema Contents, will return null if no value is provided
+     */
+    public byte[] schemaContent() {
+        return schemaContent;
+    }
 }
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 34010ddac05..200b33d1774 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -36,6 +36,8 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -977,6 +979,58 @@ public class JsonConverterTest {
         assertEquals(AppInfoParser.getVersion(), converter.version());
     }
 
+    @Test
+    public void testSchemaContentIsNull() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, null);
+        converter.configure(config, false);
+        byte[] jsonBytes = "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes();
+        SchemaAndValue result = converter.toConnectData(TOPIC, jsonBytes);
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), result);
+    }
+
+    @Test
+    public void testSchemaContentIsEmptyString() {
+        converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, ""), false);
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
+
+    @Test
+    public void testSchemaContentValidSchema() {
+        converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"string\" }"), false);
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "\"foo-bar-baz\"".getBytes()));
+    }
+
+    @Test
+    public void testSchemaContentInValidSchema() {
+        assertThrows(
+            DataException.class,
+            () -> converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"string\" }"), false),
+            " Provided schema is invalid , please recheck the schema you have provided");
+    }
+
+    @Test
+    public void testSchemaContentLooksLikeSchema() {
+        converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"struct\", \"fields\": [{\"field\": \"schema\", \"type\": \"struct\",\"fields\": [{\"field\": \"type\", \"type\": \"string\" }]}, {\"field\": \"payload\", \"type\": \"string\"}]}"), false);
+        SchemaAndValue connectData = converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes());
+        assertEquals("foo-bar-baz", ((Struct) connectData.value()).getString("payload"));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+        "{ }",
+        "{ \"wrong\": \"schema\" }",
+        "{ \"schema\": { \"type\": \"string\" } }",
+        "{ \"payload\": \"foo-bar-baz\" }",
+        "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\", 
\"extra\": \"field\" }",
+    })
+    public void testNullSchemaContentWithWrongConnectDataValue(String value) {
+        converter.configure(Map.of(), false);
+        assertThrows(
+                DataException.class,
+                () -> converter.toConnectData(TOPIC, value.getBytes()));
+    }
+
     private JsonNode parse(byte[] json) {
         try {
             return objectMapper.readTree(json);
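
For reference, a minimal sketch of the end-to-end behaviour the new tests
exercise: with schema.content configured, toConnectData() accepts a bare JSON
payload instead of the usual schema/payload envelope. The topic name below is
illustrative:

    JsonConverter converter = new JsonConverter();
    converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG,
            "{ \"type\": \"string\" }"), false);
    // No envelope required: the bare payload is deserialized against the
    // externally supplied schema.
    SchemaAndValue result = converter.toConnectData("my-topic",
            "\"foo-bar-baz\"".getBytes(StandardCharsets.UTF_8));
    // Equivalent to new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz")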
