This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7fd48c1 source: better handling of headers of type Data #544 7fd48c1 is described below commit 7fd48c150172fa9419fe33dde560254bb35845cc Author: Luca Burgazzoli <lburgazz...@gmail.com> AuthorDate: Thu Oct 8 16:51:06 2020 +0200 source: better handling of headers of type Data #544 --- .../camel/kafkaconnector/CamelConnectorUtils.java | 49 ++++++++++++++++++++++ .../apache/camel/kafkaconnector/CamelSinkTask.java | 13 +++++- .../camel/kafkaconnector/CamelSourceTask.java | 9 +--- .../camel/kafkaconnector/CamelSinkTaskTest.java | 32 ++++++++++++++ .../camel/kafkaconnector/CamelSourceTaskTest.java | 30 +++++++++++++ 5 files changed, 123 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorUtils.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorUtils.java new file mode 100644 index 0000000..fa6a3e5 --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector; + +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; + +public final class CamelConnectorUtils { + public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + + private CamelConnectorUtils() { + } + + public static Date truncateDate(Date value) { + Calendar calendar = Calendar.getInstance(CamelConnectorUtils.UTC); + calendar.setTime(value); + calendar.set(Calendar.YEAR, 0); + calendar.set(Calendar.MONTH, 0); + calendar.set(Calendar.DAY_OF_MONTH, 0); + + return calendar.getTime(); + } + + public static Date truncateTime(Date value) { + Calendar calendar = Calendar.getInstance(CamelConnectorUtils.UTC); + calendar.setTime(value); + calendar.set(Calendar.HOUR_OF_DAY, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + + return calendar.getTime(); + } +} diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 51cfe88..63e139c 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -18,9 +18,11 @@ package org.apache.camel.kafkaconnector; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; @@ -35,6 +37,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.sink.SinkRecord; @@ -172,7 +175,10 @@ public class CamelSinkTask extends SinkTask { private void addHeader(Map<String, Object> map, Header singleHeader) { String camelHeaderKey = StringUtils.removeStart(singleHeader.key(), HEADER_CAMEL_PREFIX); Schema schema = singleHeader.schema(); - if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) { + + if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) { + map.put(camelHeaderKey, (Date)singleHeader.value()); + } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) { map.put(camelHeaderKey, (String)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) { map.put(camelHeaderKey, (Boolean)singleHeader.value()); @@ -204,7 +210,10 @@ public class CamelSinkTask extends SinkTask { private void addProperty(Exchange exchange, Header singleHeader) { String camelPropertyKey = StringUtils.removeStart(singleHeader.key(), PROPERTY_CAMEL_PREFIX); Schema schema = singleHeader.schema(); - if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) { + + if (schema.type().equals(Timestamp.SCHEMA.type()) && Objects.equals(schema.name(), Timestamp.SCHEMA.name())) { + exchange.getProperties().put(camelPropertyKey, (Date)singleHeader.value()); + } else if (schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) { exchange.getProperties().put(camelPropertyKey, (String)singleHeader.value()); } else if (schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName())) { exchange.getProperties().put(camelPropertyKey, (Boolean)singleHeader.value()); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 724ddd7..c793f1b 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -17,9 +17,6 @@ package org.apache.camel.kafkaconnector; import java.math.BigDecimal; -import java.sql.Time; -import java.sql.Timestamp; -import java.text.SimpleDateFormat; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -237,12 +234,8 @@ public class CamelSourceTask extends SourceTask { } record.headers().addBytes(keyCamelHeader, bytes); - } else if (value instanceof Time) { - record.headers().addTime(keyCamelHeader, (Time)value); - } else if (value instanceof Timestamp) { - record.headers().addTimestamp(keyCamelHeader, (Timestamp)value); } else if (value instanceof Date) { - record.headers().addString(keyCamelHeader, new SimpleDateFormat("yyyy-MM-dd").format(value)); + record.headers().addTimestamp(keyCamelHeader, (Date)value); } else if (value instanceof BigDecimal) { Schema schema = Decimal.schema(((BigDecimal)value).scale()); record.headers().add(keyCamelHeader, Decimal.fromLogical(schema, (BigDecimal)value), schema); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 6e28b0e..4a8fc9e 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,6 +34,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -691,4 +693,34 @@ public class CamelSinkTaskTest { sinkTask.stop(); } + @Test + public void testBodyAndDateHeader() { + final Date now = new Date(); + + Map<String, String> props = new HashMap<>(); + props.put(TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + try { + List<SinkRecord> records = new ArrayList<>(); + + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now); + records.add(record); + + sinkTask.put(records); + + Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT); + + assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> { + assertThat(value).isEqualTo(now); + }); + } finally { + sinkTask.stop(); + } + } + } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 25088cf..ef393f9 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -329,6 +330,35 @@ public class CamelSourceTaskTest { } @Test + public void testSourceDateHeader() { + final String key = "_key"; + final Date now = new Date(); + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(mapOf( + CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, + CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "direct", + CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "start" + )); + + sourceTask.getCms().getProducerTemplate().sendBodyAndHeader(DIRECT_URI, "test", key, now); + + try { + List<SourceRecord> results = sourceTask.poll(); + assertThat(results).hasSize(1); + + Header header = results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + key).next(); + + assertThat(header.schema().type()).isEqualTo(Schema.Type.INT64); + assertThat(header.value()).isInstanceOfSatisfying(Date.class, value -> { + assertThat(value).isEqualTo(now); + }); + } finally { + sourceTask.stop(); + } + } + + @Test public void testSourcePollingWithAggregationBySize() { final int size = 10; final int chunkSize = 5;