This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git
The following commit(s) were added to refs/heads/main by this push:
new b46cc336 CAMEL-18127: implement adapter auto-configuration in the
resume API
b46cc336 is described below
commit b46cc33621758829619e0efce88efff4c0244c25
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon May 23 18:10:04 2022 +0200
CAMEL-18127: implement adapter auto-configuration in the resume API
---
.../clients/kafka/ConsumerPropertyFactory.java | 35 ---------
.../kafka/DefaultConsumerPropertyFactory.java | 82 ----------------------
.../kafka/DefaultProducerPropertyFactory.java | 60 ----------------
.../resume/clients/kafka/FileDeserializer.java | 35 ---------
.../resume/clients/kafka/FileSerializer.java | 34 ---------
.../clients/kafka/ProducerPropertyFactory.java | 35 ---------
.../kafka/file/LargeFileRouteBuilder.java | 22 ++++--
.../kafka/fileset/LargeDirectoryRouteBuilder.java | 9 ++-
.../example/resume/file/offset/main/MainApp.java | 43 ++++--------
.../clusterized/main/ClusterizedListener.java | 35 +++------
.../resume-api-fileset/src/main/docker/Dockerfile | 5 +-
.../camel/example/resume/fileset/main/MainApp.java | 37 +++-------
.../resume-api-fileset/src/main/scripts/run.sh | 16 ++++-
13 files changed, 72 insertions(+), 376 deletions(-)
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ConsumerPropertyFactory.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ConsumerPropertyFactory.java
deleted file mode 100644
index 7998b82c..00000000
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ConsumerPropertyFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.util.Properties;
-
-/**
- * An interface to produce properties that can be used to configure a Kafka
consumer. The
- * CLI runtime equivalent for this file is the consumer.properties file from
the Kafka
- * provided along with the Kafka deliverable
- *
- */
-public interface ConsumerPropertyFactory {
-
- /**
- * Gets the properties used to configure the consumer
- * @return a Properties object containing the set of properties for the
consumer
- */
- Properties getProperties();
-}
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultConsumerPropertyFactory.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultConsumerPropertyFactory.java
deleted file mode 100644
index 6cd99aea..00000000
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultConsumerPropertyFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-
-/**
- * A property producer that can be used to create a Kafka consumer with a
minimum
- * set of configurations that can consume from a Kafka topic.
- * <p>
- * The consumer behavior from using this set of properties causes the consumer
to
- * consumes all published messages "from-beginning".
- */
-public class DefaultConsumerPropertyFactory implements ConsumerPropertyFactory
{
- private final String bootstrapServer;
- private String valueDeserializer = StringDeserializer.class.getName();
- private String keyDeserializer = StringDeserializer.class.getName();
- private String offsetReset = "earliest";
- private String groupId = UUID.randomUUID().toString();
-
- /**
- * Constructs the properties using the given bootstrap server
- *
- * @param bootstrapServer the address of the server in the format
- * PLAINTEXT://${address}:${port}
- */
- public DefaultConsumerPropertyFactory(String bootstrapServer) {
- this.bootstrapServer = bootstrapServer;
- }
-
- public void setValueDeserializer(String valueDeserializer) {
- this.valueDeserializer = valueDeserializer;
- }
-
- public void setKeyDeserializer(String keyDeserializer) {
- this.keyDeserializer = keyDeserializer;
- }
-
- public void setOffsetReset(String offsetReset) {
- this.offsetReset = offsetReset;
- }
-
- public String getGroupId() {
- return groupId;
- }
-
- public void setGroupId(String groupId) {
- this.groupId = groupId;
- }
-
- @Override
- public Properties getProperties() {
- Properties props = new Properties();
- props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-
- props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializer);
- props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializer);
- props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
offsetReset);
- return props;
- }
-}
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultProducerPropertyFactory.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultProducerPropertyFactory.java
deleted file mode 100644
index 6aa3ad30..00000000
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultProducerPropertyFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-public class DefaultProducerPropertyFactory implements ProducerPropertyFactory
{
- private final String bootstrapServer;
- private String valueSerializer = StringDeserializer.class.getName();
- private String keySerializer = StringDeserializer.class.getName();
-
- /**
- * Constructs the properties using the given bootstrap server
- * @param bootstrapServer the address of the server in the format
- * PLAINTEXT://${address}:${port}
- */
- public DefaultProducerPropertyFactory(String bootstrapServer) {
- this.bootstrapServer = bootstrapServer;
- }
-
- public void setValueSerializer(String valueSerializer) {
- this.valueSerializer = valueSerializer;
- }
-
- public void setKeySerializer(String keySerializer) {
- this.keySerializer = keySerializer;
- }
-
- @Override
- public Properties getProperties() {
- Properties props = new Properties();
-
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
- props.put(ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString());
-
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializer);
-
- return props;
- }
-}
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileDeserializer.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileDeserializer.java
deleted file mode 100644
index f7caa695..00000000
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileDeserializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.io.File;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileDeserializer implements Deserializer<File> {
- private static final Logger LOG =
LoggerFactory.getLogger(FileDeserializer.class);
-
- @Override
- public File deserialize(String s, byte[] bytes) {
- String name = new String(bytes);
- LOG.trace("Deserializing {} from topic {}", name, s);
- return new File(name);
- }
-}
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileSerializer.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileSerializer.java
deleted file mode 100644
index a991ff15..00000000
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileSerializer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.io.File;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileSerializer implements Serializer<File> {
- private static final Logger LOG =
LoggerFactory.getLogger(FileSerializer.class);
-
- @Override
- public byte[] serialize(String s, File file) {
- LOG.trace("Serializing file: {}", file.getPath());
- return file.getPath().getBytes();
- }
-}
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ProducerPropertyFactory.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ProducerPropertyFactory.java
deleted file mode 100644
index 45ffda1e..00000000
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ProducerPropertyFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.util.Properties;
-
-/**
- * An interface to produce properties that can be used to configure a Kafka
consumer. The
- * CLI runtime equivalent for this file is the consumer.properties file from
the Kafka
- * provided along with the Kafka deliverable
- *
- */
-public interface ProducerPropertyFactory {
-
- /**
- * Gets the properties used to configure the consumer
- * @return a Properties object containing the set of properties for the
consumer
- */
- Properties getProperties();
-}
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
index 9f162096..e304620e 100644
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
+++
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
@@ -24,9 +24,10 @@ import java.util.concurrent.CountDownLatch;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.component.file.consumer.adapters.FileOffset;
import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.support.resume.Resumables;
import org.apache.camel.util.IOHelper;
import org.slf4j.Logger;
@@ -39,12 +40,15 @@ public class LargeFileRouteBuilder extends RouteBuilder {
private static final Logger LOG =
LoggerFactory.getLogger(LargeFileRouteBuilder.class);
private KafkaResumeStrategy testResumeStrategy;
+ private final ResumeCache<File> cache;
+
private long lastOffset;
private int batchSize;
private final CountDownLatch latch;
- public LargeFileRouteBuilder(KafkaResumeStrategy resumeStrategy,
CountDownLatch latch) {
+ public LargeFileRouteBuilder(KafkaResumeStrategy resumeStrategy,
ResumeCache<File> cache, CountDownLatch latch) {
this.testResumeStrategy = resumeStrategy;
+ this.cache = cache;
String tmp = System.getProperty("resume.batch.size", "30");
this.batchSize = Integer.valueOf(tmp);
@@ -58,8 +62,14 @@ public class LargeFileRouteBuilder extends RouteBuilder {
File path = exchange.getMessage().getHeader("CamelFilePath",
File.class);
LOG.debug("Path: {} ", path);
- final GenericFileResumeAdapter adapter =
testResumeStrategy.getAdapter(GenericFileResumeAdapter.class);
- lastOffset = adapter.getLastOffset(path).orElse(0L);
+ FileOffset offsetContainer = cache.get(path, FileOffset.class);
+
+ if (offsetContainer != null) {
+ lastOffset = offsetContainer.offset();
+ } else {
+ lastOffset = 0;
+ }
+
LOG.debug("Starting to read at offset {}", lastOffset);
String line = br.readLine();
@@ -68,7 +78,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
if (line == null || line.isEmpty()) {
LOG.debug("End of file");
// EOF, therefore reset the offset
- final Resumable<File, Long> resumable = Resumables.of(path,
0L);
+ final Resumable resumable = Resumables.of(path, 0L);
exchange.getMessage().setHeader(Exchange.OFFSET, resumable);
break;
@@ -87,6 +97,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
}
if (count == batchSize) {
+ LOG.info("Reached the last offset in the batch. Stopping ...");
exchange.setRouteStop(true);
latch.countDown();
}
@@ -97,6 +108,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
*/
public void configure() {
getCamelContext().getRegistry().bind("testResumeStrategy",
testResumeStrategy);
+ getCamelContext().getRegistry().bind("resumeCache", cache);
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
.resumable("testResumeStrategy")
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
index 18d7d9f9..9c060e6c 100644
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
+++
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
@@ -22,16 +22,20 @@ import java.io.File;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.support.resume.Resumables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LargeDirectoryRouteBuilder extends RouteBuilder {
private static final Logger LOG =
LoggerFactory.getLogger(LargeDirectoryRouteBuilder.class);
- private KafkaResumeStrategy<File, File> testResumeStrategy;
+ private final KafkaResumeStrategy<Resumable> testResumeStrategy;
+ private final ResumeCache<File> cache;
- public LargeDirectoryRouteBuilder(KafkaResumeStrategy resumeStrategy) {
+ public LargeDirectoryRouteBuilder(KafkaResumeStrategy resumeStrategy,
ResumeCache<File> cache) {
this.testResumeStrategy = resumeStrategy;
+ this.cache = cache;
}
private void process(Exchange exchange) throws Exception {
@@ -48,6 +52,7 @@ public class LargeDirectoryRouteBuilder extends RouteBuilder {
*/
public void configure() {
getCamelContext().getRegistry().bind("testResumeStrategy",
testResumeStrategy);
+ getCamelContext().getRegistry().bind("resumeCache", cache);
from("file:{{input.dir}}?noop=true&recursive=true")
.resumable("testResumeStrategy")
diff --git
a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
index eedbd164..9b01173d 100644
---
a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
+++
b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
@@ -17,23 +17,17 @@
package org.apache.camel.example.resume.file.offset.main;
-import java.io.File;
+import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.caffeine.resume.single.CaffeineCache;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
-import
org.apache.camel.component.file.consumer.adapters.DefaultGenericFileResumeAdapter;
-import
org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
-import
org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
-import org.apache.camel.example.resume.clients.kafka.FileDeserializer;
-import org.apache.camel.example.resume.clients.kafka.FileSerializer;
+import org.apache.camel.component.caffeine.resume.CaffeineCache;
import
org.apache.camel.example.resume.strategies.kafka.file.LargeFileRouteBuilder;
import org.apache.camel.main.Main;
import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.camel.resume.Resumable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
/**
* A Camel Application
@@ -47,9 +41,9 @@ public class MainApp {
Main main = new Main();
CountDownLatch latch = new CountDownLatch(1);
- SingleNodeKafkaResumeStrategy<File, File> resumeStrategy =
getUpdatableConsumerResumeStrategy();
+ SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy =
getUpdatableConsumerResumeStrategy();
- RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy,
latch);
+ RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy,
new CaffeineCache<>(100), latch);
main.configure().addRoutesBuilder(routeBuilder);
Executors.newSingleThreadExecutor().submit(() -> waitForStop(main,
latch));
@@ -57,38 +51,27 @@ public class MainApp {
main.run(args);
}
- private static SingleNodeKafkaResumeStrategy<File, File>
getUpdatableConsumerResumeStrategy() {
+ private static SingleNodeKafkaResumeStrategy<Resumable>
getUpdatableConsumerResumeStrategy() {
String bootStrapAddress = System.getProperty("bootstrap.address",
"localhost:9092");
String kafkaTopic = System.getProperty("resume.type.kafka.topic",
"offsets");
- final DefaultConsumerPropertyFactory consumerPropertyFactory = new
DefaultConsumerPropertyFactory(bootStrapAddress);
-
-
consumerPropertyFactory.setKeyDeserializer(FileDeserializer.class.getName());
-
consumerPropertyFactory.setValueDeserializer(LongDeserializer.class.getName());
+ final Properties consumerProperties =
SingleNodeKafkaResumeStrategy.createConsumer(bootStrapAddress);
// In this case, we want to consume only the most recent offset
- consumerPropertyFactory.setOffsetReset("latest");
-
- final DefaultProducerPropertyFactory producerPropertyFactory = new
DefaultProducerPropertyFactory(bootStrapAddress);
-
-
producerPropertyFactory.setKeySerializer(FileSerializer.class.getName());
-
producerPropertyFactory.setValueSerializer(LongSerializer.class.getName());
-
- CaffeineCache<File,Long> cache = new CaffeineCache<>(1);
+
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"latest");
- GenericFileResumeAdapter adapter = new
DefaultGenericFileResumeAdapter(cache);
+ final Properties producerProperties =
SingleNodeKafkaResumeStrategy.createProducer(bootStrapAddress);
- return new SingleNodeKafkaResumeStrategy(kafkaTopic, cache, adapter,
producerPropertyFactory.getProperties(),
- consumerPropertyFactory.getProperties());
+ return new SingleNodeKafkaResumeStrategy(kafkaTopic,
producerProperties, consumerProperties);
}
private static void waitForStop(Main main, CountDownLatch latch) {
try {
latch.await();
+ System.exit(0);
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ System.exit(1);
}
- main.stop();
}
diff --git
a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
index 5686f89e..e0353eee 100644
---
a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
+++
b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
@@ -16,22 +16,18 @@
*/
package org.apache.camel.example.resume.fileset.clusterized.main;
-import java.io.File;
+import java.util.Properties;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.caffeine.resume.multi.CaffeineCache;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
-import
org.apache.camel.component.file.consumer.adapters.DefaultFileSetResumeAdapter;
import org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService;
-import
org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
-import
org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
-import org.apache.camel.example.resume.clients.kafka.FileDeserializer;
-import org.apache.camel.example.resume.clients.kafka.FileSerializer;
import
org.apache.camel.example.resume.fileset.clusterized.strategies.ClusterizedLargeDirectoryRouteBuilder;
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.MainListener;
import org.apache.camel.processor.resume.kafka.MultiNodeKafkaResumeStrategy;
+import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +56,7 @@ class ClusterizedListener implements MainListener {
main.getCamelContext().addService(clusterService);
LOG.trace("Creating the strategy");
- MultiNodeKafkaResumeStrategy<File,File> resumeStrategy =
getUpdatableConsumerResumeStrategyForSet();
+ MultiNodeKafkaResumeStrategy<Resumable> resumeStrategy =
getUpdatableConsumerResumeStrategyForSet();
main.getCamelContext().getRegistry().bind("testResumeStrategy",
resumeStrategy);
LOG.trace("Creating the route");
@@ -98,25 +94,14 @@ class ClusterizedListener implements MainListener {
System.exit(0);
}
- private static MultiNodeKafkaResumeStrategy<File,File>
getUpdatableConsumerResumeStrategyForSet() {
+ private static MultiNodeKafkaResumeStrategy<Resumable>
getUpdatableConsumerResumeStrategyForSet() {
String bootStrapAddress = System.getProperty("bootstrap.address",
"localhost:9092");
String kafkaTopic = System.getProperty("resume.type.kafka.topic",
"offsets");
- final DefaultConsumerPropertyFactory consumerPropertyFactory = new
DefaultConsumerPropertyFactory(bootStrapAddress);
+ final Properties consumerProperties =
SingleNodeKafkaResumeStrategy.createConsumer(bootStrapAddress);
+
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
-
consumerPropertyFactory.setKeyDeserializer(FileDeserializer.class.getName());
-
consumerPropertyFactory.setValueDeserializer(FileDeserializer.class.getName());
- consumerPropertyFactory.setOffsetReset("earliest");
-
- final DefaultProducerPropertyFactory producerPropertyFactory = new
DefaultProducerPropertyFactory(bootStrapAddress);
-
-
producerPropertyFactory.setKeySerializer(FileSerializer.class.getName());
-
producerPropertyFactory.setValueSerializer(FileSerializer.class.getName());
-
- CaffeineCache<File, File> cache = new CaffeineCache<>(10000);
- FileSetResumeAdapter adapter = new DefaultFileSetResumeAdapter(cache);
-
- return new MultiNodeKafkaResumeStrategy(kafkaTopic, cache, adapter,
producerPropertyFactory.getProperties(),
- consumerPropertyFactory.getProperties());
+ final Properties producerProperties =
SingleNodeKafkaResumeStrategy.createProducer(bootStrapAddress);
+ return new MultiNodeKafkaResumeStrategy(kafkaTopic,
producerProperties, consumerProperties);
}
}
diff --git a/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
b/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
index 72c319c8..f054938a 100644
--- a/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
+++ b/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
@@ -22,9 +22,6 @@ COPY src/main/scripts/run.sh /deployments/run.sh
ENV JAVA_HOME /etc/alternatives/jre
ENV DATA_DIR /data/source
-RUN mkdir -p ${DATA_DIR} && \
- cd ${DATA_DIR} && \
- for dir in $(seq 1 5) ; do mkdir $dir && (cd $dir && (for file in $(seq 1
100) ; do echo $RANDOM > $file ; done) ; cd ..) ; done && \
- chmod +x /deployments/*.sh
+RUN chmod +x /deployments/*.sh
WORKDIR /deployments/
CMD [ "sh", "-c", "/deployments/run.sh" ]
diff --git
a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
index 1c0fc728..3f7e39f2 100644
---
a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
+++
b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
@@ -17,19 +17,15 @@
package org.apache.camel.example.resume.fileset.main;
-import java.io.File;
+import java.util.Properties;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.caffeine.resume.multi.CaffeineCache;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
-import
org.apache.camel.component.file.consumer.adapters.DefaultFileSetResumeAdapter;
-import
org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
-import
org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
-import org.apache.camel.example.resume.clients.kafka.FileDeserializer;
-import org.apache.camel.example.resume.clients.kafka.FileSerializer;
+import org.apache.camel.component.caffeine.resume.CaffeineCache;
import
org.apache.camel.example.resume.strategies.kafka.fileset.LargeDirectoryRouteBuilder;
import org.apache.camel.main.Main;
import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
/**
* A Camel Application
@@ -42,33 +38,22 @@ public class MainApp {
public static void main(String... args) throws Exception {
Main main = new Main();
- SingleNodeKafkaResumeStrategy<File, File> resumeStrategy =
getUpdatableConsumerResumeStrategyForSet();
- RouteBuilder routeBuilder = new
LargeDirectoryRouteBuilder(resumeStrategy);
+ SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy =
getUpdatableConsumerResumeStrategyForSet();
+ RouteBuilder routeBuilder = new
LargeDirectoryRouteBuilder(resumeStrategy, new CaffeineCache<>(10000));
main.configure().addRoutesBuilder(routeBuilder);
main.run(args);
}
- private static SingleNodeKafkaResumeStrategy<File, File>
getUpdatableConsumerResumeStrategyForSet() {
+ private static SingleNodeKafkaResumeStrategy<Resumable>
getUpdatableConsumerResumeStrategyForSet() {
String bootStrapAddress = System.getProperty("bootstrap.address",
"localhost:9092");
String kafkaTopic = System.getProperty("resume.type.kafka.topic",
"offsets");
- final DefaultConsumerPropertyFactory consumerPropertyFactory = new
DefaultConsumerPropertyFactory(bootStrapAddress);
+ final Properties consumerProperties =
SingleNodeKafkaResumeStrategy.createConsumer(bootStrapAddress);
+
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
-
consumerPropertyFactory.setKeyDeserializer(FileDeserializer.class.getName());
-
consumerPropertyFactory.setValueDeserializer(FileDeserializer.class.getName());
- consumerPropertyFactory.setOffsetReset("earliest");
-
- final DefaultProducerPropertyFactory producerPropertyFactory = new
DefaultProducerPropertyFactory(bootStrapAddress);
-
-
producerPropertyFactory.setKeySerializer(FileSerializer.class.getName());
-
producerPropertyFactory.setValueSerializer(FileSerializer.class.getName());
-
- CaffeineCache<File, File> cache = new CaffeineCache<>(10000);
- FileSetResumeAdapter fileSetResumeAdapter = new
DefaultFileSetResumeAdapter(cache);
-
- return new SingleNodeKafkaResumeStrategy<>(kafkaTopic, cache,
fileSetResumeAdapter,
- producerPropertyFactory.getProperties(),
consumerPropertyFactory.getProperties());
+ final Properties producerProperties =
SingleNodeKafkaResumeStrategy.createProducer(bootStrapAddress);
+ return new SingleNodeKafkaResumeStrategy<>(kafkaTopic,
producerProperties, consumerProperties);
}
}
diff --git a/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
index f060b489..fff449c1 100644
--- a/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
+++ b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
@@ -16,17 +16,27 @@
#
echo "The test will process the following directory tree:"
-sleep 2s
-tree ${DATA_DIR} | pv -q -L 512
-sleep 8s
+mkdir -p ${DATA_DIR}
ITERATIONS=${1:-5}
BATCH_SIZE=${2:-50}
+FILE_COUNT=${3:-100}
for i in $(seq 0 ${ITERATIONS}) ; do
+ mkdir -p ${DATA_DIR}/${i}
+
echo
"********************************************************************************"
echo "Running the iteration ${i} of ${ITERATIONS} with a batch of
${BATCH_SIZE} files"
echo
"********************************************************************************"
+
+ echo "Appending ${FILE_COUNT} files to the data directory (files from the
previous execution remain there)"
+ for file in $(seq 1 100) ; do
+ echo ${RANDOM} > ${DATA_DIR}/${i}/${file}
+ done
+
+ echo "Only the following files should processed in this execution:"
+ tree ${DATA_DIR}/${i} | pv -q -L 1014
+
java -Dinput.dir=${DATA_DIR} \
-Doutput.dir=/tmp/out \
-Dresume.type=kafka \