This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch fixing-topics-def in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b60bb85b4a76a434bf7a790f19f9e3bc93e6724a Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu May 7 13:58:14 2020 +0200 Make the topics property consistent between source and sink connectors --- config/CamelSourceConnector.properties | 2 +- .../main/docs/camel-aws-s3-kafka-source-connector.adoc | 2 +- .../kafkaconnector/CamelSourceConnectorConfig.java | 2 +- .../camel/kafkaconnector/CamelSourceTaskTest.java | 18 +++++++++--------- .../apache/camel/kafkaconnector/DataFormatTest.java | 6 +++--- .../kafkaconnector/PropertiesNameFormatsTest.java | 4 ++-- .../camel/kafkaconnector/PropertiesOrderTest.java | 4 ++-- .../camel-aws-s3-kafka-source-connector.adoc | 2 +- .../pages/try-it-out-on-openshift-with-strimzi.adoc | 2 +- examples/CamelAWSKinesisSourceConnector.properties | 2 +- examples/CamelAWSS3SourceConnector.properties | 2 +- examples/CamelAWSSQSSourceConnector.properties | 2 +- examples/CamelCassandraQLSourceConnector.properties | 2 +- examples/CamelJmsSourceConnector.properties | 4 ++-- examples/CamelTelegramSourceConnector.properties | 2 +- .../kafkaconnector/SourceConnectorPropertyFactory.java | 2 +- 16 files changed, 29 insertions(+), 29 deletions(-) diff --git a/config/CamelSourceConnector.properties b/config/CamelSourceConnector.properties index 8e578a4..3d88460 100644 --- a/config/CamelSourceConnector.properties +++ b/config/CamelSourceConnector.properties @@ -21,6 +21,6 @@ connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter -camel.source.kafka.topic=mytopic +topics=mytopic camel.source.url=timer:kafkaconnector diff --git a/connectors/camel-aws-s3-kafka-connector/src/main/docs/camel-aws-s3-kafka-source-connector.adoc b/connectors/camel-aws-s3-kafka-connector/src/main/docs/camel-aws-s3-kafka-source-connector.adoc index df20ad4..b4764ef 100644 --- a/connectors/camel-aws-s3-kafka-connector/src/main/docs/camel-aws-s3-kafka-source-connector.adoc +++ b/connectors/camel-aws-s3-kafka-connector/src/main/docs/camel-aws-s3-kafka-source-connector.adoc @@ -92,7 +92,7 @@ value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConvert camel.source.maxPollDuration=10000 -camel.source.kafka.topic=mytopic +topics=mytopic camel.source.url=aws-s3://camel-kafka-connector?autocloseBody=false diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index 3de49dd..01a55b8 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -39,7 +39,7 @@ public class CamelSourceConnectorConfig extends AbstractConfig { public static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source"; public static final String TOPIC_DEFAULT = "test"; - public static final String TOPIC_CONF = "camel.source.kafka.topic"; + public static final String TOPIC_CONF = "topics"; public static final String TOPIC_DOC = "The topic to publish data to"; public static final Long CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT = 1000L; diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 830a013..2f02d68 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -41,7 +41,7 @@ public class CamelSourceTaskTest { public void testSourcePolling() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", TIMER_URI); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); @@ -68,7 +68,7 @@ public class CamelSourceTaskTest { public void testSourcePollingWithKey() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "direct:start"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, "CamelSpecialTestKey"); CamelSourceTask camelSourceTask = new CamelSourceTask(); @@ -113,7 +113,7 @@ public class CamelSourceTaskTest { public void testSourcePollingWithBody() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "direct:start"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); @@ -173,7 +173,7 @@ public class CamelSourceTaskTest { public void testSourcePollingTimeout() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", TIMER_URI); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.source.maxPollDuration", "1"); CamelSourceTask camelSourceTask = new CamelSourceTask(); @@ -203,7 +203,7 @@ public class CamelSourceTaskTest { public void testSourcePollingMaxRecordNumber() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", TIMER_URI); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.source.maxBatchPollSize", "1"); CamelSourceTask camelSourceTask = new CamelSourceTask(); @@ -220,7 +220,7 @@ public class CamelSourceTaskTest { public void testSourcePollingConsumerOptions() { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "timer:kafkaconnector"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.source.pollingConsumerQueueSize", "10"); props.put("camel.source.pollingConsumerBlockTimeout", "1000"); props.put("camel.source.pollingConsumerBlockWhenFull", "false"); @@ -241,7 +241,7 @@ public class CamelSourceTaskTest { public void testUrlPrecedenceOnComponentProperty() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", TIMER_URI); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "shouldNotBeUsed"); props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed"); props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "pathChunk", "shouldNotBeUsed"); @@ -270,7 +270,7 @@ public class CamelSourceTaskTest { @Test public void testSourcePollingUsingComponentProperty() throws InterruptedException { Map<String, String> props = new HashMap<>(); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer"); props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "1000"); props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "kafkaconnector"); @@ -302,7 +302,7 @@ public class CamelSourceTaskTest { @Test public void testSourcePollingUsingMultipleComponentProperties() throws InterruptedException { Map<String, String> props = new HashMap<>(); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer"); props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "1000"); props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "repeatCount", "0"); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java index bd07866..911f93d 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java @@ -34,7 +34,7 @@ public class DataFormatTest { public void testDataFormatSource() { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "direct://test"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.source.marshal", "syslog"); CamelSourceTask camelsourceTask = new CamelSourceTask(); @@ -80,7 +80,7 @@ public class DataFormatTest { public void testDataFormatLookUpInRegistry() throws Exception { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "direct://test"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.source.marshal", "hl7"); DefaultCamelContext dcc = new DefaultCamelContext(); @@ -100,7 +100,7 @@ public class DataFormatTest { public void testDataFormatConfiguration() throws Exception { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "direct://test"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.source.marshal", "hl7"); props.put("camel.dataformat.hl7.validate", "false"); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java index 9b39e18..5b032fe 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java @@ -33,7 +33,7 @@ public class PropertiesNameFormatsTest { public void testCamelCaseFormat() { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "seda://test"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.component.seda.defaultQueueFactory", "#class:org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory"); props.put("camel.component.seda.defaultQueueFactory.counter", "1"); @@ -49,7 +49,7 @@ public class PropertiesNameFormatsTest { public void testDashSeparatedFormat() { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "seda://test"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.component.seda.default-queue-factory", "#class:org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory"); props.put("camel.component.seda.default-queue-factory.counter", "1"); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesOrderTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesOrderTest.java index 4d58be8..3ff42de 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesOrderTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesOrderTest.java @@ -27,7 +27,7 @@ public class PropertiesOrderTest { public void testOneOrder() { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "seda://test"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.component.seda.defaultQueueFactory", "#class:org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory"); props.put("camel.component.seda.defaultQueueFactory.counter", "1"); @@ -40,7 +40,7 @@ public class PropertiesOrderTest { public void testOppositeOrder() { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "seda://test"); - props.put("camel.source.kafka.topic", "mytopic"); + props.put("topics", "mytopic"); props.put("camel.component.seda.defaultQueueFactory.counter", "1"); props.put("camel.component.seda.defaultQueueFactory", "#class:org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory"); diff --git a/docs/modules/ROOT/pages/connectors/camel-aws-s3-kafka-source-connector.adoc b/docs/modules/ROOT/pages/connectors/camel-aws-s3-kafka-source-connector.adoc index df20ad4..b4764ef 100644 --- a/docs/modules/ROOT/pages/connectors/camel-aws-s3-kafka-source-connector.adoc +++ b/docs/modules/ROOT/pages/connectors/camel-aws-s3-kafka-source-connector.adoc @@ -92,7 +92,7 @@ value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConvert camel.source.maxPollDuration=10000 -camel.source.kafka.topic=mytopic +topics=mytopic camel.source.url=aws-s3://camel-kafka-connector?autocloseBody=false diff --git a/docs/modules/ROOT/pages/try-it-out-on-openshift-with-strimzi.adoc b/docs/modules/ROOT/pages/try-it-out-on-openshift-with-strimzi.adoc index d0ce476..5bb1b91 100644 --- a/docs/modules/ROOT/pages/try-it-out-on-openshift-with-strimzi.adoc +++ b/docs/modules/ROOT/pages/try-it-out-on-openshift-with-strimzi.adoc @@ -98,7 +98,7 @@ oc exec -i -c kafka my-cluster-kafka-0 -- curl -X POST \ "tasks.max": "1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter", - "camel.source.kafka.topic": "s3-topic", + "topics": "s3-topic", "camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false", "camel.source.maxPollDuration": 10000, "camel.component.aws-s3.configuration.access-key": "xxx", diff --git a/examples/CamelAWSKinesisSourceConnector.properties b/examples/CamelAWSKinesisSourceConnector.properties index 1084279..f8dcd77 100644 --- a/examples/CamelAWSKinesisSourceConnector.properties +++ b/examples/CamelAWSKinesisSourceConnector.properties @@ -17,7 +17,7 @@ connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter -camel.source.kafka.topic=mytopic +topics=mytopic camel.source.url=aws-kinesis://kinesis1?shardClosed=ignore diff --git a/examples/CamelAWSS3SourceConnector.properties b/examples/CamelAWSS3SourceConnector.properties index 7ce4d9e..fc4be7f 100644 --- a/examples/CamelAWSS3SourceConnector.properties +++ b/examples/CamelAWSS3SourceConnector.properties @@ -24,7 +24,7 @@ value.converter=org.apache.camel.kafkaconnector.converters.S3ObjectConverter camel.source.maxPollDuration=10000 -camel.source.kafka.topic=mytopic +topics=mytopic camel.source.url=aws-s3://camel-kafka-connector?autocloseBody=false diff --git a/examples/CamelAWSSQSSourceConnector.properties b/examples/CamelAWSSQSSourceConnector.properties index e541667..5d85a40 100644 --- a/examples/CamelAWSSQSSourceConnector.properties +++ b/examples/CamelAWSSQSSourceConnector.properties @@ -20,7 +20,7 @@ connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter -camel.source.kafka.topic=mytopic +topics=mytopic camel.source.url=aws-sqs://mysqs diff --git a/examples/CamelCassandraQLSourceConnector.properties b/examples/CamelCassandraQLSourceConnector.properties index 5be1414..ea60f4c 100644 --- a/examples/CamelCassandraQLSourceConnector.properties +++ b/examples/CamelCassandraQLSourceConnector.properties @@ -22,7 +22,7 @@ value.converter=org.apache.kafka.connect.storage.StringConverter camel.source.maxPollDuration=10000 -camel.source.kafka.topic=mytopic +topics=mytopic camel.source.url=cql://172.17.0.2/test?cql=select * from users&resultSetConversionStrategy=ONE diff --git a/examples/CamelJmsSourceConnector.properties b/examples/CamelJmsSourceConnector.properties index 2527016..2500d5a 100644 --- a/examples/CamelJmsSourceConnector.properties +++ b/examples/CamelJmsSourceConnector.properties @@ -21,7 +21,7 @@ connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter -camel.source.kafka.topic=mytopic +topics=mytopic camel.source.url=sjms2://queue:myqueue @@ -30,4 +30,4 @@ camel.component.sjms2.connection-factory.brokerURL=tcp://localhost:61616 # If using AMQP via QPid JMS: # camel.component.sjms2.connection-factory=#class:org.apache.qpid.jms.JmsConnectionFactory -# camel.component.sjms2.connection-factory.remoteURI=amqp://localhost:5672 \ No newline at end of file +# camel.component.sjms2.connection-factory.remoteURI=amqp://localhost:5672 diff --git a/examples/CamelTelegramSourceConnector.properties b/examples/CamelTelegramSourceConnector.properties index eadb7d2..5c69fcc 100644 --- a/examples/CamelTelegramSourceConnector.properties +++ b/examples/CamelTelegramSourceConnector.properties @@ -21,6 +21,6 @@ connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter -camel.source.kafka.topic=mytopic +topics=mytopic camel.source.url=telegram:bots/<your bot token (Ask the bot's father)> diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/SourceConnectorPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/SourceConnectorPropertyFactory.java index 3261388..221384d 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/SourceConnectorPropertyFactory.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/SourceConnectorPropertyFactory.java @@ -20,7 +20,7 @@ package org.apache.camel.kafkaconnector; public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> { public T withKafkaTopic(String topic) { - getProperties().put("camel.source.kafka.topic", topic); + getProperties().put("topics", topic); return (T) this; }