This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 49aa5e8 Improved CamelSourceTaskTest
new 5593ea7 Merge pull request #205 from fvaleri/no-flaky
49aa5e8 is described below
commit 49aa5e8cba11a49c9fa57f4dc1a3639e9a2c0e29
Author: Federico Valeri <fvaleri@localhost>
AuthorDate: Sat May 9 21:06:28 2020 +0200
Improved CamelSourceTaskTest
---
.../camel/kafkaconnector/CamelSourceTaskTest.java | 365 ++++++++++-----------
1 file changed, 165 insertions(+), 200 deletions(-)
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 9c40d40..33807ee 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -19,7 +19,6 @@ package org.apache.camel.kafkaconnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Stream;
import org.apache.camel.ProducerTemplate;
import org.apache.kafka.connect.data.Schema;
@@ -32,270 +31,236 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class CamelSourceTaskTest {
+ private static final String DIRECT_URI = "direct:start";
+ private static final String TOPIC_NAME = "my-topic";
+
+ private void sendBatchOfRecords(CamelSourceTask sourceTask, long size) {
+ final ProducerTemplate template =
sourceTask.getCms().createProducerTemplate();
+ for (int i = 0; i < size; i++) {
+ template.sendBody(DIRECT_URI, "test" + i);
+ }
+ }
+
@Test
public void testSourcePolling() {
+ final long size = 2;
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "direct:start");
- props.put("topics", "mytopic");
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF,
DIRECT_URI);
- CamelSourceTask camelSourceTask = new CamelSourceTask();
- camelSourceTask.start(props);
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
- final ProducerTemplate template =
camelSourceTask.getCms().createProducerTemplate();
- template.sendBody("direct:start", "awesome!");
+ sendBatchOfRecords(sourceTask, size);
+ List<SourceRecord> poll = sourceTask.poll();
- List<SourceRecord> poll =
camelSourceTaskPollWithRetries(camelSourceTask, 5);
- assertEquals(1, poll.size());
- assertEquals("mytopic", poll.get(0).topic());
+ assertEquals(size, poll.size());
+ assertEquals(TOPIC_NAME, poll.get(0).topic());
- camelSourceTask.stop();
+ sourceTask.stop();
}
@Test
- public void testSourcePollingWithKey() {
+ public void testSourcePollingMaxBatchPollSize() {
+ final long size = 2;
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "direct:start");
- props.put("topics", "mytopic");
-
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF,
"CamelSpecialTestKey");
-
- CamelSourceTask camelSourceTask = new CamelSourceTask();
- camelSourceTask.start(props);
-
- final ProducerTemplate template =
camelSourceTask.getCms().createProducerTemplate();
-
- // first we test if we have a key in the message with body
- template.sendBodyAndHeader("direct:start", "awesome!",
"CamelSpecialTestKey", 1234);
-
- List<SourceRecord> poll =
camelSourceTaskPollWithRetries(camelSourceTask, 5);
- assertEquals(1, poll.size());
- assertEquals(1234, poll.get(0).key());
- assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type());
-
- // second we test if we have no key under the header
- template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader",
1234);
-
- poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
- assertEquals(1, poll.size());
- assertNull(poll.get(0).key());
- assertNull(poll.get(0).keySchema());
-
- // third we test if we have the header but with null value
- template.sendBodyAndHeader("direct:start", "awesome!",
"CamelSpecialTestKey", null);
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF,
DIRECT_URI);
+
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF,
String.valueOf(size));
- poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
- assertEquals(1, poll.size());
- assertNull(poll.get(0).key());
- assertNull(poll.get(0).keySchema());
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
- camelSourceTask.stop();
- }
+ sendBatchOfRecords(sourceTask, size + 1);
+ List<SourceRecord> poll = sourceTask.poll();
+ int pollSize = poll.size();
- @Test
- public void testSourcePollingWithBody() {
- Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "direct:start");
- props.put("topics", "mytopic");
-
- CamelSourceTask camelSourceTask = new CamelSourceTask();
- camelSourceTask.start(props);
-
- final ProducerTemplate template =
camelSourceTask.getCms().createProducerTemplate();
-
- // send first data
- template.sendBody("direct:start", "testing kafka connect");
-
- List<SourceRecord> poll =
camelSourceTaskPollWithRetries(camelSourceTask, 5);
- assertEquals(1, poll.size());
- assertEquals("testing kafka connect", poll.get(0).value());
- assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type());
- assertNull(poll.get(0).key());
- assertNull(poll.get(0).keySchema());
-
- // send second data
- template.sendBody("direct:start", true);
-
- poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
- assertEquals(1, poll.size());
- assertTrue((boolean)poll.get(0).value());
- assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type());
- assertNull(poll.get(0).key());
- assertNull(poll.get(0).keySchema());
-
- // second third data
- template.sendBody("direct:start", 1234L);
-
- poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
- assertEquals(1, poll.size());
- assertEquals(1234L, poll.get(0).value());
- assertEquals(Schema.Type.INT64, poll.get(0).valueSchema().type());
- assertNull(poll.get(0).key());
- assertNull(poll.get(0).keySchema());
-
- // third with null data
- template.sendBody("direct:start", null);
-
- poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
- assertNull(poll.get(0).key());
- assertNull(poll.get(0).keySchema());
- assertNull(poll.get(0).value());
- assertNull(poll.get(0).valueSchema());
-
- camelSourceTask.stop();
+ assertTrue(pollSize >= 0 && pollSize <= size, "Batch size: " + pollSize + ", expected between 0 and " + size);
+ sourceTask.stop();
}
@Test
public void testSourcePollingTimeout() {
- final int nuberOfMessagesSent = 999;
+ final long size = 999;
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "direct:start");
- props.put("topics", "mytopic");
- props.put("camel.source.maxPollDuration", "1");
-
- CamelSourceTask camelSourceTask = new CamelSourceTask();
- camelSourceTask.start(props);
-
- final ProducerTemplate template =
camelSourceTask.getCms().createProducerTemplate();
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF,
DIRECT_URI);
+
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF, "2");
- // first we send nuberOfMessagesSent of messages
- Stream.of(nuberOfMessagesSent).forEach(i ->
template.sendBody("direct:start", "awesome!"));
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
- // then we assert we received only a fraction of them (proving that polling timeout of 1 Millisecond is working)
- List<SourceRecord> poll = camelSourceTask.poll();
- assertTrue(poll.size() < nuberOfMessagesSent, "Expected received messages count to be strictly less than " + nuberOfMessagesSent + ", got " + poll.size());
+ sendBatchOfRecords(sourceTask, size);
+ List<SourceRecord> poll = sourceTask.poll();
+ int pollSize = poll.size();
- camelSourceTask.stop();
+ assertTrue(pollSize < size, "Batch size: " + pollSize + ", expected strictly less than " + size);
+ sourceTask.stop();
}
@Test
- public void testSourcePollingMaxRecordNumber() {
- final int nuberOfMessagesSent = 2;
+ public void testSourcePollingWithKey() {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "direct:start");
- props.put("topics", "mytopic");
- props.put("camel.source.maxBatchPollSize", "1");
-
- CamelSourceTask camelSourceTask = new CamelSourceTask();
- camelSourceTask.start(props);
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF,
DIRECT_URI);
+
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF,
"CamelSpecialTestKey");
- final ProducerTemplate template =
camelSourceTask.getCms().createProducerTemplate();
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
+ final ProducerTemplate template =
sourceTask.getCms().createProducerTemplate();
- // first we send nuberOfMessagesSent of messages > camel.source.maxBatchPollSize
- Stream.of(nuberOfMessagesSent).forEach(i ->
template.sendBody("direct:start", "awesome!"));
+ // key in the message with body
+ template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey",
1234);
- List<SourceRecord> poll =
camelSourceTaskPollWithRetries(camelSourceTask, 5);
+ List<SourceRecord> poll1 = sourceTask.poll();
+ assertEquals(1, poll1.size());
+ assertEquals(1234, poll1.get(0).key());
+ assertEquals(Schema.Type.INT32, poll1.get(0).keySchema().type());
- // then we assert we received just camel.source.maxBatchPollSize
- assertEquals(1, poll.size());
- camelSourceTask.stop();
- }
+ // no key under the header
+ template.sendBodyAndHeader(DIRECT_URI, "test", "WrongHeader", 1234);
- @Test
- public void testSourceConsumerOptions() {
- Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "timer:kafkaconnector");
- props.put("topics", "mytopic");
- props.put("camel.source.pollingConsumerQueueSize", "10");
- props.put("camel.source.pollingConsumerBlockTimeout", "1000");
- props.put("camel.source.pollingConsumerBlockWhenFull", "false");
+ List<SourceRecord> poll2 = sourceTask.poll();
+ assertEquals(1, poll2.size());
+ assertNull(poll2.get(0).key());
+ assertNull(poll2.get(0).keySchema());
- CamelSourceTask camelSourceTask = new CamelSourceTask();
- camelSourceTask.start(props);
+ // header with null value
+ template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey",
null);
- assertEquals(2, camelSourceTask.getCms().getEndpoints().size());
+ List<SourceRecord> poll3 = sourceTask.poll();
+ assertEquals(1, poll3.size());
+ assertNull(poll3.get(0).key());
+ assertNull(poll3.get(0).keySchema());
- camelSourceTask.getCms().getEndpoints().stream()
- .filter(e -> e.getEndpointUri().startsWith("direct"))
- .forEach(e -> {
- assertTrue(e.getEndpointUri().contains("end"));
-
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=1000"));
-
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false"));
-
assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
- });
+ sourceTask.stop();
+ }
- camelSourceTask.stop();
+ @Test
+ public void testSourcePollingWithBody() {
+ Map<String, String> props = new HashMap<>();
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF,
DIRECT_URI);
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
+ final ProducerTemplate template =
sourceTask.getCms().createProducerTemplate();
+
+ // send String
+ template.sendBody(DIRECT_URI, "test");
+
+ List<SourceRecord> poll1 = sourceTask.poll();
+ assertEquals(1, poll1.size());
+ assertEquals("test", poll1.get(0).value());
+ assertEquals(Schema.Type.STRING, poll1.get(0).valueSchema().type());
+ assertNull(poll1.get(0).key());
+ assertNull(poll1.get(0).keySchema());
+
+ // send boolean
+ template.sendBody(DIRECT_URI, true);
+
+ List<SourceRecord> poll2 = sourceTask.poll();
+ assertEquals(1, poll2.size());
+ assertTrue((boolean)poll2.get(0).value());
+ assertEquals(Schema.Type.BOOLEAN, poll2.get(0).valueSchema().type());
+ assertNull(poll2.get(0).key());
+ assertNull(poll2.get(0).keySchema());
+
+ // send long
+ template.sendBody(DIRECT_URI, 1234L);
+
+ List<SourceRecord> poll3 = sourceTask.poll();
+ assertEquals(1, poll3.size());
+ assertEquals(1234L, poll3.get(0).value());
+ assertEquals(Schema.Type.INT64, poll3.get(0).valueSchema().type());
+ assertNull(poll3.get(0).key());
+ assertNull(poll3.get(0).keySchema());
+
+ // send null
+ template.sendBody(DIRECT_URI, null);
+
+ List<SourceRecord> poll4 = sourceTask.poll();
+ assertNull(poll4.get(0).key());
+ assertNull(poll4.get(0).keySchema());
+ assertNull(poll4.get(0).value());
+ assertNull(poll4.get(0).valueSchema());
+
+ sourceTask.stop();
}
@Test
- public void testSourceUrlPrecedenceOnComponentProperty() {
+ public void testUrlPrecedenceOnComponentProperty() {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url",
"timer:kafkaconnector?period=10&fixedRate=true&delay=0");
- props.put("topics", "mytopic");
- //these properties should be ignored
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF,
"timer:foo?period=10&repeatCount=2");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"shouldNotBeUsed");
- props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"delay", "100000000");
- props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "name",
"shouldNotBeUsed");
+ props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"timerName", "shouldNotBeUsed");
+ props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"repeatCount", "999");
- CamelSourceTask camelSourceTask = new CamelSourceTask();
- camelSourceTask.start(props);
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
- assertEquals(2, camelSourceTask.getCms().getEndpoints().size());
+ assertEquals(2, sourceTask.getCms().getEndpoints().size());
- camelSourceTask.getCms().getEndpoints().stream()
+ sourceTask.getCms().getEndpoints().stream()
.filter(e -> e.getEndpointUri().startsWith("timer"))
.forEach(e -> {
- assertTrue(e.getEndpointUri().contains("kafkaconnector"));
+ assertTrue(e.getEndpointUri().contains("foo"));
assertTrue(e.getEndpointUri().contains("period=10"));
- assertTrue(e.getEndpointUri().contains("fixedRate=true"));
- assertTrue(e.getEndpointUri().contains("delay=0"));
+ assertTrue(e.getEndpointUri().contains("repeatCount=2"));
});
- camelSourceTask.stop();
+ sourceTask.stop();
}
@Test
- public void testSourceUsingComponentProperty() {
+ public void testSourcePollingConsumerOptions() {
Map<String, String> props = new HashMap<>();
- props.put("topics", "mytopic");
- props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"timer");
- props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"period", "10000");
- props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"delay", "0");
- props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"timerName", "kafkaconnector");
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF,
"timer:foo?period=10&repeatCount=2");
+
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF,
"10");
+
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF,
"10");
+
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF,
"false");
- CamelSourceTask camelSourceTask = new CamelSourceTask();
- camelSourceTask.start(props);
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
- camelSourceTask.getCms().getEndpoints().stream()
- .filter(e -> e.getEndpointUri().startsWith("timer"))
- .forEach(e -> {
- assertTrue(e.getEndpointUri().contains("kafkaconnector"));
- assertTrue(e.getEndpointUri().contains("period=1000"));
- assertTrue(e.getEndpointUri().contains("delay=0"));
- });
+ assertEquals(2, sourceTask.getCms().getEndpoints().size());
- camelSourceTask.stop();
+ sourceTask.getCms().getEndpoints().stream()
+ .filter(e -> e.getEndpointUri().startsWith("direct"))
+ .forEach(e -> {
+ assertTrue(e.getEndpointUri().contains("end"));
+
assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
+
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=10"));
+
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false"));
+ });
+
+ sourceTask.stop();
}
@Test
- public void testSourceUsingMultipleComponentProperties() {
+ public void testSourceUsingComponentProperties() {
Map<String, String> props = new HashMap<>();
- props.put("topics", "mytopic");
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"timer");
- props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"period", "1000");
- props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"repeatCount", "0");
- props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"timerName", "kafkaconnector");
+
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF,
"2");
+
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF, "10");
+ props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"timerName", "foo");
+ props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"period", "10");
+ props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"repeatCount", "2");
- CamelSourceTask camelSourceTask = new CamelSourceTask();
- camelSourceTask.start(props);
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
- camelSourceTask.getCms().getEndpoints().stream()
+ sourceTask.getCms().getEndpoints().stream()
.filter(e -> e.getEndpointUri().startsWith("timer"))
.forEach(e -> {
- assertTrue(e.getEndpointUri().contains("kafkaconnector"));
- assertTrue(e.getEndpointUri().contains("period=1000"));
- assertTrue(e.getEndpointUri().contains("repeatCount=0"));
+ assertTrue(e.getEndpointUri().contains("foo"));
+ assertTrue(e.getEndpointUri().contains("period=10"));
+ assertTrue(e.getEndpointUri().contains("repeatCount=2"));
});
- camelSourceTask.stop();
- }
-
- private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask
camelSourceTask, int retries) {
- List<SourceRecord> poll;
- do {
- poll = camelSourceTask.poll();
- if (poll == null) {
- retries--;
- }
- } while (poll == null && retries > 0);
- return poll;
+ sourceTask.stop();
}
}