This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e5184d8f610bda5a3db886d03e93d239c172587f Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Wed Oct 7 15:57:57 2020 +0200 [core] refactor camel main --- .../apache/camel/kafkaconnector/CamelSinkTask.java | 25 ++-- .../camel/kafkaconnector/CamelSourceTask.java | 28 ++-- .../utils/CamelKafkaConnectMain.java | 129 ++++++++++++++++ .../kafkaconnector/utils/CamelMainSupport.java | 165 --------------------- .../camel/kafkaconnector/CamelSinkTaskTest.java | 38 ++--- .../camel/kafkaconnector/CamelSourceTaskTest.java | 18 +-- .../camel/kafkaconnector/DataFormatTest.java | 31 ++-- .../kafkaconnector/PropertiesNameFormatsTest.java | 4 +- 8 files changed, 199 insertions(+), 239 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index c6242cb..c39a4f4 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -29,8 +28,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ProducerTemplate; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat; -import org.apache.camel.kafkaconnector.utils.CamelMainSupport; +import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain; import org.apache.camel.kafkaconnector.utils.TaskHelper; import org.apache.camel.support.DefaultExchange; import org.apache.commons.lang3.StringUtils; @@ -57,7 +55,7 @@ public class CamelSinkTask extends SinkTask { private static final String LOCAL_URL = "direct:start"; - private CamelMainSupport cms; + private CamelKafkaConnectMain cms; private ProducerTemplate producer; private CamelSinkConnectorConfig config; @@ -76,13 +74,6 @@ public class CamelSinkTask extends SinkTask { String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF); final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF); final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF); - List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>(); - if (unmarshaller != null) { - dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL)); - } - if (marshaller != null) { - dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL)); - } final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF); final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF); @@ -95,9 +86,15 @@ public class CamelSinkTask extends SinkTask { CAMEL_SINK_PATH_PROPERTIES_PREFIX); } - cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, dataformats, size, timeout, camelContext); + cms = CamelKafkaConnectMain.builder(LOCAL_URL, remoteUrl) + .withProperties(actualProps) + .withUnmarshallDataFormat(unmarshaller) + .withMarshallDataFormat(marshaller) + .withAggregationSize(size) + .withAggregationTimeout(timeout) + .build(camelContext); - producer = cms.createProducerTemplate(); + producer = cms.getProducerTemplate(); cms.start(); LOG.info("CamelSinkTask connector task started"); @@ -229,7 +226,7 @@ public class CamelSinkTask extends SinkTask { } } - public CamelMainSupport getCms() { + CamelKafkaConnectMain getCms() { return cms; } } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index e825e49..42d9214 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -25,18 +25,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; -import java.util.LinkedList; import java.util.List; import java.util.Map; import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.PollingConsumer; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat; -import org.apache.camel.kafkaconnector.utils.CamelMainSupport; +import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain; import org.apache.camel.kafkaconnector.utils.SchemaHelper; import org.apache.camel.kafkaconnector.utils.TaskHelper; import org.apache.kafka.connect.data.Decimal; @@ -58,7 +55,7 @@ public class CamelSourceTask extends SourceTask { private static final String LOCAL_URL = "direct:end"; - private CamelMainSupport cms; + private CamelKafkaConnectMain cms; private CamelSourceConnectorConfig config; private PollingConsumer consumer; private String topic; @@ -87,13 +84,7 @@ public class CamelSourceTask extends SourceTask { String remoteUrl = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF); final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF); final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF); - List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>(); - if (unmarshaller != null) { - dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL)); - } - if (marshaller != null) { - dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL)); - } + topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF); topics = Arrays.asList(topic.split(",")); @@ -106,10 +97,15 @@ public class CamelSourceTask extends SourceTask { CAMEL_SOURCE_PATH_PROPERTIES_PREFIX); } - cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, dataformats, 10, 500, camelContext); + cms = CamelKafkaConnectMain.builder(remoteUrl, localUrl) + .withProperties(actualProps) + .withUnmarshallDataFormat(unmarshaller) + .withMarshallDataFormat(marshaller) + .withAggregationSize(10) + .withAggregationTimeout(500) + .build(camelContext); - Endpoint endpoint = cms.getEndpoint(localUrl); - consumer = endpoint.createPollingConsumer(); + consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer(); consumer.start(); cms.start(); @@ -268,7 +264,7 @@ public class CamelSourceTask extends SourceTask { + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull; } - public CamelMainSupport getCms() { + CamelKafkaConnectMain getCms() { return cms; } } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 46b0822..4150dea 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -16,12 +16,23 @@ */ package org.apache.camel.kafkaconnector.utils; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.camel.AggregationStrategy; import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.ConsumerTemplate; import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; import org.apache.camel.main.BaseMainSupport; import org.apache.camel.main.MainListener; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.spi.DataFormat; +import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,4 +130,122 @@ public class CamelKafkaConnectMain extends BaseMainSupport { return this.consumerTemplate; } + + public static Builder builder(String from, String to) { + return new Builder(from, to); + } + + public static final class Builder { + private final String from; + private final String to; + private Map<String, String> props; + private String marshallDataFormat; + private String unmarshallDataFormat; + private int aggregationSize; + private long aggregationTimeout; + + public Builder(String from, String to) { + this.from = from; + this.to = to; + } + + public Builder withProperties(Map<String, String> props) { + this.props = new HashMap<>(props); + return this; + } + + public Builder withMarshallDataFormat(String dataformatId) { + this.marshallDataFormat = dataformatId; + return this; + } + + public Builder withUnmarshallDataFormat(String dataformatId) { + this.unmarshallDataFormat = dataformatId; + return this; + } + + public Builder withAggregationSize(int aggregationSize) { + this.aggregationSize = aggregationSize; + return this; + } + + public Builder withAggregationTimeout(long aggregationTimeout) { + this.aggregationTimeout = aggregationTimeout; + return this; + } + + public CamelKafkaConnectMain build(CamelContext camelContext) { + CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext); + camelMain.configure().setAutoConfigurationLogSummary(false); + + Properties camelProperties = new Properties(); + camelProperties.putAll(props); + + LOG.info("Setting initial properties in Camel context: [{}]", camelProperties); + camelMain.setInitialProperties(camelProperties); + + //creating the actual route + camelMain.configure().addRoutesBuilder(new RouteBuilder() { + public void configure() { + //from + RouteDefinition rd = from(from); + LOG.info("Creating Camel route from({})", from); + + //dataformats + if (!ObjectHelper.isEmpty(marshallDataFormat)) { + LOG.info(".marshal().custom({})", marshallDataFormat); + getContext().getRegistry().bind(marshallDataFormat, lookupAndInstantiateDataformat(getContext(), marshallDataFormat)); + rd.marshal().custom(marshallDataFormat); + } + if (!ObjectHelper.isEmpty(unmarshallDataFormat)) { + LOG.info(".unmarshal().custom({})", unmarshallDataFormat); + getContext().getRegistry().bind(unmarshallDataFormat, lookupAndInstantiateDataformat(getContext(), unmarshallDataFormat)); + rd.unmarshal().custom(unmarshallDataFormat); + } + if (getContext().getRegistry().lookupByName("aggregate") != null) { + //aggregation + AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate"); + LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout); + LOG.info(".to({})", to); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to); + } else { + //to + LOG.info(".to({})", to); + rd.toD(to); + } + } + }); + + return camelMain; + } + } + + private static DataFormat lookupAndInstantiateDataformat(CamelContext camelContext, String dataformatName) { + DataFormat df = camelContext.resolveDataFormat(dataformatName); + + if (df == null) { + df = camelContext.createDataFormat(dataformatName); + + final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + "."; + final Properties props = camelContext.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix)); + + CamelContextAware.trySetCamelContext(df, camelContext); + + if (!props.isEmpty()) { + PropertyBindingSupport.build() + .withCamelContext(camelContext) + .withOptionPrefix(prefix) + .withRemoveParameters(false) + .withProperties((Map) props) + .withTarget(df) + .bind(); + } + } + + //TODO: move it to the caller? + if (df == null) { + throw new UnsupportedOperationException("No DataFormat found with name " + dataformatName); + } + return df; + } } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java deleted file mode 100644 index 6c17ecc..0000000 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.kafkaconnector.utils; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.camel.AggregationStrategy; -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; -import org.apache.camel.ConsumerTemplate; -import org.apache.camel.Endpoint; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.model.RouteDefinition; -import org.apache.camel.spi.DataFormat; -import org.apache.camel.support.PropertyBindingSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CamelMainSupport { - public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat."; - private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class); - - private final CamelKafkaConnectMain camelMain; - - public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) { - camelMain = new CamelKafkaConnectMain(camelContext); - camelMain.configure().setAutoConfigurationLogSummary(false); - - Properties camelProperties = new Properties(); - camelProperties.putAll(props); - - LOG.info("Setting initial properties in Camel context: [{}]", camelProperties); - camelMain.setInitialProperties(camelProperties); - - //creating the actual route - camelMain.configure().addRoutesBuilder(new RouteBuilder() { - public void configure() { - //from - RouteDefinition rd = from(fromUrl); - - //dataformats - LOG.info("Creating Camel route from({})", fromUrl); - for (CamelKafkaConnectDataformat dataformat : dataformats) { - String dataformatId = dataformat.getDataformatId(); - switch (dataformat.getDataformatKind()) { - case MARSHALL: - LOG.info(".marshal().custom({})", dataformatId); - getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId)); - rd.marshal().custom(dataformatId); - break; - case UNMARSHALL: - LOG.info(".unmarshal().custom({})", dataformatId); - getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId)); - rd.unmarshal().custom(dataformatId); - break; - default: - throw new UnsupportedOperationException("Unsupported dataformat: " + dataformat); - } - } - - if (getContext().getRegistry().lookupByName("aggregate") != null) { - //aggregation - AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate"); - LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout); - LOG.info(".to({})", toUrl); - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(toUrl); - } else { - //to - LOG.info(".to({})", toUrl); - rd.toD(toUrl); - } - } - }); - } - - public void start() { - LOG.info("Starting CamelContext"); - - try { - camelMain.start(); - } catch (Exception e) { - LOG.info("CamelContext failed to start", e); - throw e; - } - - LOG.info("CamelContext started"); - } - - public void stop() { - LOG.info("Stopping CamelContext"); - - try { - camelMain.stop(); - } catch (Exception e) { - LOG.info("CamelContext failed to stop", e); - throw e; - } - - LOG.info("CamelContext stopped"); - } - - public ProducerTemplate createProducerTemplate() { - return camelMain.getProducerTemplate(); - } - - public Endpoint getEndpoint(String uri) { - return camelMain.getCamelContext().getEndpoint(uri); - } - - public Collection<Endpoint> getEndpoints() { - return camelMain.getCamelContext().getEndpoints(); - } - - public ConsumerTemplate createConsumerTemplate() { - return camelMain.getConsumerTemplate(); - } - - private DataFormat lookupAndInstantiateDataformat(String dataformatName) { - DataFormat df = camelMain.getCamelContext().resolveDataFormat(dataformatName); - - if (df == null) { - df = camelMain.getCamelContext().createDataFormat(dataformatName); - - final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + "."; - final Properties props = camelMain.getCamelContext().getPropertiesComponent().loadProperties(k -> k.startsWith(prefix)); - - CamelContextAware.trySetCamelContext(df, camelMain.getCamelContext()); - - if (!props.isEmpty()) { - PropertyBindingSupport.build() - .withCamelContext(camelMain.getCamelContext()) - .withOptionPrefix(prefix) - .withRemoveParameters(false) - .withProperties((Map) props) - .withTarget(df) - .bind(); - } - } - - //TODO: move it to the caller? - if (df == null) { - throw new UnsupportedOperationException("No DataFormat found with name " + dataformatName); - } - return df; - } - -} diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index a050943..3136e98 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -58,7 +58,7 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -67,7 +67,7 @@ public class CamelSinkTaskTest { sinkTask.stop(); } - + @Test public void testTopicsRegex() { Map<String, String> props = new HashMap<>(); @@ -84,7 +84,7 @@ public class CamelSinkTaskTest { records.add(record1); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -128,7 +128,7 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -172,7 +172,7 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -222,7 +222,7 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -281,7 +281,7 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -345,7 +345,7 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -407,7 +407,7 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -466,7 +466,7 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -509,7 +509,7 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -533,11 +533,11 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertEquals(1, sinkTask.getCms().getEndpoints() + assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints() .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count()); sinkTask.stop(); @@ -560,12 +560,12 @@ public class CamelSinkTaskTest { records.add(record); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - assertEquals(1, sinkTask.getCms().getEndpoints() + assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints() .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count()); sinkTask.stop(); @@ -590,7 +590,7 @@ public class CamelSinkTaskTest { sinkTask.stop(); } - + @Test public void testAggregationBody() { Map<String, String> props = new HashMap<>(); @@ -616,7 +616,7 @@ public class CamelSinkTaskTest { records.add(record5); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel camel1 camel2 camel3 camel4", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); @@ -625,7 +625,7 @@ public class CamelSinkTaskTest { sinkTask.stop(); } - + @Test public void testAggregationBodyAndTimeout() throws InterruptedException { Map<String, String> props = new HashMap<>(); @@ -652,7 +652,7 @@ public class CamelSinkTaskTest { records.add(record5); sinkTask.put(records); - ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel camel1 camel2 camel3 camel4", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 8fd8cb4..4d268de 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -39,7 +39,7 @@ public class CamelSourceTaskTest { private static final String TOPIC_NAME = "my-topic"; private void sendBatchOfRecords(CamelSourceTask sourceTask, long size) { - final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); + final ProducerTemplate template = sourceTask.getCms().getProducerTemplate(); for (int i = 0; i < size; i++) { template.sendBody(DIRECT_URI, "test" + i); } @@ -113,7 +113,7 @@ public class CamelSourceTaskTest { CamelSourceTask sourceTask = new CamelSourceTask(); sourceTask.start(props); - final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); + final ProducerTemplate template = sourceTask.getCms().getProducerTemplate(); // key in the message with body template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey", 1234); @@ -150,7 +150,7 @@ public class CamelSourceTaskTest { CamelSourceTask sourceTask = new CamelSourceTask(); sourceTask.start(props); - final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); + final ProducerTemplate template = sourceTask.getCms().getProducerTemplate(); // send String template.sendBody(DIRECT_URI, "test"); @@ -206,9 +206,9 @@ public class CamelSourceTaskTest { CamelSourceTask sourceTask = new CamelSourceTask(); sourceTask.start(props); - assertEquals(2, sourceTask.getCms().getEndpoints().size()); + assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size()); - sourceTask.getCms().getEndpoints().stream() + sourceTask.getCms().getCamelContext().getEndpoints().stream() .filter(e -> e.getEndpointUri().startsWith("timer")) .forEach(e -> { assertTrue(e.getEndpointUri().contains("foo")); @@ -231,9 +231,9 @@ public class CamelSourceTaskTest { CamelSourceTask sourceTask = new CamelSourceTask(); sourceTask.start(props); - assertEquals(2, sourceTask.getCms().getEndpoints().size()); + assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size()); - sourceTask.getCms().getEndpoints().stream() + sourceTask.getCms().getCamelContext().getEndpoints().stream() .filter(e -> e.getEndpointUri().startsWith("direct")) .forEach(e -> { assertTrue(e.getEndpointUri().contains("end")); @@ -259,7 +259,7 @@ public class CamelSourceTaskTest { CamelSourceTask sourceTask = new CamelSourceTask(); sourceTask.start(props); - sourceTask.getCms().getEndpoints().stream() + sourceTask.getCms().getCamelContext().getEndpoints().stream() .filter(e -> e.getEndpointUri().startsWith("timer")) .forEach(e -> { assertTrue(e.getEndpointUri().contains("foo")); @@ -281,7 +281,7 @@ public class CamelSourceTaskTest { sourceTask.start(props); - final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); + final ProducerTemplate template = sourceTask.getCms().getProducerTemplate(); template.sendBodyAndHeader(DIRECT_URI, "test", "bigdecimal", new BigDecimal(1234567890)); List<SourceRecord> results = sourceTask.poll(); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java index 679d554..0ce5ab0 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java @@ -16,16 +16,12 @@ */ package org.apache.camel.kafkaconnector; -import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import org.apache.camel.component.hl7.HL7DataFormat; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat; -import org.apache.camel.kafkaconnector.utils.CamelMainSupport; +import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain; import org.apache.camel.model.dataformat.SyslogDataFormat; import org.apache.kafka.connect.errors.ConnectException; import org.junit.jupiter.api.Test; @@ -82,12 +78,13 @@ public class DataFormatTest { props.put("topics", "mytopic"); props.put("camel.source.marshal", "hl7"); props.put("camel.source.unmarshal", "syslog"); - - List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>(); - dataformats.add(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL)); - dataformats.add(new CamelKafkaConnectDataformat("syslog", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL)); DefaultCamelContext dcc = new DefaultCamelContext(); - CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc); + + CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct://start", "log://test") + .withProperties(props) + .withUnmarshallDataFormat("syslog") + .withMarshallDataFormat("hl7") + .build(dcc); HL7DataFormat hl7Df = new HL7DataFormat(); hl7Df.setValidate(false); @@ -111,9 +108,11 @@ public class DataFormatTest { props.put("topics", "mytopic"); props.put("camel.source.marshal", "hl7"); - List<CamelKafkaConnectDataformat> dataformats = Collections.singletonList(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL)); DefaultCamelContext dcc = new DefaultCamelContext(); - CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc); + CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct://start", "log://test") + .withProperties(props) + .withMarshallDataFormat("hl7") + .build(dcc); HL7DataFormat hl7df = new HL7DataFormat(); hl7df.setValidate(false); @@ -133,9 +132,13 @@ public class DataFormatTest { props.put("camel.source.marshal", "hl7"); props.put("camel.dataformat.hl7.validate", "false"); - List<CamelKafkaConnectDataformat> dataformats = Collections.singletonList(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL)); DefaultCamelContext dcc = new DefaultCamelContext(); - CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc); + + CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct://start", "log://test") + .withProperties(props) + .withMarshallDataFormat("hl7") + .build(dcc); + cms.start(); HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java index 5b032fe..6e3240c 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java @@ -39,7 +39,7 @@ public class PropertiesNameFormatsTest { CamelSourceTask camelsourceTask = new CamelSourceTask(); camelsourceTask.start(props); - BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory(); + BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getCamelContext().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory(); assertEquals("org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory", sedaTestQueue.getClass().getName()); assertEquals(1, ((TestBlockingQueueFactory)sedaTestQueue).getCounter()); camelsourceTask.stop(); @@ -55,7 +55,7 @@ public class PropertiesNameFormatsTest { CamelSourceTask camelsourceTask = new CamelSourceTask(); camelsourceTask.start(props); - BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory(); + BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getCamelContext().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory(); assertEquals("org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory", sedaTestQueue.getClass().getName()); assertEquals(1, ((TestBlockingQueueFactory)sedaTestQueue).getCounter()); camelsourceTask.stop();