This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a1abf28 KCA: Option to sanitize topic name for the connectors that
cannot handle pulsar topic names (#14475)
a1abf28 is described below
commit a1abf285b11df90c5535eb852db98d8ee03556a9
Author: Andrey Yegorov <[email protected]>
AuthorDate: Mon Feb 28 12:59:40 2022 -0800
KCA: Option to sanitize topic name for the connectors that cannot handle
pulsar topic names (#14475)
### Motivation
Some Kafka connectors do not sanitize topic names (or do so incompletely) to
match what the downstream system supports.
This works in Kafka in most cases, assuming appropriately named topics, but it
does not work well with the Kafka Connect Adaptor because the full Pulsar
topic URI (e.g. persistent://tenant/ns/topic) is passed through as the topic name.
### Modifications
* Flag to sanitize the topic name (disabled by default) and corresponding
functionality.
* test
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
- added unit test
- verified with specific connector that didn't work without this change
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API: (yes / no)
- The schema: (yes / no / don't know)
- The default values of configurations: (yes / no)
- The wire protocol: (yes / no)
- The rest endpoints: (yes / no)
- The admin cli options: (yes / no)
- Anything that affects deployment: (yes / no / don't know)
### Documentation
Check the box below or label this PR directly (if you have committer
privilege).
Need to update docs?
- [ ] `doc-required`
(If you need help on updating docs, create a doc issue)
- [ ] `no-need-doc`
(Please explain why)
- [x] `doc`
Config parameter documented in FieldDoc.
(cherry picked from commit fd9e63937f5553f1c064f486afd165b0ce6a16af)
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 32 ++++++++++++-
.../connect/PulsarKafkaConnectSinkConfig.java | 8 ++++
.../io/kafka/connect/KafkaConnectSinkTest.java | 54 ++++++++++++++++++++++
3 files changed, 93 insertions(+), 1 deletion(-)
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index fe74e68..0d20f49 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.io.kafka.connect;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -29,6 +31,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -81,6 +84,11 @@ public class KafkaConnectSink implements Sink<GenericObject>
{
protected String topicName;
+ private boolean sanitizeTopicName = false;
+ private final Cache<String, String> sanitizedTopicCache =
+ CacheBuilder.newBuilder().maximumSize(1000)
+ .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
@Override
public void write(Record<GenericObject> sourceRecord) {
if (log.isDebugEnabled()) {
@@ -135,6 +143,7 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
"Source must run with Exclusive or Failover subscription
type");
topicName = kafkaSinkConfig.getTopic();
unwrapKeyValueIfAvailable =
kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
+ sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
String kafkaConnectorFQClassName =
kafkaSinkConfig.getKafkaConnectorSinkClass();
kafkaSinkConfig.getKafkaConnectorConfigProperties().forEach(props::put);
@@ -274,7 +283,7 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
// keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
timestamp = sourceRecord.getMessage().get().getPublishTime();
}
- return new SinkRecord(topic,
+ return new SinkRecord(sanitizeNameIfNeeded(topic, sanitizeTopicName),
partition,
keySchema,
key,
@@ -290,4 +299,25 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
return taskContext.currentOffset(topic, partition);
}
+ // Replace all non-letter, non-digit characters with underscore.
+ // Append underscore in front of name if it does not begin with alphabet
or underscore.
+ protected String sanitizeNameIfNeeded(String name, boolean sanitize) {
+ if (!sanitize) {
+ return name;
+ }
+
+ try {
+ return sanitizedTopicCache.get(name, () -> {
+ String sanitizedName = name.replaceAll("[^a-zA-Z0-9_]", "_");
+ if (sanitizedName.matches("^[^a-zA-Z_].*")) {
+ sanitizedName = "_" + sanitizedName;
+ }
+ return sanitizedName;
+ });
+ } catch (ExecutionException e) {
+ log.error("Failed to get sanitized topic name for {}", name, e);
+ throw new IllegalStateException("Failed to get sanitized topic
name for " + name, e);
+ }
+ }
+
}
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index 5a470e9..b3feea2 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -73,6 +73,14 @@ public class PulsarKafkaConnectSinkConfig implements
Serializable {
help = "In case of Record<KeyValue<>> data use key from KeyValue<>
instead of one from Record.")
private boolean unwrapKeyValueIfAvailable = true;
+ @FieldDoc(
+ defaultValue = "false",
+ help = "Some connectors cannot handle pulsar topic names like
persistent://a/b/topic"
+ + " and do not sanitize the topic name themselves. \n"
+ + "If enabled, all non alpha-digital characters in topic
name will be replaced with underscores. \n"
+ + "In some cases it may result in topic name collisions
(topic_a and topic.a will become the same)")
+ private boolean sanitizeTopicName = false;
+
public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws
IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile),
PulsarKafkaConnectSinkConfig.class);
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index afc742c..8d08ebc 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -46,6 +46,8 @@ import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.SinkContext;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -64,7 +66,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -76,6 +80,19 @@ import static org.testng.Assert.fail;
@Slf4j
public class KafkaConnectSinkTest extends ProducerConsumerBase {
+ public class ResultCaptor<T> implements Answer {
+ private T result = null;
+ public T getResult() {
+ return result;
+ }
+
+ @Override
+ public T answer(InvocationOnMock invocationOnMock) throws Throwable {
+ result = (T) invocationOnMock.callRealMethod();
+ return result;
+ }
+ }
+
private String offsetTopicName =
"persistent://my-property/my-ns/kafka-connect-sink-offset";
private Path file;
@@ -153,6 +170,43 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
}
@Test
+ public void sanitizeTest() throws Exception {
+ props.put("sanitizeTopicName", "true");
+ KafkaConnectSink originalSink = new KafkaConnectSink();
+ KafkaConnectSink sink = spy(originalSink);
+
+ final ResultCaptor<SinkRecord> resultCaptor = new ResultCaptor<>();
+ doAnswer(resultCaptor).when(sink).toSinkRecord(any());
+
+ sink.open(props, context);
+
+ final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+ Message msg = mock(MessageImpl.class);
+ when(msg.getValue()).thenReturn(rec);
+ when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+ final AtomicInteger status = new AtomicInteger(0);
+ Record<GenericObject> record = PulsarRecord.<String>builder()
+ .topicName("persistent://a-b/c-d/fake-topic.a")
+ .message(msg)
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .schema(Schema.STRING)
+ .build();
+
+ sink.write(record);
+ sink.flush();
+
+ assertEquals(status.get(), 1);
+ assertEquals(resultCaptor.getResult().topic(),
"persistent___a_b_c_d_fake_topic_a");
+
+ sink.close();
+
+ List<String> lines = Files.readAllLines(file,
StandardCharsets.US_ASCII);
+ assertEquals(lines.get(0), "value");
+ }
+
+ @Test
public void seekPauseResumeTest() throws Exception {
KafkaConnectSink sink = new KafkaConnectSink();
sink.open(props, context);