[ 
https://issues.apache.org/jira/browse/KAFKA-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shaik Zakir Hussain updated KAFKA-10477:
----------------------------------------
    Affects Version/s: 2.4.0

> Sink Connector fails with DataException when trying to convert Kafka record 
> with empty key to Connect Record
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10477
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10477
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.4.0
>            Reporter: Shaik Zakir Hussain
>            Priority: Major
>
> A sink connector fails with a DataException when converting a Kafka record 
> with an empty key to the Connect data format. 
> Kafka's trunk branch currently depends on *jackson v2.10.5*. 
> A short unit test (shown below) in the 
> `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue. 
>  
> {code:java}
> @Test
> public void testToConnectDataEmptyKey() throws IOException {
>     // Schemaless mode, configured as a key converter (isKey = true).
>     Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
>     converter.configure(props, true);
>     // An empty key arrives as a zero-length byte array.
>     String str = "";
>     SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", str.getBytes());
>     System.out.println(schemaAndValue);
> }
> {code}
> This test code snippet fails with the following exception:
> {noformat}
> org.apache.kafka.connect.errors.DataException: Unknown schema type: null
>       at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
>       at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
>       at org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>       at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>       at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>       at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {noformat}
>  
> This seems related to 
> [https://github.com/FasterXML/jackson-databind/issues/2211], where the jackson 
> library started returning `MissingNode` for empty input passed to 
> `ObjectMapper.readTree(input)`. The precise code change can be seen here: 
> [https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094]
>  
>  
> This causes an exception to be thrown in our JsonConverter class: 
> [https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764]
>  
>  
> In my opinion, when `jsonValue.getNodeType()` is `MISSING` 
> ([https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L754]), 
> we need to fall back to the behaviour of the `NULL` case (i.e. return 
> null), although I am not sure what further repercussions this might have.
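
The fallback proposed above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual JsonConverter code: the `NodeType` enum is a stand-in for Jackson's `JsonNodeType`, `MissingNodeFallback` and `convertSchemaless` are hypothetical names, and `IllegalArgumentException` stands in for Connect's `DataException`.

```java
// Stand-in for Jackson's JsonNodeType, reduced to the members used here (assumption).
enum NodeType { NULL, MISSING, STRING, NUMBER }

// Hypothetical helper, not part of Kafka Connect.
final class MissingNodeFallback {
    private MissingNodeFallback() {}

    /**
     * Schemaless conversion sketch: an absent payload (MISSING) is treated
     * like an explicit JSON null instead of being rejected.
     */
    static Object convertSchemaless(NodeType type, Object rawValue) {
        switch (type) {
            case NULL:
            case MISSING: // proposed fallback: empty input behaves like null
                return null;
            case STRING:
            case NUMBER:
                return rawValue;
            default:
                // The real converter throws DataException at this point.
                throw new IllegalArgumentException("Unknown schema type: " + type);
        }
    }
}
```

With this shape, `MissingNodeFallback.convertSchemaless(NodeType.MISSING, null)` returns null rather than throwing, which would match the behaviour described for the older jackson version. Whether treating `MISSING` as `NULL` is safe for value conversion as well as key conversion remains an open question.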
>  
> Things worked fine when the *jackson* dependency was at version 
> *v2.9.10.3* or earlier, as `ObjectMapper` returned null in that case.
>  
> Thanks,
> Zakir



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
