This is an automated email from the ASF dual-hosted git repository. fmariani pushed a commit to branch CAMEL-23300/couchbase-connection-string in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit abfafd344832cb31f478be1807fad048b0cb9878 Author: Croway <[email protected]> AuthorDate: Mon Apr 13 11:35:49 2026 +0200 CAMEL-23300: pass connectionString to couchbase-sink connector Expose the connectionString endpoint option so the Couchbase SDK connects to the correct KV port when Testcontainers maps ports randomly. --- .../couchbasesink/CamelCouchbasesinkSinkConnectorConfig.java | 4 ++++ .../src/main/resources/kamelets/couchbase-sink.kamelet.yaml | 7 ++++++- .../couchbase/sink/CamelCouchbasePropertyFactory.java | 4 ++++ .../kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java | 3 ++- 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/connectors/camel-couchbase-sink-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/couchbasesink/CamelCouchbasesinkSinkConnectorConfig.java b/connectors/camel-couchbase-sink-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/couchbasesink/CamelCouchbasesinkSinkConnectorConfig.java index 0f4a52dc29..04537fa9ff 100644 --- a/connectors/camel-couchbase-sink-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/couchbasesink/CamelCouchbasesinkSinkConnectorConfig.java +++ b/connectors/camel-couchbase-sink-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/couchbasesink/CamelCouchbasesinkSinkConnectorConfig.java @@ -47,6 +47,9 @@ public class CamelCouchbasesinkSinkConnectorConfig public static final String CAMEL_SINK_COUCHBASESINK_KAMELET_STARTING_ID_CONF = "camel.kamelet.couchbase-sink.startingId"; public static final String CAMEL_SINK_COUCHBASESINK_KAMELET_STARTING_ID_DOC = "The starting id"; public static final Integer CAMEL_SINK_COUCHBASESINK_KAMELET_STARTING_ID_DEFAULT = 1; + public static final String CAMEL_SINK_COUCHBASESINK_KAMELET_CONNECTION_STRING_CONF = "camel.kamelet.couchbase-sink.connectionString"; + public static final String CAMEL_SINK_COUCHBASESINK_KAMELET_CONNECTION_STRING_DOC = "The full Couchbase SDK connection string (e.g. couchbase://host:port). When set, it takes precedence over hostname extraction for the KV service port."; + public static final String CAMEL_SINK_COUCHBASESINK_KAMELET_CONNECTION_STRING_DEFAULT = null; public static final String CAMEL_SINK_COUCHBASESINK_KAMELET_AUTO_START_ID_CONF = "camel.kamelet.couchbase-sink.autoStartId"; public static final String CAMEL_SINK_COUCHBASESINK_KAMELET_AUTO_START_ID_DOC = "Auto Start Id or not"; public static final Boolean CAMEL_SINK_COUCHBASESINK_KAMELET_AUTO_START_ID_DEFAULT = true; @@ -71,6 +74,7 @@ public class CamelCouchbasesinkSinkConnectorConfig conf.define(CAMEL_SINK_COUCHBASESINK_KAMELET_USERNAME_CONF, ConfigDef.Type.STRING, CAMEL_SINK_COUCHBASESINK_KAMELET_USERNAME_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_COUCHBASESINK_KAMELET_USERNAME_DOC); conf.define(CAMEL_SINK_COUCHBASESINK_KAMELET_PASSWORD_CONF, ConfigDef.Type.PASSWORD, CAMEL_SINK_COUCHBASESINK_KAMELET_PASSWORD_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_COUCHBASESINK_KAMELET_PASSWORD_DOC); conf.define(CAMEL_SINK_COUCHBASESINK_KAMELET_STARTING_ID_CONF, ConfigDef.Type.INT, CAMEL_SINK_COUCHBASESINK_KAMELET_STARTING_ID_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_COUCHBASESINK_KAMELET_STARTING_ID_DOC); + conf.define(CAMEL_SINK_COUCHBASESINK_KAMELET_CONNECTION_STRING_CONF, ConfigDef.Type.STRING, CAMEL_SINK_COUCHBASESINK_KAMELET_CONNECTION_STRING_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_COUCHBASESINK_KAMELET_CONNECTION_STRING_DOC); conf.define(CAMEL_SINK_COUCHBASESINK_KAMELET_AUTO_START_ID_CONF, ConfigDef.Type.BOOLEAN, CAMEL_SINK_COUCHBASESINK_KAMELET_AUTO_START_ID_DEFAULT, ConfigDef.Importance.MEDIUM, CAMEL_SINK_COUCHBASESINK_KAMELET_AUTO_START_ID_DOC); return conf; } diff --git a/connectors/camel-couchbase-sink-kafka-connector/src/main/resources/kamelets/couchbase-sink.kamelet.yaml b/connectors/camel-couchbase-sink-kafka-connector/src/main/resources/kamelets/couchbase-sink.kamelet.yaml index 8eabfc78ba..85392235b1 100644 --- a/connectors/camel-couchbase-sink-kafka-connector/src/main/resources/kamelets/couchbase-sink.kamelet.yaml +++ b/connectors/camel-couchbase-sink-kafka-connector/src/main/resources/kamelets/couchbase-sink.kamelet.yaml @@ -72,6 +72,10 @@ spec: description: The starting id type: integer default: 1 + connectionString: + title: Connection String + description: The full Couchbase SDK connection string (e.g. couchbase://host:port). When set, it takes precedence over hostname extraction for the KV service port. + type: string autoStartId: title: Auto Start Id description: Auto Start Id or not @@ -91,4 +95,5 @@ spec: autoStartIdForInserts: "{{autoStartId}}" startingIdForInsertsFrom: "{{startingId}}" username: "{{username}}" - password: "{{password}}" \ No newline at end of file + password: "{{password}}" + connectionString: "{{?connectionString}}" \ No newline at end of file diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java index e97623901c..c905cdf223 100644 --- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java +++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java @@ -45,6 +45,10 @@ public class CamelCouchbasePropertyFactory extends SinkConnectorPropertyFactory< return setProperty("camel.kamelet.couchbase-sink.password", value); } + public CamelCouchbasePropertyFactory withConnectionString(String value) { + return setProperty("camel.kamelet.couchbase-sink.connectionString", value); + } + public static CamelCouchbasePropertyFactory basic() { return new CamelCouchbasePropertyFactory() .withTasksMax(1) diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java index a43019d3e3..a1fc7b3dc0 100644 --- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java +++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java @@ -221,7 +221,8 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { .withHostname(service.getHostname()) .withPort(service.getPort()) .withUsername(service.getUsername()) - .withPassword(service.getPassword()); + .withPassword(service.getPassword()) + .withConnectionString(service.getConnectionString()); runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect)); }
