This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new a0c1c60 core: add support for aggregation on source sinks a0c1c60 is described below commit a0c1c6015fe05ce9ab473d826401990811591dd1 Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Thu Oct 8 11:03:09 2020 +0200 core: add support for aggregation on source sinks --- core/pom.xml | 5 ++ .../camel/kafkaconnector/CamelConnectorConfig.java | 49 +++++++++++++ .../kafkaconnector/CamelSinkConnectorConfig.java | 21 +----- .../apache/camel/kafkaconnector/CamelSinkTask.java | 4 +- .../kafkaconnector/CamelSourceConnectorConfig.java | 8 +- .../camel/kafkaconnector/CamelSourceTask.java | 6 +- .../utils/CamelKafkaConnectMain.java | 3 +- .../camel/kafkaconnector/CamelSinkTaskTest.java | 10 +-- .../camel/kafkaconnector/CamelSourceTaskTest.java | 85 ++++++++++++++++++++++ .../utils/StringJoinerAggregator.java | 54 ++++++++++++++ 10 files changed, 215 insertions(+), 30 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index fe05166..61454c1 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -141,6 +141,11 @@ <artifactId>camel-aws2-sqs</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java new file mode 100644 index 0000000..fd62052 --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +public abstract class CamelConnectorConfig extends AbstractConfig { + public static final String CAMEL_CONNECTOR_AGGREGATE_DEFAULT = null; + public static final String CAMEL_CONNECTOR_AGGREGATE_NAME = "aggregate"; + public static final String CAMEL_CONNECTOR_AGGREGATE_CONF = "camel.beans." 
+ CAMEL_CONNECTOR_AGGREGATE_NAME; + public static final String CAMEL_CONNECTOR_AGGREGATE_DOC = "A reference to an aggregate bean, in the form of #class:"; + + public static final Integer CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT = 10; + public static final String CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF = "camel.beans.aggregation.size"; + public static final String CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC = "The size of the aggregation, to be used in combination with camel.beans.aggregate"; + + public static final Long CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT = 500L; + public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF = "camel.beans.aggregation.timeout"; + public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate"; + + protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) { + super(definition, originals, configProviderProps, doLog); + } + + protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals) { + super(definition, originals); + } + + protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) { + super(definition, originals, doLog); + } +} diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index 9b4b2df..980aad1 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -19,12 +19,11 @@ package org.apache.camel.kafkaconnector; import java.util.Map; import org.apache.camel.LoggingLevel; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; 
-public class CamelSinkConnectorConfig extends AbstractConfig { +public class CamelSinkConnectorConfig extends CamelConnectorConfig { public static final String CAMEL_SINK_MARSHAL_DEFAULT = null; public static final String CAMEL_SINK_MARSHAL_CONF = "camel.sink.marshal"; public static final String CAMEL_SINK_MARSHAL_DOC = "The camel dataformat name to use to marshal data to the destination"; @@ -47,27 +46,15 @@ public class CamelSinkConnectorConfig extends AbstractConfig { public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF = "camel.sink.contentLogLevel"; public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT + "). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF."; - public static final String CAMEL_SINK_AGGREGATE_DEFAULT = null; - public static final String CAMEL_SINK_AGGREGATE_CONF = "camel.beans.aggregate"; - public static final String CAMEL_SINK_AGGREGATE_DOC = "A reference to an aggregate bean, in the form of #class:"; - - public static final Integer CAMEL_SINK_AGGREGATE_SIZE_DEFAULT = 10; - public static final String CAMEL_SINK_AGGREGATE_SIZE_CONF = "camel.beans.aggregation.size"; - public static final String CAMEL_SINK_AGGREGATE_SIZE_DOC = "The size of the aggregation, to be used in combination with camel.beans.aggregate"; - - public static final Long CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT = 500L; - public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_CONF = "camel.beans.aggregation.timeout"; - public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate"; - private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC) .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC) .define(CAMEL_SINK_UNMARSHAL_CONF, 
Type.STRING, CAMEL_SINK_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_UNMARSHAL_DOC) .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC) .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC) - .define(CAMEL_SINK_AGGREGATE_CONF, Type.STRING, CAMEL_SINK_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_DOC) - .define(CAMEL_SINK_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_SINK_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_SIZE_DOC) - .define(CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_TIMEOUT_DOC); + .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC) + .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC) + .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC); public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 2a97de4..51cfe88 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -75,8 +75,8 @@ public class CamelSinkTask extends SinkTask { String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF); final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF); final String unmarshaller = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF); - final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF); - final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF); + final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF); + final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF); CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java index 92878ae..bdfc5c7 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java @@ -19,12 +19,11 @@ package org.apache.camel.kafkaconnector; import java.util.Map; import org.apache.camel.LoggingLevel; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -public class CamelSourceConnectorConfig extends AbstractConfig { +public class CamelSourceConnectorConfig extends CamelConnectorConfig { public static final String CAMEL_SOURCE_UNMARSHAL_DEFAULT = null; public static final String CAMEL_SOURCE_UNMARSHAL_CONF = "camel.source.unmarshal"; public static final String CAMEL_SOURCE_UNMARSHAL_DOC = "The camel dataformat name to use to unmarshal data from the source"; @@ -88,7 +87,10 @@ public class CamelSourceConnectorConfig extends AbstractConfig { .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC) 
.define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC) .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC) - .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC); + .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC) + .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC) + .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC) + .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC); public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 604b9b1..96a7ad4 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -82,6 +82,8 @@ public class CamelSourceTask extends SourceTask { String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF); final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF); final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF); + final int size = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF); + final long timeout = 
config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF); topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); @@ -98,8 +100,8 @@ public class CamelSourceTask extends SourceTask { .withProperties(actualProps) .withUnmarshallDataFormat(unmarshaller) .withMarshallDataFormat(marshaller) - .withAggregationSize(10) - .withAggregationTimeout(500) + .withAggregationSize(size) + .withAggregationTimeout(timeout) .build(camelContext); consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 4150dea..dcd6398 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -26,6 +26,7 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.ConsumerTemplate; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.kafkaconnector.CamelConnectorConfig; import org.apache.camel.main.BaseMainSupport; import org.apache.camel.main.MainListener; import org.apache.camel.model.RouteDefinition; @@ -204,7 +205,7 @@ public class CamelKafkaConnectMain extends BaseMainSupport { } if (getContext().getRegistry().lookupByName("aggregate") != null) { //aggregation - AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate"); + AggregationStrategy s = getContext().getRegistry().lookupByNameAndType(CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME, AggregationStrategy.class); LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout); LOG.info(".to({})", to); 
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 3136e98..6e28b0e 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -596,8 +596,8 @@ public class CamelSinkTaskTest { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"); - props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5"); CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); @@ -631,9 +631,9 @@ public class CamelSinkTaskTest { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"); - props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5"); - props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, "100"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5"); + props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100"); CamelSinkTask sinkTask = new CamelSinkTask(); 
sinkTask.start(props); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 4d268de..4a7b5b1 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -17,18 +17,24 @@ package org.apache.camel.kafkaconnector; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.camel.LoggingLevel; import org.apache.camel.ProducerTemplate; +import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; +import static org.apache.camel.util.CollectionHelper.mapOf; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -293,4 +299,83 @@ public class CamelSourceTaskTest { sourceTask.stop(); } + + @Test + public void testSourcePollingWithAggregationBySize() { + final int size = 10; + final int chunkSize = 5; + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(mapOf( + CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, + CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, 
chunkSize + )); + + try { + assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)) + .isInstanceOf(StringJoinerAggregator.class) + .hasFieldOrPropertyWithValue("delimiter", "|"); + + for (int i = 0; i < size; i++) { + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, Integer.toString(i)); + } + + List<SourceRecord> records = sourceTask.poll(); + + assertThat(records).hasSize(size / chunkSize); + + for (int i = 0; i < size / chunkSize; i++) { + assertThat(records) + .element(i) + .hasFieldOrPropertyWithValue( + "value", + IntStream.range(i * chunkSize, (i * chunkSize) + chunkSize).mapToObj(Integer::toString).collect(Collectors.joining("|")) + ); + } + + } finally { + sourceTask.stop(); + } + } + + @Test + public void testSourcePollingWithAggregationBySizeAndTimeout() { + final int size = 3; + final int chunkSize = 2; + final long chunkTimeout = 500L; + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(mapOf( + CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, + CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.StringJoinerAggregator", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF + ".delimiter", "|", + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, chunkTimeout, + CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, chunkSize + )); + + try { + assertThat(sourceTask.getCms().getCamelContext().getRegistry().lookupByName(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)) + .isInstanceOf(StringJoinerAggregator.class) + .hasFieldOrPropertyWithValue("delimiter", "|"); + + for (int i = 0; i < size; i++) { + sourceTask.getCms().getProducerTemplate().sendBody(DIRECT_URI, Integer.toString(i)); + } + + List<SourceRecord> records = new ArrayList<>(); + while (records.size() < 2) { + 
records.addAll(sourceTask.poll()); + } + + assertThat(records).hasSize(2); + assertThat(records).element(0).hasFieldOrPropertyWithValue("value", "0|1"); + assertThat(records).element(1).hasFieldOrPropertyWithValue("value", "2"); + } finally { + sourceTask.stop(); + } + } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/StringJoinerAggregator.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/StringJoinerAggregator.java new file mode 100644 index 0000000..f0f455b --- /dev/null +++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/StringJoinerAggregator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.kafkaconnector.utils; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; +import org.apache.camel.Message; + +public class StringJoinerAggregator implements AggregationStrategy { + private String delimiter = ","; + + public String getDelimiter() { + return delimiter; + } + + public StringJoinerAggregator setDelimiter(String delimiter) { + this.delimiter = delimiter; + return this; + } + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + // lets append the old body to the new body + if (oldExchange == null) { + return newExchange; + } + + String body = oldExchange.getIn().getBody(String.class); + if (body != null) { + Message newIn = newExchange.getIn(); + String newBody = newIn.getBody(String.class); + if (newBody != null) { + body += delimiter + newBody; + } + + newIn.setBody(body); + } + return newExchange; + } +}