This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e6049ee4cbdbdde6d4c7912371eb818a704acb8f Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Wed Oct 7 17:10:10 2020 +0200 [core] cleanup CamelSourceTask --- .../org/apache/camel/kafkaconnector/CamelSourceTask.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 42d9214..604b9b1 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -22,7 +22,6 @@ import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; @@ -58,15 +57,14 @@ public class CamelSourceTask extends SourceTask { private CamelKafkaConnectMain cms; private CamelSourceConnectorConfig config; private PollingConsumer consumer; - private String topic; - private List<String> topics; + private String[] topics; private Long maxBatchPollSize; private Long maxPollDuration; private String camelMessageHeaderKey; @Override public String version() { - return new CamelSourceConnector().version(); + return VersionUtil.getVersion(); } @Override @@ -85,8 +83,7 @@ public class CamelSourceTask extends SourceTask { final String unmarshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_UNMARSHAL_CONF); final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF); - topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF); - topics = Arrays.asList(topic.split(",")); + topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(","); String localUrl = getLocalUrlWithPollingOptions(config); @@ -205,7 +202,7 @@ public class CamelSourceTask extends SourceTask { } protected Map<String, String> getDefaultConfig() { - return Collections.EMPTY_MAP; + return Collections.emptyMap(); } protected static String getCamelSourceEndpointConfigPrefix() { @@ -236,9 +233,7 @@ public class CamelSourceTask extends SourceTask { } else if (value instanceof Timestamp) { record.headers().addTimestamp(keyCamelHeader, (Timestamp)value); } else if (value instanceof Date) { - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); - String convertedDate = sdf.format(value); - record.headers().addString(keyCamelHeader, (String)convertedDate); + record.headers().addString(keyCamelHeader, new SimpleDateFormat("yyyy-MM-dd").format(value)); } else if (value instanceof BigDecimal) { Schema schema = Decimal.schema(((BigDecimal)value).scale()); record.headers().add(keyCamelHeader, Decimal.fromLogical(schema, (BigDecimal)value), schema);