This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 46de22e  KAFKA-10387: Fix inclusion of transformation configs when 
topic creation is enabled in Connect (#9172)
46de22e is described below

commit 46de22e8a130e9a424a103092d8e4efbd83337e0
Author: Konstantine Karantasis <[email protected]>
AuthorDate: Sun Aug 16 21:12:50 2020 -0700

    KAFKA-10387: Fix inclusion of transformation configs when topic creation is 
enabled in Connect (#9172)
    
    Addition of configs for custom topic creation with KIP-158 created a 
regression when transformation configs are also included in the configuration 
of a source connector.
    
    To experience the issue, just enabling topic creation at the worker is not 
sufficient. A user needs to supply a source connector configuration that 
contains both transformations and custom topic creation properties.
    
    The issue is that the enrichment of configs in `SourceConnectorConfig` 
happens on top of an `AbstractConfig` rather than a `ConnectorConfig`. 
Inheriting from the latter allows enrichment to be composable for both topic 
creation and transformations.
    
    Unit tests and integration tests are written to test these combinations.
    
    Reviewers: Randall Hauch <[email protected]>
---
 .../connect/runtime/SourceConnectorConfig.java     |  11 +-
 .../integration/TransformationIntegrationTest.java |  40 +++---
 .../kafka/connect/util/TopicCreationTest.java      | 150 +++++++++++++++++++++
 3 files changed, 179 insertions(+), 22 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index 5e175a5..fc5832a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -47,9 +47,9 @@ public class SourceConnectorConfig extends ConnectorConfig {
             + "created by source connectors";
     private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic 
Creation Groups";
 
-    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
-        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> 
props) {
-            super(configDef, props);
+    private static class EnrichedSourceConnectorConfig extends ConnectorConfig 
{
+        EnrichedSourceConnectorConfig(Plugins plugins, ConfigDef configDef, 
Map<String, String> props) {
+            super(plugins, configDef, props);
         }
 
         @Override
@@ -129,8 +129,9 @@ public class SourceConnectorConfig extends ConnectorConfig {
             propsWithoutRegexForDefaultGroup.entrySet()
                     .removeIf(e -> 
e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX + INCLUDE_REGEX_CONFIG)
                             || e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX 
+ EXCLUDE_REGEX_CONFIG));
-            enrichedSourceConfig = new 
EnrichedSourceConnectorConfig(enrich(defaultConfigDef, props,
-                    defaultGroup), propsWithoutRegexForDefaultGroup);
+            enrichedSourceConfig = new EnrichedSourceConnectorConfig(plugins,
+                    enrich(defaultConfigDef, props, defaultGroup),
+                    propsWithoutRegexForDefaultGroup);
         } else {
             enrichedSourceConfig = null;
         }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
index ca9d55d..379172a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
@@ -16,11 +16,6 @@
  */
 package org.apache.kafka.connect.integration;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.transforms.Filter;
@@ -36,6 +31,11 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
 import static java.util.Collections.singletonMap;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -44,6 +44,9 @@ import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
@@ -73,20 +76,21 @@ public class TransformationIntegrationTest {
     @Before
     public void setup() {
         // setup Connect worker properties
-        Map<String, String> exampleWorkerProps = new HashMap<>();
-        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(5_000));
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(5_000));
 
         // setup Kafka broker properties
-        Properties exampleBrokerProps = new Properties();
-        exampleBrokerProps.put("auto.create.topics.enable", "false");
+        Properties brokerProps = new Properties();
+        // This is required because tests in this class also test 
per-connector topic creation with transformations
+        brokerProps.put("auto.create.topics.enable", "false");
 
         // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
                 .numBrokers(1)
-                .workerProps(exampleWorkerProps)
-                .brokerProps(exampleBrokerProps)
+                .workerProps(workerProps)
+                .brokerProps(brokerProps)
                 .build();
 
         // start the clusters
@@ -262,16 +266,15 @@ public class TransformationIntegrationTest {
     }
 
     /**
-     * Test the {@link Filter} transformer with a
-     * {@link HasHeaderKey} predicate on a source connector.
+     * Test the {@link Filter} transformer with a {@link HasHeaderKey} 
predicate on a source connector.
+     * Note that this test uses topic creation configs to allow the source 
connector to create
+     * the topic when it tries to produce the first source record, instead of 
requiring the topic
+     * to exist before the connector starts.
      */
     @Test
-    public void testFilterOnHasHeaderKeyWithSourceConnector() throws Exception 
{
+    public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation() 
throws Exception {
         assertConnectReady();
 
-        // create test topic
-        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
-
         // setup up props for the sink connector
         Map<String, String> props = new HashMap<>();
         props.put("name", CONNECTOR_NAME);
@@ -288,6 +291,9 @@ public class TransformationIntegrationTest {
         props.put(PREDICATES_CONFIG, "headerPredicate");
         props.put(PREDICATES_CONFIG + ".headerPredicate.type", 
HasHeaderKey.class.getSimpleName());
         props.put(PREDICATES_CONFIG + ".headerPredicate.name", "header-8");
+        // custom topic creation is used, so there's no need to proactively 
create the test topic
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, 
String.valueOf(-1));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, 
String.valueOf(NUM_TOPIC_PARTITIONS));
 
         // expect all records to be produced by the connector
         connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
index 3a51d10..feb0e5f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
@@ -18,15 +18,21 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.RegexRouter;
+import org.apache.kafka.connect.transforms.Transformation;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -485,4 +491,148 @@ public class TopicCreationTest {
         assertEquals(barPartitions, barTopicSpec.numPartitions());
         assertThat(barTopicSpec.configs(), is(barTopicProps));
     }
+
+    @Test
+    public void testTopicCreationWithSingleTransformation() {
+        sourceProps = defaultConnectorPropsWithTopicCreation();
+        sourceProps.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", 
FOO_GROUP, BAR_GROUP));
+        String xformName = "example";
+        String castType = "int8";
+        sourceProps.put("transforms", xformName);
+        sourceProps.put("transforms." + xformName + ".type", 
Cast.Value.class.getName());
+        sourceProps.put("transforms." + xformName + ".spec", castType);
+
+        sourceConfig = new SourceConnectorConfig(MOCK_PLUGINS, sourceProps, 
true);
+
+        Map<String, TopicCreationGroup> groups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+        TopicCreation topicCreation = 
TopicCreation.newTopicCreation(workerConfig, groups);
+
+        assertTrue(topicCreation.isTopicCreationEnabled());
+        assertTrue(topicCreation.isTopicCreationRequired(FOO_TOPIC));
+        assertThat(topicCreation.defaultTopicGroup(), 
is(groups.get(DEFAULT_TOPIC_CREATION_GROUP)));
+        assertEquals(2, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups().keySet(), hasItems(FOO_GROUP, 
BAR_GROUP));
+        assertEquals(topicCreation.defaultTopicGroup(), 
topicCreation.findFirstGroup(FOO_TOPIC));
+        topicCreation.addTopic(FOO_TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
+
+        List<Transformation<SourceRecord>> transformations = 
sourceConfig.transformations();
+        assertEquals(1, transformations.size());
+        Cast<SourceRecord> xform = (Cast<SourceRecord>) transformations.get(0);
+        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, null, Schema.INT8_SCHEMA, 42));
+        assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
+        assertEquals((byte) 42, transformed.value());
+    }
+
+    @Test
+    public void topicCreationWithTwoGroupsAndTwoTransformations() {
+        short fooReplicas = 3;
+        int partitions = 5;
+        int barPartitions = 1;
+
+        sourceProps = defaultConnectorPropsWithTopicCreation();
+        sourceProps.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", 
FOO_GROUP, BAR_GROUP));
+        sourceProps.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, 
String.valueOf(partitions));
+        // Setting here but they should be ignored for the default group
+        sourceProps.put(TOPIC_CREATION_PREFIX + FOO_GROUP + "." + 
INCLUDE_REGEX_CONFIG, FOO_TOPIC);
+        sourceProps.put(TOPIC_CREATION_PREFIX + FOO_GROUP + "." + 
REPLICATION_FACTOR_CONFIG, String.valueOf(fooReplicas));
+        sourceProps.put(TOPIC_CREATION_PREFIX + BAR_GROUP + "." + 
INCLUDE_REGEX_CONFIG, BAR_REGEX);
+        sourceProps.put(TOPIC_CREATION_PREFIX + BAR_GROUP + "." + 
PARTITIONS_CONFIG, String.valueOf(barPartitions));
+
+        String castName = "cast";
+        String castType = "int8";
+        sourceProps.put("transforms." + castName + ".type", 
Cast.Value.class.getName());
+        sourceProps.put("transforms." + castName + ".spec", castType);
+
+        String regexRouterName = "regex";
+        sourceProps.put("transforms." + regexRouterName + ".type", 
RegexRouter.class.getName());
+        sourceProps.put("transforms." + regexRouterName + ".regex", "(.*)");
+        sourceProps.put("transforms." + regexRouterName + ".replacement", 
"prefix-$1");
+
+        sourceProps.put("transforms", String.join(",", castName, 
regexRouterName));
+
+        Map<String, String> fooTopicProps = new HashMap<>();
+        fooTopicProps.put(RETENTION_MS_CONFIG, 
String.valueOf(TimeUnit.DAYS.toMillis(30)));
+        fooTopicProps.forEach((k, v) -> sourceProps.put(TOPIC_CREATION_PREFIX 
+ FOO_GROUP + "." + k, v));
+
+        Map<String, String> barTopicProps = new HashMap<>();
+        barTopicProps.put(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT);
+        barTopicProps.forEach((k, v) -> sourceProps.put(TOPIC_CREATION_PREFIX 
+ BAR_GROUP + "." + k, v));
+
+        // verify config creation
+        sourceConfig = new SourceConnectorConfig(MOCK_PLUGINS, sourceProps, 
true);
+        assertTrue(sourceConfig.usesTopicCreation());
+        assertEquals(DEFAULT_REPLICATION_FACTOR, (short) 
sourceConfig.topicCreationReplicationFactor(DEFAULT_TOPIC_CREATION_GROUP));
+        assertEquals(partitions, (int) 
sourceConfig.topicCreationPartitions(DEFAULT_TOPIC_CREATION_GROUP));
+        
assertThat(sourceConfig.topicCreationInclude(DEFAULT_TOPIC_CREATION_GROUP), 
is(Collections.singletonList(".*")));
+        
assertThat(sourceConfig.topicCreationExclude(DEFAULT_TOPIC_CREATION_GROUP), 
is(Collections.emptyList()));
+        
assertThat(sourceConfig.topicCreationOtherConfigs(DEFAULT_TOPIC_CREATION_GROUP),
 is(Collections.emptyMap()));
