This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e5184d8f610bda5a3db886d03e93d239c172587f
Author: Luca Burgazzoli <lburgazz...@gmail.com>
AuthorDate: Wed Oct 7 15:57:57 2020 +0200

    [core] refactor camel main
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  25 ++--
 .../camel/kafkaconnector/CamelSourceTask.java      |  28 ++--
 .../utils/CamelKafkaConnectMain.java               | 129 ++++++++++++++++
 .../kafkaconnector/utils/CamelMainSupport.java     | 165 ---------------------
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  38 ++---
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  18 +--
 .../camel/kafkaconnector/DataFormatTest.java       |  31 ++--
 .../kafkaconnector/PropertiesNameFormatsTest.java  |   4 +-
 8 files changed, 199 insertions(+), 239 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index c6242cb..c39a4f4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -20,7 +20,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -29,8 +28,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat;
-import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
+import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.commons.lang3.StringUtils;
@@ -57,7 +55,7 @@ public class CamelSinkTask extends SinkTask {
     private static final String LOCAL_URL = "direct:start";
 
 
-    private CamelMainSupport cms;
+    private CamelKafkaConnectMain cms;
     private ProducerTemplate producer;
     private CamelSinkConnectorConfig config;
 
@@ -76,13 +74,6 @@ public class CamelSinkTask extends SinkTask {
             String remoteUrl = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
             final String marshaller = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
             final String unmarshaller = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
-            List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>();
-            if (unmarshaller != null) {
-                dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, 
CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
-            }
-            if (marshaller != null) {
-                dataformats.add(new CamelKafkaConnectDataformat(marshaller, 
CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
-            }
             final int size = 
config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF);
             final long timeout = 
config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF);
 
@@ -95,9 +86,15 @@ public class CamelSinkTask extends SinkTask {
                                                 
CAMEL_SINK_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, 
dataformats, size, timeout, camelContext);
+            cms = CamelKafkaConnectMain.builder(LOCAL_URL, remoteUrl)
+                .withProperties(actualProps)
+                .withUnmarshallDataFormat(unmarshaller)
+                .withMarshallDataFormat(marshaller)
+                .withAggregationSize(size)
+                .withAggregationTimeout(timeout)
+                .build(camelContext);
 
-            producer = cms.createProducerTemplate();
+            producer = cms.getProducerTemplate();
 
             cms.start();
             LOG.info("CamelSinkTask connector task started");
@@ -229,7 +226,7 @@ public class CamelSinkTask extends SinkTask {
         }
     }
 
-    public CamelMainSupport getCms() {
+    CamelKafkaConnectMain getCms() {
         return cms;
     }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index e825e49..42d9214 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -25,18 +25,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat;
-import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
+import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
 import org.apache.kafka.connect.data.Decimal;
@@ -58,7 +55,7 @@ public class CamelSourceTask extends SourceTask {
 
     private static final String LOCAL_URL = "direct:end";
 
-    private CamelMainSupport cms;
+    private CamelKafkaConnectMain cms;
     private CamelSourceConnectorConfig config;
     private PollingConsumer consumer;
     private String topic;
@@ -87,13 +84,7 @@ public class CamelSourceTask extends SourceTask {
             String remoteUrl = 
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF);
             final String unmarshaller = 
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF);
             final String marshaller = 
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
-            List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>();
-            if (unmarshaller != null) {
-                dataformats.add(new CamelKafkaConnectDataformat(unmarshaller, 
CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
-            }
-            if (marshaller != null) {
-                dataformats.add(new CamelKafkaConnectDataformat(marshaller, 
CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
-            }
+
             topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF);
             topics = Arrays.asList(topic.split(","));
 
@@ -106,10 +97,15 @@ public class CamelSourceTask extends SourceTask {
                                                 
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, 
dataformats, 10, 500, camelContext);
+            cms = CamelKafkaConnectMain.builder(remoteUrl, localUrl)
+                .withProperties(actualProps)
+                .withUnmarshallDataFormat(unmarshaller)
+                .withMarshallDataFormat(marshaller)
+                .withAggregationSize(10)
+                .withAggregationTimeout(500)
+                .build(camelContext);
 
-            Endpoint endpoint = cms.getEndpoint(localUrl);
-            consumer = endpoint.createPollingConsumer();
+            consumer = 
cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
             consumer.start();
 
             cms.start();
@@ -268,7 +264,7 @@ public class CamelSourceTask extends SourceTask {
                + "&pollingConsumerBlockWhenFull=" + 
pollingConsumerBlockWhenFull;
     }
 
-    public CamelMainSupport getCms() {
+    CamelKafkaConnectMain getCms() {
         return cms;
     }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 46b0822..4150dea 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -16,12 +16,23 @@
  */
 package org.apache.camel.kafkaconnector.utils;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.main.BaseMainSupport;
 import org.apache.camel.main.MainListener;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.support.PropertyBindingSupport;
 import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,4 +130,122 @@ public class CamelKafkaConnectMain extends BaseMainSupport {
 
         return this.consumerTemplate;
     }
+
+    public static Builder builder(String from, String to) {
+        return new Builder(from, to);
+    }
+
+    public static final class Builder {
+        private final String from;
+        private final String to;
+        private Map<String, String> props;
+        private String marshallDataFormat;
+        private String unmarshallDataFormat;
+        private int aggregationSize;
+        private long aggregationTimeout;
+
+        public Builder(String from, String to) {
+            this.from = from;
+            this.to = to;
+        }
+
+        public Builder withProperties(Map<String, String> props) {
+            this.props = new HashMap<>(props);
+            return this;
+        }
+
+        public Builder withMarshallDataFormat(String dataformatId) {
+            this.marshallDataFormat = dataformatId;
+            return this;
+        }
+
+        public Builder withUnmarshallDataFormat(String dataformatId) {
+            this.unmarshallDataFormat = dataformatId;
+            return this;
+        }
+
+        public Builder withAggregationSize(int aggregationSize) {
+            this.aggregationSize = aggregationSize;
+            return this;
+        }
+
+        public Builder withAggregationTimeout(long aggregationTimeout) {
+            this.aggregationTimeout = aggregationTimeout;
+            return this;
+        }
+
+        public CamelKafkaConnectMain build(CamelContext camelContext) {
+            CamelKafkaConnectMain camelMain = new 
CamelKafkaConnectMain(camelContext);
+            camelMain.configure().setAutoConfigurationLogSummary(false);
+
+            Properties camelProperties = new Properties();
+            camelProperties.putAll(props);
+
+            LOG.info("Setting initial properties in Camel context: [{}]", 
camelProperties);
+            camelMain.setInitialProperties(camelProperties);
+
+            //creating the actual route
+            camelMain.configure().addRoutesBuilder(new RouteBuilder() {
+                public void configure() {
+                    //from
+                    RouteDefinition rd = from(from);
+                    LOG.info("Creating Camel route from({})", from);
+
+                    //dataformats
+                    if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+                        LOG.info(".marshal().custom({})", marshallDataFormat);
+                        getContext().getRegistry().bind(marshallDataFormat, 
lookupAndInstantiateDataformat(getContext(), marshallDataFormat));
+                        rd.marshal().custom(marshallDataFormat);
+                    }
+                    if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+                        LOG.info(".unmarshal().custom({})", 
unmarshallDataFormat);
+                        getContext().getRegistry().bind(unmarshallDataFormat, 
lookupAndInstantiateDataformat(getContext(), unmarshallDataFormat));
+                        rd.unmarshal().custom(unmarshallDataFormat);
+                    }
+                    if (getContext().getRegistry().lookupByName("aggregate") 
!= null) {
+                        //aggregation
+                        AggregationStrategy s = (AggregationStrategy) 
getContext().getRegistry().lookupByName("aggregate");
+                        
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})",
 s, aggregationSize, aggregationTimeout);
+                        LOG.info(".to({})", to);
+                        
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to);
+                    } else {
+                        //to
+                        LOG.info(".to({})", to);
+                        rd.toD(to);
+                    }
+                }
+            });
+
+            return camelMain;
+        }
+    }
+
+    private static DataFormat lookupAndInstantiateDataformat(CamelContext 
camelContext, String dataformatName) {
+        DataFormat df = camelContext.resolveDataFormat(dataformatName);
+
+        if (df == null) {
+            df = camelContext.createDataFormat(dataformatName);
+
+            final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + 
dataformatName + ".";
+            final Properties props = 
camelContext.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
+
+            CamelContextAware.trySetCamelContext(df, camelContext);
+
+            if (!props.isEmpty()) {
+                PropertyBindingSupport.build()
+                    .withCamelContext(camelContext)
+                    .withOptionPrefix(prefix)
+                    .withRemoveParameters(false)
+                    .withProperties((Map) props)
+                    .withTarget(df)
+                    .bind();
+            }
+        }
+
+        //TODO: move it to the caller?
+        if (df == null) {
+            throw new UnsupportedOperationException("No DataFormat found with 
name " + dataformatName);
+        }
+        return df;
+    }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
deleted file mode 100644
index 6c17ecc..0000000
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.kafkaconnector.utils;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.camel.AggregationStrategy;
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.ConsumerTemplate;
-import org.apache.camel.Endpoint;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.spi.DataFormat;
-import org.apache.camel.support.PropertyBindingSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CamelMainSupport {
-    public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = 
"camel.dataformat.";
-    private static final Logger LOG = 
LoggerFactory.getLogger(CamelMainSupport.class);
-
-    private final CamelKafkaConnectMain camelMain;
-
-    public CamelMainSupport(Map<String, String> props, String fromUrl, String 
toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long 
aggregationTimeout, CamelContext camelContext) {
-        camelMain = new CamelKafkaConnectMain(camelContext);
-        camelMain.configure().setAutoConfigurationLogSummary(false);
-
-        Properties camelProperties = new Properties();
-        camelProperties.putAll(props);
-
-        LOG.info("Setting initial properties in Camel context: [{}]", 
camelProperties);
-        camelMain.setInitialProperties(camelProperties);
-
-        //creating the actual route
-        camelMain.configure().addRoutesBuilder(new RouteBuilder() {
-            public void configure() {
-                //from
-                RouteDefinition rd = from(fromUrl);
-
-                //dataformats
-                LOG.info("Creating Camel route from({})", fromUrl);
-                for (CamelKafkaConnectDataformat dataformat : dataformats) {
-                    String dataformatId = dataformat.getDataformatId();
-                    switch (dataformat.getDataformatKind()) {
-                        case MARSHALL:
-                            LOG.info(".marshal().custom({})", dataformatId);
-                            getContext().getRegistry().bind(dataformatId, 
lookupAndInstantiateDataformat(dataformatId));
-                            rd.marshal().custom(dataformatId);
-                            break;
-                        case UNMARSHALL:
-                            LOG.info(".unmarshal().custom({})", dataformatId);
-                            getContext().getRegistry().bind(dataformatId, 
lookupAndInstantiateDataformat(dataformatId));
-                            rd.unmarshal().custom(dataformatId);
-                            break;
-                        default:
-                            throw new 
UnsupportedOperationException("Unsupported dataformat: " + dataformat);
-                    }
-                }
-
-                if (getContext().getRegistry().lookupByName("aggregate") != 
null) {
-                    //aggregation
-                    AggregationStrategy s = (AggregationStrategy) 
getContext().getRegistry().lookupByName("aggregate");
-                    
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})",
 s, aggregationSize, aggregationTimeout);
-                    LOG.info(".to({})", toUrl);
-                    
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(toUrl);
-                } else {
-                    //to
-                    LOG.info(".to({})", toUrl);
-                    rd.toD(toUrl);
-                }
-            }
-        });
-    }
-
-    public void start() {
-        LOG.info("Starting CamelContext");
-
-        try {
-            camelMain.start();
-        } catch (Exception e) {
-            LOG.info("CamelContext failed to start", e);
-            throw e;
-        }
-
-        LOG.info("CamelContext started");
-    }
-
-    public void stop() {
-        LOG.info("Stopping CamelContext");
-
-        try {
-            camelMain.stop();
-        } catch (Exception e) {
-            LOG.info("CamelContext failed to stop", e);
-            throw e;
-        }
-
-        LOG.info("CamelContext stopped");
-    }
-
-    public ProducerTemplate createProducerTemplate() {
-        return camelMain.getProducerTemplate();
-    }
-
-    public Endpoint getEndpoint(String uri) {
-        return camelMain.getCamelContext().getEndpoint(uri);
-    }
-
-    public Collection<Endpoint> getEndpoints() {
-        return camelMain.getCamelContext().getEndpoints();
-    }
-
-    public ConsumerTemplate createConsumerTemplate() {
-        return camelMain.getConsumerTemplate();
-    }
-
-    private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
-        DataFormat df = 
camelMain.getCamelContext().resolveDataFormat(dataformatName);
-
-        if (df == null) {
-            df = camelMain.getCamelContext().createDataFormat(dataformatName);
-
-            final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + 
dataformatName + ".";
-            final Properties props = 
camelMain.getCamelContext().getPropertiesComponent().loadProperties(k -> 
k.startsWith(prefix));
-
-            CamelContextAware.trySetCamelContext(df, 
camelMain.getCamelContext());
-
-            if (!props.isEmpty()) {
-                PropertyBindingSupport.build()
-                        .withCamelContext(camelMain.getCamelContext())
-                        .withOptionPrefix(prefix)
-                        .withRemoveParameters(false)
-                        .withProperties((Map) props)
-                        .withTarget(df)
-                        .bind();
-            }
-        }
-
-        //TODO: move it to the caller?
-        if (df == null) {
-            throw new UnsupportedOperationException("No DataFormat found with 
name " + dataformatName);
-        }
-        return df;
-    }
-
-}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index a050943..3136e98 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -58,7 +58,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -67,7 +67,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testTopicsRegex() {
         Map<String, String> props = new HashMap<>();
@@ -84,7 +84,7 @@ public class CamelSinkTaskTest {
         records.add(record1);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -128,7 +128,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -172,7 +172,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -222,7 +222,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -281,7 +281,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -345,7 +345,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -407,7 +407,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -466,7 +466,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -509,7 +509,7 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -533,11 +533,11 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
-        assertEquals(1, sinkTask.getCms().getEndpoints()
+        assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> 
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
 
         sinkTask.stop();
@@ -560,12 +560,12 @@ public class CamelSinkTaskTest {
         records.add(record);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel", exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
 
-        assertEquals(1, sinkTask.getCms().getEndpoints()
+        assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints()
             .stream().filter(e -> 
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
 
         sinkTask.stop();
@@ -590,7 +590,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testAggregationBody() {
         Map<String, String> props = new HashMap<>();
@@ -616,7 +616,7 @@ public class CamelSinkTaskTest {
         records.add(record5);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel camel1 camel2 camel3 camel4", 
exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -625,7 +625,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testAggregationBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
@@ -652,7 +652,7 @@ public class CamelSinkTaskTest {
         records.add(record5);
         sinkTask.put(records);
 
-        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
         Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel camel1 camel2 camel3 camel4", 
exchange.getMessage().getBody());
         assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 8fd8cb4..4d268de 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -39,7 +39,7 @@ public class CamelSourceTaskTest {
     private static final String TOPIC_NAME = "my-topic";
 
     private void sendBatchOfRecords(CamelSourceTask sourceTask, long size) {
-        final ProducerTemplate template = 
sourceTask.getCms().createProducerTemplate();
+        final ProducerTemplate template = 
sourceTask.getCms().getProducerTemplate();
         for (int i = 0; i < size; i++) {
             template.sendBody(DIRECT_URI, "test" + i);
         }
@@ -113,7 +113,7 @@ public class CamelSourceTaskTest {
 
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
-        final ProducerTemplate template = 
sourceTask.getCms().createProducerTemplate();
+        final ProducerTemplate template = 
sourceTask.getCms().getProducerTemplate();
 
         // key in the message with body
         template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey", 
1234);
@@ -150,7 +150,7 @@ public class CamelSourceTaskTest {
 
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
-        final ProducerTemplate template = sourceTask.getCms().createProducerTemplate();
+        final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
 
         // send String
         template.sendBody(DIRECT_URI, "test");
@@ -206,9 +206,9 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getEndpoints().size());
+        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
 
-        sourceTask.getCms().getEndpoints().stream()
+        sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("timer"))
                 .forEach(e -> {
                     assertTrue(e.getEndpointUri().contains("foo"));
@@ -231,9 +231,9 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        assertEquals(2, sourceTask.getCms().getEndpoints().size());
+        assertEquals(2, sourceTask.getCms().getCamelContext().getEndpoints().size());
 
-        sourceTask.getCms().getEndpoints().stream()
+        sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("direct"))
                 .forEach(e -> {
                     assertTrue(e.getEndpointUri().contains("end"));
@@ -259,7 +259,7 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-        sourceTask.getCms().getEndpoints().stream()
+        sourceTask.getCms().getCamelContext().getEndpoints().stream()
             .filter(e -> e.getEndpointUri().startsWith("timer"))
             .forEach(e -> {
                 assertTrue(e.getEndpointUri().contains("foo"));
@@ -281,7 +281,7 @@ public class CamelSourceTaskTest {
         sourceTask.start(props);
 
 
-        final ProducerTemplate template = sourceTask.getCms().createProducerTemplate();
+        final ProducerTemplate template = sourceTask.getCms().getProducerTemplate();
         template.sendBodyAndHeader(DIRECT_URI, "test", "bigdecimal", new BigDecimal(1234567890));
 
         List<SourceRecord> results = sourceTask.poll();
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 679d554..0ce5ab0 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -16,16 +16,12 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.component.hl7.HL7DataFormat;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectDataformat;
-import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
+import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
 import org.apache.camel.model.dataformat.SyslogDataFormat;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.jupiter.api.Test;
@@ -82,12 +78,13 @@ public class DataFormatTest {
         props.put("topics", "mytopic");
         props.put("camel.source.marshal", "hl7");
         props.put("camel.source.unmarshal", "syslog");
-
-        List<CamelKafkaConnectDataformat> dataformats = new LinkedList<>();
-        dataformats.add(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
-        dataformats.add(new CamelKafkaConnectDataformat("syslog", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.UNMARSHALL));
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc);
+
+        CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct://start", "log://test")
+            .withProperties(props)
+            .withUnmarshallDataFormat("syslog")
+            .withMarshallDataFormat("hl7")
+            .build(dcc);
 
         HL7DataFormat hl7Df = new HL7DataFormat();
         hl7Df.setValidate(false);
@@ -111,9 +108,11 @@ public class DataFormatTest {
         props.put("topics", "mytopic");
         props.put("camel.source.marshal", "hl7");
 
-        List<CamelKafkaConnectDataformat> dataformats = Collections.singletonList(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc);
+        CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct://start", "log://test")
+            .withProperties(props)
+            .withMarshallDataFormat("hl7")
+            .build(dcc);
 
         HL7DataFormat hl7df = new HL7DataFormat();
         hl7df.setValidate(false);
@@ -133,9 +132,13 @@ public class DataFormatTest {
         props.put("camel.source.marshal", "hl7");
         props.put("camel.dataformat.hl7.validate", "false");
 
-        List<CamelKafkaConnectDataformat> dataformats = Collections.singletonList(new CamelKafkaConnectDataformat("hl7", CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL));
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", dataformats, 10, 500, dcc);
+
+        CamelKafkaConnectMain cms = CamelKafkaConnectMain.builder("direct://start", "log://test")
+            .withProperties(props)
+            .withMarshallDataFormat("hl7")
+            .build(dcc);
+
 
         cms.start();
         HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java
index 5b032fe..6e3240c 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/PropertiesNameFormatsTest.java
@@ -39,7 +39,7 @@ public class PropertiesNameFormatsTest {
 
         CamelSourceTask camelsourceTask = new CamelSourceTask();
         camelsourceTask.start(props);
-        BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory();
+        BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getCamelContext().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory();
         assertEquals("org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory", sedaTestQueue.getClass().getName());
         assertEquals(1, ((TestBlockingQueueFactory)sedaTestQueue).getCounter());
         camelsourceTask.stop();
@@ -55,7 +55,7 @@ public class PropertiesNameFormatsTest {
 
         CamelSourceTask camelsourceTask = new CamelSourceTask();
         camelsourceTask.start(props);
-        BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory();
+        BlockingQueueFactory<Exchange> sedaTestQueue = ((SedaComponent) camelsourceTask.getCms().getCamelContext().getEndpoint("seda://test").getCamelContext().getComponent("seda")).getDefaultQueueFactory();
         assertEquals("org.apache.camel.kafkaconnector.test.TestBlockingQueueFactory", sedaTestQueue.getClass().getName());
         assertEquals(1, ((TestBlockingQueueFactory)sedaTestQueue).getCounter());
         camelsourceTask.stop();

Reply via email to