This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 46de22e KAFKA-10387: Fix inclusion of transformation configs when
topic creation is enabled in Connect (#9172)
46de22e is described below
commit 46de22e8a130e9a424a103092d8e4efbd83337e0
Author: Konstantine Karantasis <[email protected]>
AuthorDate: Sun Aug 16 21:12:50 2020 -0700
KAFKA-10387: Fix inclusion of transformation configs when topic creation is
enabled in Connect (#9172)
The addition of configs for custom topic creation with KIP-158 introduced a
regression that occurs when transformation configs are also included in the
configuration of a source connector.
Enabling topic creation at the worker is not sufficient on its own to
trigger the issue. A user must also supply a source connector configuration
that contains both transformations and custom topic creation properties.
The root cause is that the enrichment of configs in `SourceConnectorConfig`
happens on top of an `AbstractConfig` rather than a `ConnectorConfig`.
Making the internal `EnrichedSourceConnectorConfig` extend `ConnectorConfig`
instead allows enrichment to be composable for both topic creation and
transformations.
Unit tests and integration tests are added to cover these combinations.
Reviewers: Randall Hauch <[email protected]>
---
.../connect/runtime/SourceConnectorConfig.java | 11 +-
.../integration/TransformationIntegrationTest.java | 40 +++---
.../kafka/connect/util/TopicCreationTest.java | 150 +++++++++++++++++++++
3 files changed, 179 insertions(+), 22 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index 5e175a5..fc5832a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -47,9 +47,9 @@ public class SourceConnectorConfig extends ConnectorConfig {
+ "created by source connectors";
private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic
Creation Groups";
- private static class EnrichedSourceConnectorConfig extends AbstractConfig {
- EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String>
props) {
- super(configDef, props);
+ private static class EnrichedSourceConnectorConfig extends ConnectorConfig
{
+ EnrichedSourceConnectorConfig(Plugins plugins, ConfigDef configDef,
Map<String, String> props) {
+ super(plugins, configDef, props);
}
@Override
@@ -129,8 +129,9 @@ public class SourceConnectorConfig extends ConnectorConfig {
propsWithoutRegexForDefaultGroup.entrySet()
.removeIf(e ->
e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX + INCLUDE_REGEX_CONFIG)
|| e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX
+ EXCLUDE_REGEX_CONFIG));
- enrichedSourceConfig = new
EnrichedSourceConnectorConfig(enrich(defaultConfigDef, props,
- defaultGroup), propsWithoutRegexForDefaultGroup);
+ enrichedSourceConfig = new EnrichedSourceConnectorConfig(plugins,
+ enrich(defaultConfigDef, props, defaultGroup),
+ propsWithoutRegexForDefaultGroup);
} else {
enrichedSourceConfig = null;
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
index ca9d55d..379172a 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
@@ -16,11 +16,6 @@
*/
package org.apache.kafka.connect.integration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Filter;
@@ -36,6 +31,11 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
import static java.util.Collections.singletonMap;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -44,6 +44,9 @@ import static
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
@@ -73,20 +76,21 @@ public class TransformationIntegrationTest {
@Before
public void setup() {
// setup Connect worker properties
- Map<String, String> exampleWorkerProps = new HashMap<>();
- exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG,
String.valueOf(5_000));
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG,
String.valueOf(5_000));
// setup Kafka broker properties
- Properties exampleBrokerProps = new Properties();
- exampleBrokerProps.put("auto.create.topics.enable", "false");
+ Properties brokerProps = new Properties();
+ // This is required because tests in this class also test
per-connector topic creation with transformations
+ brokerProps.put("auto.create.topics.enable", "false");
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
.numWorkers(NUM_WORKERS)
.numBrokers(1)
- .workerProps(exampleWorkerProps)
- .brokerProps(exampleBrokerProps)
+ .workerProps(workerProps)
+ .brokerProps(brokerProps)
.build();
// start the clusters
@@ -262,16 +266,15 @@ public class TransformationIntegrationTest {
}
/**
- * Test the {@link Filter} transformer with a
- * {@link HasHeaderKey} predicate on a source connector.
+ * Test the {@link Filter} transformer with a {@link HasHeaderKey}
predicate on a source connector.
+ * Note that this test uses topic creation configs to allow the source
connector to create
+ * the topic when it tries to produce the first source record, instead of
requiring the topic
+ * to exist before the connector starts.
*/
@Test
- public void testFilterOnHasHeaderKeyWithSourceConnector() throws Exception
{
+ public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation()
throws Exception {
assertConnectReady();
- // create test topic
- connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
-
// setup up props for the sink connector
Map<String, String> props = new HashMap<>();
props.put("name", CONNECTOR_NAME);
@@ -288,6 +291,9 @@ public class TransformationIntegrationTest {
props.put(PREDICATES_CONFIG, "headerPredicate");
props.put(PREDICATES_CONFIG + ".headerPredicate.type",
HasHeaderKey.class.getSimpleName());
props.put(PREDICATES_CONFIG + ".headerPredicate.name", "header-8");
+ // custom topic creation is used, so there's no need to proactively
create the test topic
+ props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG,
String.valueOf(-1));
+ props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG,
String.valueOf(NUM_TOPIC_PARTITIONS));
// expect all records to be produced by the connector
connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
index 3a51d10..feb0e5f 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
@@ -18,15 +18,21 @@
package org.apache.kafka.connect.util;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.RegexRouter;
+import org.apache.kafka.connect.transforms.Transformation;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -485,4 +491,148 @@ public class TopicCreationTest {
assertEquals(barPartitions, barTopicSpec.numPartitions());
assertThat(barTopicSpec.configs(), is(barTopicProps));
}
+
+ @Test
+ public void testTopicCreationWithSingleTransformation() {
+ sourceProps = defaultConnectorPropsWithTopicCreation();
+ sourceProps.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",",
FOO_GROUP, BAR_GROUP));
+ String xformName = "example";
+ String castType = "int8";
+ sourceProps.put("transforms", xformName);
+ sourceProps.put("transforms." + xformName + ".type",
Cast.Value.class.getName());
+ sourceProps.put("transforms." + xformName + ".spec", castType);
+
+ sourceConfig = new SourceConnectorConfig(MOCK_PLUGINS, sourceProps,
true);
+
+ Map<String, TopicCreationGroup> groups =
TopicCreationGroup.configuredGroups(sourceConfig);
+ TopicCreation topicCreation =
TopicCreation.newTopicCreation(workerConfig, groups);
+
+ assertTrue(topicCreation.isTopicCreationEnabled());
+ assertTrue(topicCreation.isTopicCreationRequired(FOO_TOPIC));
+ assertThat(topicCreation.defaultTopicGroup(),
is(groups.get(DEFAULT_TOPIC_CREATION_GROUP)));
+ assertEquals(2, topicCreation.topicGroups().size());
+ assertThat(topicCreation.topicGroups().keySet(), hasItems(FOO_GROUP,
BAR_GROUP));
+ assertEquals(topicCreation.defaultTopicGroup(),
topicCreation.findFirstGroup(FOO_TOPIC));
+ topicCreation.addTopic(FOO_TOPIC);
+ assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
+
+ List<Transformation<SourceRecord>> transformations =
sourceConfig.transformations();
+ assertEquals(1, transformations.size());
+ Cast<SourceRecord> xform = (Cast<SourceRecord>) transformations.get(0);
+ SourceRecord transformed = xform.apply(new SourceRecord(null, null,
"topic", 0, null, null, Schema.INT8_SCHEMA, 42));
+ assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
+ assertEquals((byte) 42, transformed.value());
+ }
+
+ @Test
+ public void topicCreationWithTwoGroupsAndTwoTransformations() {
+ short fooReplicas = 3;
+ int partitions = 5;
+ int barPartitions = 1;
+
+ sourceProps = defaultConnectorPropsWithTopicCreation();
+ sourceProps.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",",
FOO_GROUP, BAR_GROUP));
+ sourceProps.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG,
String.valueOf(partitions));
+ // Setting here but they should be ignored for the default group
+ sourceProps.put(TOPIC_CREATION_PREFIX + FOO_GROUP + "." +
INCLUDE_REGEX_CONFIG, FOO_TOPIC);
+ sourceProps.put(TOPIC_CREATION_PREFIX + FOO_GROUP + "." +
REPLICATION_FACTOR_CONFIG, String.valueOf(fooReplicas));
+ sourceProps.put(TOPIC_CREATION_PREFIX + BAR_GROUP + "." +
INCLUDE_REGEX_CONFIG, BAR_REGEX);
+ sourceProps.put(TOPIC_CREATION_PREFIX + BAR_GROUP + "." +
PARTITIONS_CONFIG, String.valueOf(barPartitions));
+
+ String castName = "cast";
+ String castType = "int8";
+ sourceProps.put("transforms." + castName + ".type",
Cast.Value.class.getName());
+ sourceProps.put("transforms." + castName + ".spec", castType);
+
+ String regexRouterName = "regex";
+ sourceProps.put("transforms." + regexRouterName + ".type",
RegexRouter.class.getName());
+ sourceProps.put("transforms." + regexRouterName + ".regex", "(.*)");
+ sourceProps.put("transforms." + regexRouterName + ".replacement",
"prefix-$1");
+
+ sourceProps.put("transforms", String.join(",", castName,
regexRouterName));
+
+ Map<String, String> fooTopicProps = new HashMap<>();
+ fooTopicProps.put(RETENTION_MS_CONFIG,
String.valueOf(TimeUnit.DAYS.toMillis(30)));
+ fooTopicProps.forEach((k, v) -> sourceProps.put(TOPIC_CREATION_PREFIX
+ FOO_GROUP + "." + k, v));
+
+ Map<String, String> barTopicProps = new HashMap<>();
+ barTopicProps.put(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT);
+ barTopicProps.forEach((k, v) -> sourceProps.put(TOPIC_CREATION_PREFIX
+ BAR_GROUP + "." + k, v));
+
+ // verify config creation
+ sourceConfig = new SourceConnectorConfig(MOCK_PLUGINS, sourceProps,
true);
+ assertTrue(sourceConfig.usesTopicCreation());
+ assertEquals(DEFAULT_REPLICATION_FACTOR, (short)
sourceConfig.topicCreationReplicationFactor(DEFAULT_TOPIC_CREATION_GROUP));
+ assertEquals(partitions, (int)
sourceConfig.topicCreationPartitions(DEFAULT_TOPIC_CREATION_GROUP));
+
assertThat(sourceConfig.topicCreationInclude(DEFAULT_TOPIC_CREATION_GROUP),
is(Collections.singletonList(".*")));
+
assertThat(sourceConfig.topicCreationExclude(DEFAULT_TOPIC_CREATION_GROUP),
is(Collections.emptyList()));
+
assertThat(sourceConfig.topicCreationOtherConfigs(DEFAULT_TOPIC_CREATION_GROUP),
is(Collections.emptyMap()));
+
+ // verify topic creation group is instantiated correctly
+ Map<String, TopicCreationGroup> groups =
TopicCreationGroup.configuredGroups(sourceConfig);
+ assertEquals(3, groups.size());
+ assertThat(groups.keySet(), hasItems(DEFAULT_TOPIC_CREATION_GROUP,
FOO_GROUP, BAR_GROUP));
+
+ // verify topic creation
+ TopicCreation topicCreation =
TopicCreation.newTopicCreation(workerConfig, groups);
+ TopicCreationGroup defaultGroup = topicCreation.defaultTopicGroup();
+ // Default group will match all topics besides empty string
+ assertTrue(defaultGroup.matches(" "));
+ assertTrue(defaultGroup.matches(FOO_TOPIC));
+ assertTrue(defaultGroup.matches(BAR_TOPIC));
+ assertEquals(DEFAULT_TOPIC_CREATION_GROUP, defaultGroup.name());
+ TopicCreationGroup fooGroup = groups.get(FOO_GROUP);
+ assertFalse(fooGroup.matches(" "));
+ assertTrue(fooGroup.matches(FOO_TOPIC));
+ assertFalse(fooGroup.matches(BAR_TOPIC));
+ assertEquals(FOO_GROUP, fooGroup.name());
+ TopicCreationGroup barGroup = groups.get(BAR_GROUP);
+ assertTrue(barGroup.matches(BAR_TOPIC));
+ assertFalse(barGroup.matches(FOO_TOPIC));
+ assertEquals(BAR_GROUP, barGroup.name());
+
+ assertTrue(topicCreation.isTopicCreationEnabled());
+ assertTrue(topicCreation.isTopicCreationRequired(FOO_TOPIC));
+ assertTrue(topicCreation.isTopicCreationRequired(BAR_TOPIC));
+ assertEquals(2, topicCreation.topicGroups().size());
+ assertThat(topicCreation.topicGroups().keySet(), hasItems(FOO_GROUP,
BAR_GROUP));
+ assertEquals(fooGroup, topicCreation.findFirstGroup(FOO_TOPIC));
+ assertEquals(barGroup, topicCreation.findFirstGroup(BAR_TOPIC));
+ topicCreation.addTopic(FOO_TOPIC);
+ topicCreation.addTopic(BAR_TOPIC);
+ assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
+ assertFalse(topicCreation.isTopicCreationRequired(BAR_TOPIC));
+
+ // verify new topic properties
+ String otherTopic = "any-other-topic";
+ NewTopic defaultTopicSpec =
topicCreation.findFirstGroup(otherTopic).newTopic(otherTopic);
+ assertEquals(otherTopic, defaultTopicSpec.name());
+ assertEquals(DEFAULT_REPLICATION_FACTOR,
defaultTopicSpec.replicationFactor());
+ assertEquals(partitions, defaultTopicSpec.numPartitions());
+ assertThat(defaultTopicSpec.configs(), is(Collections.emptyMap()));
+
+ NewTopic fooTopicSpec =
topicCreation.findFirstGroup(FOO_TOPIC).newTopic(FOO_TOPIC);
+ assertEquals(FOO_TOPIC, fooTopicSpec.name());
+ assertEquals(fooReplicas, fooTopicSpec.replicationFactor());
+ assertEquals(partitions, fooTopicSpec.numPartitions());
+ assertThat(fooTopicSpec.configs(), is(fooTopicProps));
+
+ NewTopic barTopicSpec =
topicCreation.findFirstGroup(BAR_TOPIC).newTopic(BAR_TOPIC);
+ assertEquals(BAR_TOPIC, barTopicSpec.name());
+ assertEquals(DEFAULT_REPLICATION_FACTOR,
barTopicSpec.replicationFactor());
+ assertEquals(barPartitions, barTopicSpec.numPartitions());
+ assertThat(barTopicSpec.configs(), is(barTopicProps));
+
+ List<Transformation<SourceRecord>> transformations =
sourceConfig.transformations();
+ assertEquals(2, transformations.size());
+
+ Cast<SourceRecord> castXForm = (Cast<SourceRecord>)
transformations.get(0);
+ SourceRecord transformed = castXForm.apply(new SourceRecord(null,
null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
+ assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
+ assertEquals((byte) 42, transformed.value());
+
+ RegexRouter<SourceRecord> regexRouterXForm =
(RegexRouter<SourceRecord>) transformations.get(1);
+ transformed = regexRouterXForm.apply(new SourceRecord(null, null,
"topic", 0, null, null, Schema.INT8_SCHEMA, 42));
+ assertEquals("prefix-topic", transformed.topic());
+ }
}