+
+        // verify topic creation group is instantiated correctly
+        Map<String, TopicCreationGroup> groups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+        assertEquals(3, groups.size());
+        assertThat(groups.keySet(), hasItems(DEFAULT_TOPIC_CREATION_GROUP, 
FOO_GROUP, BAR_GROUP));
+
+        // verify topic creation
+        TopicCreation topicCreation = 
TopicCreation.newTopicCreation(workerConfig, groups);
+        TopicCreationGroup defaultGroup = topicCreation.defaultTopicGroup();
+        // Default group will match all topics besides empty string
+        assertTrue(defaultGroup.matches(" "));
+        assertTrue(defaultGroup.matches(FOO_TOPIC));
+        assertTrue(defaultGroup.matches(BAR_TOPIC));
+        assertEquals(DEFAULT_TOPIC_CREATION_GROUP, defaultGroup.name());
+        TopicCreationGroup fooGroup = groups.get(FOO_GROUP);
+        assertFalse(fooGroup.matches(" "));
+        assertTrue(fooGroup.matches(FOO_TOPIC));
+        assertFalse(fooGroup.matches(BAR_TOPIC));
+        assertEquals(FOO_GROUP, fooGroup.name());
+        TopicCreationGroup barGroup = groups.get(BAR_GROUP);
+        assertTrue(barGroup.matches(BAR_TOPIC));
+        assertFalse(barGroup.matches(FOO_TOPIC));
+        assertEquals(BAR_GROUP, barGroup.name());
+
+        assertTrue(topicCreation.isTopicCreationEnabled());
+        assertTrue(topicCreation.isTopicCreationRequired(FOO_TOPIC));
+        assertTrue(topicCreation.isTopicCreationRequired(BAR_TOPIC));
+        assertEquals(2, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups().keySet(), hasItems(FOO_GROUP, 
BAR_GROUP));
+        assertEquals(fooGroup, topicCreation.findFirstGroup(FOO_TOPIC));
+        assertEquals(barGroup, topicCreation.findFirstGroup(BAR_TOPIC));
+        topicCreation.addTopic(FOO_TOPIC);
+        topicCreation.addTopic(BAR_TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
+        assertFalse(topicCreation.isTopicCreationRequired(BAR_TOPIC));
+
+        // verify new topic properties
+        String otherTopic = "any-other-topic";
+        NewTopic defaultTopicSpec = 
topicCreation.findFirstGroup(otherTopic).newTopic(otherTopic);
+        assertEquals(otherTopic, defaultTopicSpec.name());
+        assertEquals(DEFAULT_REPLICATION_FACTOR, 
defaultTopicSpec.replicationFactor());
+        assertEquals(partitions, defaultTopicSpec.numPartitions());
+        assertThat(defaultTopicSpec.configs(), is(Collections.emptyMap()));
+
+        NewTopic fooTopicSpec = 
topicCreation.findFirstGroup(FOO_TOPIC).newTopic(FOO_TOPIC);
+        assertEquals(FOO_TOPIC, fooTopicSpec.name());
+        assertEquals(fooReplicas, fooTopicSpec.replicationFactor());
+        assertEquals(partitions, fooTopicSpec.numPartitions());
+        assertThat(fooTopicSpec.configs(), is(fooTopicProps));
+
+        NewTopic barTopicSpec = 
topicCreation.findFirstGroup(BAR_TOPIC).newTopic(BAR_TOPIC);
+        assertEquals(BAR_TOPIC, barTopicSpec.name());
+        assertEquals(DEFAULT_REPLICATION_FACTOR, 
barTopicSpec.replicationFactor());
+        assertEquals(barPartitions, barTopicSpec.numPartitions());
+        assertThat(barTopicSpec.configs(), is(barTopicProps));
+
+        List<Transformation<SourceRecord>> transformations = 
sourceConfig.transformations();
+        assertEquals(2, transformations.size());
+
+        Cast<SourceRecord> castXForm = (Cast<SourceRecord>) 
transformations.get(0);
+        SourceRecord transformed = castXForm.apply(new SourceRecord(null, 
null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
+        assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
+        assertEquals((byte) 42, transformed.value());
+
+        RegexRouter<SourceRecord> regexRouterXForm = 
(RegexRouter<SourceRecord>) transformations.get(1);
+        transformed = regexRouterXForm.apply(new SourceRecord(null, null, 
"topic", 0, null, null, Schema.INT8_SCHEMA, 42));
+        assertEquals("prefix-topic", transformed.topic());
+    }
 }

Reply via email to