[ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424197#comment-16424197 ]
ASF GitHub Bot commented on KAFKA-6728: --------------------------------------- ewencp closed pull request #4815: KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter URL: https://github.com/apache/kafka/pull/4815 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 0a895f67cf8..fd05af57a64 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -76,7 +76,9 @@ public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG; public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC; public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class"; - public static final String HEADER_CONVERTER_CLASS_DEFAULT = WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT; + // The Connector config should not have a default for the header converter, since the absence of a config property means that + // the worker config settings should be used. Thus, we set the default to null here. 
+ public static final String HEADER_CONVERTER_CLASS_DEFAULT = null; public static final String TASKS_MAX_CONFIG = "tasks.max"; private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector."; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index e3d9cf45901..1c6465855ff 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -397,12 +397,21 @@ public boolean startTask( ); if (keyConverter == null) { keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id); + } else { + log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id); } if (valueConverter == null) { valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id); + } else { + log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id); } if (headerConverter == null) { headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id); + } else { + log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id); } workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader); diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 94f27717080..f4cd2ba14b0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -234,6 +234,8 @@ public Converter newConverter(AbstractConfig config, String classPropertyName, C // Configure the Converter using only the old configuration mechanism ... String configPrefix = classPropertyName + "."; Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); + log.debug("Configuring the {} converter with configuration:{}{}", + isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig); plugin.configure(converterConfig, isKeyConverter); return plugin; } @@ -249,20 +251,21 @@ public Converter newConverter(AbstractConfig config, String classPropertyName, C * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found */ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - if (!config.originals().containsKey(classPropertyName)) { - // This configuration does not define the header converter via the specified property name - return null; - } HeaderConverter plugin = null; switch (classLoaderUsage) { case CURRENT_CLASSLOADER: + if (!config.originals().containsKey(classPropertyName)) { + // This connector configuration does not define the header converter via the specified property name + return null; + } // Attempt to load first with the current classloader, and plugins as a fallback. // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes // before calling config(...) 
plugin = getInstance(config, classPropertyName, HeaderConverter.class); break; case PLUGINS: - // Attempt to load with the plugin class loader, which uses the current classloader as a fallback + // Attempt to load with the plugin class loader, which uses the current classloader as a fallback. + // Note that there will always be at least a default header converter for the worker String converterClassOrAlias = config.getClass(classPropertyName).getName(); Class<? extends HeaderConverter> klass; try { @@ -288,6 +291,7 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPro String configPrefix = classPropertyName + "."; Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); + log.debug("Configuring the header converter with configuration:{}{}", System.lineSeparator(), converterConfig); plugin.configure(converterConfig); return plugin; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 6de92eedd34..a9a944fa360 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterType; import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -39,18 +40,31 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class PluginsTest { - private static Map<String, String> 
props; + private static Map<String, String> pluginProps; private static Plugins plugins; + private Map<String, String> props; private AbstractConfig config; private TestConverter converter; private TestHeaderConverter headerConverter; @BeforeClass public static void beforeAll() { - props = new HashMap<>(); + pluginProps = new HashMap<>(); + + // Set up the plugins to have no additional plugin directories. + // This won't allow us to test classpath isolation, but it will allow us to test some of the utility methods. + pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, ""); + plugins = new Plugins(pluginProps); + } + + @Before + public void setup() { + props = new HashMap<>(pluginProps); props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); props.put("key.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true"); @@ -66,14 +80,10 @@ public static void beforeAll() { props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName()); props.put("header.converter.extra.config", "baz"); - // Set up the plugins to have no additional plugin directories. - // This won't allow us to test classpath isolation, but it will allow us to test some of the utility methods. 
- props.put(WorkerConfig.PLUGIN_PATH_CONFIG, ""); - plugins = new Plugins(props); + createConfig(); } - @Before - public void setup() { + protected void createConfig() { this.config = new TestableWorkerConfig(props); } @@ -104,11 +114,48 @@ public void shouldInstantiateAndConfigureInternalConverters() { } @Test - public void shouldInstantiateAndConfigureHeaderConverter() { - instantiateAndConfigureHeaderConverter(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG); + public void shouldInstantiateAndConfigureExplicitlySetHeaderConverterWithCurrentClassLoader() { + assertNotNull(props.get(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG)); + HeaderConverter headerConverter = plugins.newHeaderConverter(config, + WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.CURRENT_CLASSLOADER); + assertNotNull(headerConverter); + assertTrue(headerConverter instanceof TestHeaderConverter); + this.headerConverter = (TestHeaderConverter) headerConverter; + + // Validate extra configs got passed through to overridden converters + assertConverterType(ConverterType.HEADER, this.headerConverter.configs); + assertEquals("baz", this.headerConverter.configs.get("extra.config")); + + headerConverter = plugins.newHeaderConverter(config, + WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUGINS); + assertNotNull(headerConverter); + assertTrue(headerConverter instanceof TestHeaderConverter); + this.headerConverter = (TestHeaderConverter) headerConverter; + // Validate extra configs got passed through to overridden converters - assertConverterType(ConverterType.HEADER, headerConverter.configs); - assertEquals("baz", headerConverter.configs.get("extra.config")); + assertConverterType(ConverterType.HEADER, this.headerConverter.configs); + assertEquals("baz", this.headerConverter.configs.get("extra.config")); + } + + @Test + public void shouldInstantiateAndConfigureDefaultHeaderConverter() { + props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG); + createConfig(); + + // Because 
it's not explicitly set on the supplied configuration, the logic to use the current classloader for the connector + // will exit immediately, and so this method always returns null + HeaderConverter headerConverter = plugins.newHeaderConverter(config, + WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.CURRENT_CLASSLOADER); + assertNull(headerConverter); + // But we should always find it (or the worker's default) when using the plugins classloader ... + headerConverter = plugins.newHeaderConverter(config, + WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUGINS); + assertNotNull(headerConverter); + assertTrue(headerConverter instanceof SimpleHeaderConverter); } protected void instantiateAndConfigureConverter(String configPropName, ClassLoaderUsage classLoaderUsage) { @@ -116,11 +163,6 @@ protected void instantiateAndConfigureConverter(String configPropName, ClassLoad assertNotNull(converter); } - protected void instantiateAndConfigureHeaderConverter(String configPropName) { - headerConverter = (TestHeaderConverter) plugins.newHeaderConverter(config, configPropName, ClassLoaderUsage.CURRENT_CLASSLOADER); - assertNotNull(headerConverter); - } - protected void assertConverterType(ConverterType type, Map<String, ?> props) { assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG)); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Connect Header Null Pointer Exception > ------------------------------------------- > > Key: KAFKA-6728 > URL: https://issues.apache.org/jira/browse/KAFKA-6728 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 1.1.0 > Environment: Linux Mint > Reporter: Philippe Hong > Assignee: Randall Hauch > Priority: Critical > Fix For: 1.2.0, 1.1.1 > > > I am trying to use the newly released Kafka Connect that supports headers by > using the standalone connector to write to a text file (so in this case I am > only using the sink component) > I am sadly greeted by a NullPointerException : > {noformat} > ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and > unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {noformat} > I launched zookeeper and kafka 1.1.0 locally and sent a > ProducerRecord[String, Array[Byte]] using a 
KafkaProducer[String, > Array[Byte]] with a header that has a key and value. > I can read the record with a console consumer as well as with a > KafkaConsumer (in which case I can see the content of the header of the > record I sent previously), so there is no problem there. > I only made two changes to the Kafka configuration: > - I used the StringConverter for the key and the ByteArrayConverter for > the value. > - I also changed the topic that the sink connects to. > If I forgot something, please tell me, as this is the first time I am creating > an issue on Jira. -- This message was sent by Atlassian JIRA (v7.6.3#76005)