mjsax commented on code in PR #20239: URL: https://github.com/apache/kafka/pull/20239#discussion_r2229792843
########## streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java: ########## @@ -56,6 +58,64 @@ */ public class PageViewUntypedDemo { + /** + * Custom JSON serializer for JsonNode objects using Jackson ObjectMapper. + */ + public static class JsonNodeSerializer implements Serializer<JsonNode> { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { Review Comment: This method has a default implementation -- seems we can just omit it here? ########## streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java: ########## @@ -56,6 +58,64 @@ */ public class PageViewUntypedDemo { + /** + * Custom JSON serializer for JsonNode objects using Jackson ObjectMapper. + */ + public static class JsonNodeSerializer implements Serializer<JsonNode> { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // No configuration needed + } + + @Override + public byte[] serialize(String topic, JsonNode data) { + if (data == null) { + return null; + } + try { + return objectMapper.writeValueAsBytes(data); + } catch (IOException e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { Review Comment: Same as above ########## streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java: ########## @@ -56,6 +58,64 @@ */ public class PageViewUntypedDemo { + /** + * Custom JSON serializer for JsonNode objects using Jackson ObjectMapper. + */ + public static class JsonNodeSerializer implements Serializer<JsonNode> { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // No configuration needed + } + + @Override + public byte[] serialize(String topic, JsonNode data) { + if (data == null) { + return null; + } + try { + return objectMapper.writeValueAsBytes(data); + } catch (IOException e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { + // No resources to close + } + } + + /** + * Custom JSON deserializer for JsonNode objects using Jackson ObjectMapper. + */ + public static class JsonNodeDeserializer implements Deserializer<JsonNode> { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // No configuration needed + } + + @Override + public JsonNode deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + try { + return objectMapper.readTree(data); + } catch (IOException e) { + throw new SerializationException("Error deserializing JSON message", e); + } + } + + @Override + public void close() { Review Comment: one more :) ########## streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java: ########## @@ -56,6 +58,64 @@ */ public class PageViewUntypedDemo { + /** + * Custom JSON serializer for JsonNode objects using Jackson ObjectMapper. + */ + public static class JsonNodeSerializer implements Serializer<JsonNode> { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // No configuration needed + } + + @Override + public byte[] serialize(String topic, JsonNode data) { + if (data == null) { + return null; + } + try { + return objectMapper.writeValueAsBytes(data); + } catch (IOException e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { + // No resources to close + } + } + + /** + * Custom JSON deserializer for JsonNode objects using Jackson ObjectMapper. + */ + public static class JsonNodeDeserializer implements Deserializer<JsonNode> { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { Review Comment: Same -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org