[ 
https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424197#comment-16424197
 ] 

ASF GitHub Bot commented on KAFKA-6728:
---------------------------------------

ewencp closed pull request #4815: KAFKA-6728: Corrected the worker’s 
instantiation of the HeaderConverter
URL: https://github.com/apache/kafka/pull/4815
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 0a895f67cf8..fd05af57a64 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -76,7 +76,9 @@
     public static final String HEADER_CONVERTER_CLASS_CONFIG = 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
     public static final String HEADER_CONVERTER_CLASS_DOC = 
WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
     public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header 
converter class";
-    public static final String HEADER_CONVERTER_CLASS_DEFAULT = 
WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT;
+    // The Connector config should not have a default for the header 
converter, since the absence of a config property means that
+    // the worker config settings should be used. Thus, we set the default to 
null here.
+    public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
 
     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to 
use for this connector.";
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e3d9cf45901..1c6465855ff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -397,12 +397,21 @@ public boolean startTask(
             );
             if (keyConverter == null) {
                 keyConverter = plugins.newConverter(config, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                log.info("Set up the key converter {} for task {} using the 
worker config", keyConverter.getClass(), id);
+            } else {
+                log.info("Set up the key converter {} for task {} using the 
connector config", keyConverter.getClass(), id);
             }
             if (valueConverter == null) {
                 valueConverter = plugins.newConverter(config, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                log.info("Set up the value converter {} for task {} using the 
worker config", valueConverter.getClass(), id);
+            } else {
+                log.info("Set up the value converter {} for task {} using the 
connector config", valueConverter.getClass(), id);
             }
             if (headerConverter == null) {
                 headerConverter = plugins.newHeaderConverter(config, 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                log.info("Set up the header converter {} for task {} using the 
worker config", headerConverter.getClass(), id);
+            } else {
+                log.info("Set up the header converter {} for task {} using the 
connector config", headerConverter.getClass(), id);
             }
 
             workerTask = buildWorkerTask(connConfig, id, task, statusListener, 
initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 94f27717080..f4cd2ba14b0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -234,6 +234,8 @@ public Converter newConverter(AbstractConfig config, String 
classPropertyName, C
         // Configure the Converter using only the old configuration mechanism 
...
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
+        log.debug("Configuring the {} converter with configuration:{}{}",
+                  isKeyConverter ? "key" : "value", System.lineSeparator(), 
converterConfig);
         plugin.configure(converterConfig, isKeyConverter);
         return plugin;
     }
@@ -249,20 +251,21 @@ public Converter newConverter(AbstractConfig config, 
String classPropertyName, C
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        if (!config.originals().containsKey(classPropertyName)) {
-            // This configuration does not define the header converter via the 
specified property name
-            return null;
-        }
         HeaderConverter plugin = null;
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
+                if (!config.originals().containsKey(classPropertyName)) {
+                    // This connector configuration does not define the header 
converter via the specified property name
+                    return null;
+                }
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
                 // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
                 // before calling config(...)
                 plugin = getInstance(config, classPropertyName, 
HeaderConverter.class);
                 break;
             case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
+                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback.
+                // Note that there will always be at least a default header 
converter for the worker
                 String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
                 Class<? extends HeaderConverter> klass;
                 try {
@@ -288,6 +291,7 @@ public HeaderConverter newHeaderConverter(AbstractConfig 
config, String classPro
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
         converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
+        log.debug("Configuring the header converter with configuration:{}{}", 
System.lineSeparator(), converterConfig);
         plugin.configure(converterConfig);
         return plugin;
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 6de92eedd34..a9a944fa360 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.connect.storage.ConverterConfig;
 import org.apache.kafka.connect.storage.ConverterType;
 import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -39,18 +40,31 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class PluginsTest {
 
-    private static Map<String, String> props;
+    private static Map<String, String> pluginProps;
     private static Plugins plugins;
+    private Map<String, String> props;
     private AbstractConfig config;
     private TestConverter converter;
     private TestHeaderConverter headerConverter;
 
     @BeforeClass
     public static void beforeAll() {
-        props = new HashMap<>();
+        pluginProps = new HashMap<>();
+
+        // Set up the plugins to have no additional plugin directories.
+        // This won't allow us to test classpath isolation, but it will allow 
us to test some of the utility methods.
+        pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
+        plugins = new Plugins(pluginProps);
+    }
+
+    @Before
+    public void setup() {
+        props = new HashMap<>(pluginProps);
         props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
         props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
TestConverter.class.getName());
         props.put("key.converter." + 
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
@@ -66,14 +80,10 @@ public static void beforeAll() {
         props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
TestHeaderConverter.class.getName());
         props.put("header.converter.extra.config", "baz");
 
-        // Set up the plugins to have no additional plugin directories.
-        // This won't allow us to test classpath isolation, but it will allow 
us to test some of the utility methods.
-        props.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
-        plugins = new Plugins(props);
+        createConfig();
     }
 
-    @Before
-    public void setup() {
+    protected void createConfig() {
         this.config = new TestableWorkerConfig(props);
     }
 
@@ -104,11 +114,48 @@ public void 
shouldInstantiateAndConfigureInternalConverters() {
     }
 
     @Test
-    public void shouldInstantiateAndConfigureHeaderConverter() {
-        
instantiateAndConfigureHeaderConverter(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
+    public void 
shouldInstantiateAndConfigureExplicitlySetHeaderConverterWithCurrentClassLoader()
 {
+        assertNotNull(props.get(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG));
+        HeaderConverter headerConverter = plugins.newHeaderConverter(config,
+                                                                     
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                                     
ClassLoaderUsage.CURRENT_CLASSLOADER);
+        assertNotNull(headerConverter);
+        assertTrue(headerConverter instanceof TestHeaderConverter);
+        this.headerConverter = (TestHeaderConverter) headerConverter;
+
+        // Validate extra configs got passed through to overridden converters
+        assertConverterType(ConverterType.HEADER, 
this.headerConverter.configs);
+        assertEquals("baz", this.headerConverter.configs.get("extra.config"));
+
+        headerConverter = plugins.newHeaderConverter(config,
+                                                     
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                     ClassLoaderUsage.PLUGINS);
+        assertNotNull(headerConverter);
+        assertTrue(headerConverter instanceof TestHeaderConverter);
+        this.headerConverter = (TestHeaderConverter) headerConverter;
+
         // Validate extra configs got passed through to overridden converters
-        assertConverterType(ConverterType.HEADER, headerConverter.configs);
-        assertEquals("baz", headerConverter.configs.get("extra.config"));
+        assertConverterType(ConverterType.HEADER, 
this.headerConverter.configs);
+        assertEquals("baz", this.headerConverter.configs.get("extra.config"));
+    }
+
+    @Test
+    public void shouldInstantiateAndConfigureDefaultHeaderConverter() {
+        props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
+        createConfig();
+
+        // Because it's not explicitly set on the supplied configuration, the 
logic to use the current classloader for the connector
+        // will exit immediately, and so this method always returns null
+        HeaderConverter headerConverter = plugins.newHeaderConverter(config,
+                                                                     
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                                     
ClassLoaderUsage.CURRENT_CLASSLOADER);
+        assertNull(headerConverter);
+        // But we should always find it (or the worker's default) when using 
the plugins classloader ...
+        headerConverter = plugins.newHeaderConverter(config,
+                                                     
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                     ClassLoaderUsage.PLUGINS);
+        assertNotNull(headerConverter);
+        assertTrue(headerConverter instanceof SimpleHeaderConverter);
     }
 
     protected void instantiateAndConfigureConverter(String configPropName, 
ClassLoaderUsage classLoaderUsage) {
@@ -116,11 +163,6 @@ protected void instantiateAndConfigureConverter(String 
configPropName, ClassLoad
         assertNotNull(converter);
     }
 
-    protected void instantiateAndConfigureHeaderConverter(String 
configPropName) {
-        headerConverter = (TestHeaderConverter) 
plugins.newHeaderConverter(config, configPropName, 
ClassLoaderUsage.CURRENT_CLASSLOADER);
-        assertNotNull(headerConverter);
-    }
-
     protected void assertConverterType(ConverterType type, Map<String, ?> 
props) {
         assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG));
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect Header Null Pointer Exception
> -------------------------------------------
>
>                 Key: KAFKA-6728
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6728
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>         Environment: Linux Mint
>            Reporter: Philippe Hong
>            Assignee: Randall Hauch
>            Priority: Critical
>             Fix For: 1.2.0, 1.1.1
>
>
> I am trying to use the newly released Kafka Connect that supports headers by 
> using the standalone connector to write to a text file (so in this case I am 
> only using the sink component)
> I am sadly greeted by a NullPointerException:
> {noformat}
> ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and 
> unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
> java.lang.NullPointerException
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501)
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>     at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> I launched zookeeper and kafka 1.1.0 locally and sent a 
> ProducerRecord[String, Array[Byte]] using a KafkaProducer[String, 
> Array[Byte]] with a header that has a key and value.
> I can read the record with a console consumer as well as using a 
> KafkaConsumer (where in this case I can see the content of the header of the 
> record I sent previously) so no problem here.
> I only made two changes to the kafka configuration:
>      - I used the StringConverter for the key and the ByteArrayConverter for 
> the value. 
>      - I also changed the topic where the sink would connect to.
> If I forgot something, please tell me, as it is the first time I am creating 
> an issue on Jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to