This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new b46cc336 CAMEL-18127: implement adapter auto-configuration in the 
resume API
b46cc336 is described below

commit b46cc33621758829619e0efce88efff4c0244c25
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon May 23 18:10:04 2022 +0200

    CAMEL-18127: implement adapter auto-configuration in the resume API
---
 .../clients/kafka/ConsumerPropertyFactory.java     | 35 ---------
 .../kafka/DefaultConsumerPropertyFactory.java      | 82 ----------------------
 .../kafka/DefaultProducerPropertyFactory.java      | 60 ----------------
 .../resume/clients/kafka/FileDeserializer.java     | 35 ---------
 .../resume/clients/kafka/FileSerializer.java       | 34 ---------
 .../clients/kafka/ProducerPropertyFactory.java     | 35 ---------
 .../kafka/file/LargeFileRouteBuilder.java          | 22 ++++--
 .../kafka/fileset/LargeDirectoryRouteBuilder.java  |  9 ++-
 .../example/resume/file/offset/main/MainApp.java   | 43 ++++--------
 .../clusterized/main/ClusterizedListener.java      | 35 +++------
 .../resume-api-fileset/src/main/docker/Dockerfile  |  5 +-
 .../camel/example/resume/fileset/main/MainApp.java | 37 +++-------
 .../resume-api-fileset/src/main/scripts/run.sh     | 16 ++++-
 13 files changed, 72 insertions(+), 376 deletions(-)

diff --git 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ConsumerPropertyFactory.java
 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ConsumerPropertyFactory.java
deleted file mode 100644
index 7998b82c..00000000
--- 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ConsumerPropertyFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.util.Properties;
-
-/**
- * An interface to produce properties that can be used to configure a Kafka 
consumer. The
- * CLI runtime equivalent for this file is the consumer.properties file from 
the Kafka
- * provided along with the Kafka deliverable
- *
- */
-public interface ConsumerPropertyFactory {
-
-    /**
-     * Gets the properties used to configure the consumer
-     * @return a Properties object containing the set of properties for the 
consumer
-     */
-    Properties getProperties();
-}
diff --git 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultConsumerPropertyFactory.java
 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultConsumerPropertyFactory.java
deleted file mode 100644
index 6cd99aea..00000000
--- 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultConsumerPropertyFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-
-/**
- * A property producer that can be used to create a Kafka consumer with a 
minimum
- * set of configurations that can consume from a Kafka topic.
- * <p>
- * The consumer behavior from using this set of properties causes the consumer 
to
- * consumes all published messages "from-beginning".
- */
-public class DefaultConsumerPropertyFactory implements ConsumerPropertyFactory 
{
-    private final String bootstrapServer;
-    private String valueDeserializer = StringDeserializer.class.getName();
-    private String keyDeserializer = StringDeserializer.class.getName();
-    private String offsetReset = "earliest";
-    private String groupId = UUID.randomUUID().toString();
-
-    /**
-     * Constructs the properties using the given bootstrap server
-     *
-     * @param bootstrapServer the address of the server in the format
-     *                        PLAINTEXT://${address}:${port}
-     */
-    public DefaultConsumerPropertyFactory(String bootstrapServer) {
-        this.bootstrapServer = bootstrapServer;
-    }
-
-    public void setValueDeserializer(String valueDeserializer) {
-        this.valueDeserializer = valueDeserializer;
-    }
-
-    public void setKeyDeserializer(String keyDeserializer) {
-        this.keyDeserializer = keyDeserializer;
-    }
-
-    public void setOffsetReset(String offsetReset) {
-        this.offsetReset = offsetReset;
-    }
-
-    public String getGroupId() {
-        return groupId;
-    }
-
-    public void setGroupId(String groupId) {
-        this.groupId = groupId;
-    }
-
-    @Override
-    public Properties getProperties() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-
-        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer);
-        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer);
-        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
offsetReset);
-        return props;
-    }
-}
diff --git 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultProducerPropertyFactory.java
 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultProducerPropertyFactory.java
deleted file mode 100644
index 6aa3ad30..00000000
--- 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultProducerPropertyFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-public class DefaultProducerPropertyFactory implements ProducerPropertyFactory 
{
-    private final String bootstrapServer;
-    private String valueSerializer = StringDeserializer.class.getName();
-    private String keySerializer = StringDeserializer.class.getName();
-
-    /**
-     * Constructs the properties using the given bootstrap server
-     * @param bootstrapServer the address of the server in the format
-     *                       PLAINTEXT://${address}:${port}
-     */
-    public DefaultProducerPropertyFactory(String bootstrapServer) {
-        this.bootstrapServer = bootstrapServer;
-    }
-
-    public void setValueSerializer(String valueSerializer) {
-        this.valueSerializer = valueSerializer;
-    }
-
-    public void setKeySerializer(String keySerializer) {
-        this.keySerializer = keySerializer;
-    }
-
-    @Override
-    public Properties getProperties() {
-        Properties props = new Properties();
-
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, 
UUID.randomUUID().toString());
-
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
valueSerializer);
-
-        return props;
-    }
-}
diff --git 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileDeserializer.java
 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileDeserializer.java
deleted file mode 100644
index f7caa695..00000000
--- 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileDeserializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.io.File;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileDeserializer implements Deserializer<File> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(FileDeserializer.class);
-
-    @Override
-    public File deserialize(String s, byte[] bytes) {
-        String name = new String(bytes);
-        LOG.trace("Deserializing {} from topic {}", name, s);
-        return new File(name);
-    }
-}
diff --git 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileSerializer.java
 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileSerializer.java
deleted file mode 100644
index a991ff15..00000000
--- 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileSerializer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.io.File;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileSerializer implements Serializer<File> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(FileSerializer.class);
-
-    @Override
-    public byte[] serialize(String s, File file) {
-        LOG.trace("Serializing file: {}", file.getPath());
-        return file.getPath().getBytes();
-    }
-}
diff --git 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ProducerPropertyFactory.java
 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ProducerPropertyFactory.java
deleted file mode 100644
index 45ffda1e..00000000
--- 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ProducerPropertyFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.example.resume.clients.kafka;
-
-import java.util.Properties;
-
-/**
- * An interface to produce properties that can be used to configure a Kafka 
consumer. The
- * CLI runtime equivalent for this file is the consumer.properties file from 
the Kafka
- * provided along with the Kafka deliverable
- *
- */
-public interface ProducerPropertyFactory {
-
-    /**
-     * Gets the properties used to configure the consumer
-     * @return a Properties object containing the set of properties for the 
consumer
-     */
-    Properties getProperties();
-}
diff --git 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
index 9f162096..e304620e 100644
--- 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
+++ 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
@@ -24,9 +24,10 @@ import java.util.concurrent.CountDownLatch;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.component.file.consumer.adapters.FileOffset;
 import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
 import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.support.resume.Resumables;
 import org.apache.camel.util.IOHelper;
 import org.slf4j.Logger;
@@ -39,12 +40,15 @@ public class LargeFileRouteBuilder extends RouteBuilder {
     private static final Logger LOG = 
LoggerFactory.getLogger(LargeFileRouteBuilder.class);
 
     private KafkaResumeStrategy testResumeStrategy;
+    private final ResumeCache<File> cache;
+
     private long lastOffset;
     private int batchSize;
     private final CountDownLatch latch;
 
-    public LargeFileRouteBuilder(KafkaResumeStrategy resumeStrategy, 
CountDownLatch latch) {
+    public LargeFileRouteBuilder(KafkaResumeStrategy resumeStrategy, 
ResumeCache<File> cache, CountDownLatch latch) {
         this.testResumeStrategy = resumeStrategy;
+        this.cache = cache;
         String tmp = System.getProperty("resume.batch.size", "30");
         this.batchSize = Integer.valueOf(tmp);
 
@@ -58,8 +62,14 @@ public class LargeFileRouteBuilder extends RouteBuilder {
         File path = exchange.getMessage().getHeader("CamelFilePath", 
File.class);
         LOG.debug("Path: {} ", path);
 
-        final GenericFileResumeAdapter adapter = 
testResumeStrategy.getAdapter(GenericFileResumeAdapter.class);
-        lastOffset = adapter.getLastOffset(path).orElse(0L);
+        FileOffset offsetContainer = cache.get(path, FileOffset.class);
+
+        if (offsetContainer != null) {
+            lastOffset = offsetContainer.offset();
+        } else {
+            lastOffset = 0;
+        }
+
         LOG.debug("Starting to read at offset {}", lastOffset);
 
         String line = br.readLine();
@@ -68,7 +78,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
             if (line == null || line.isEmpty()) {
                 LOG.debug("End of file");
                 // EOF, therefore reset the offset
-                final Resumable<File, Long> resumable = Resumables.of(path, 
0L);
+                final Resumable resumable = Resumables.of(path, 0L);
                 exchange.getMessage().setHeader(Exchange.OFFSET, resumable);
 
                 break;
@@ -87,6 +97,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
         }
 
         if (count == batchSize) {
+            LOG.info("Reached the last offset in the batch. Stopping ...");
             exchange.setRouteStop(true);
             latch.countDown();
         }
@@ -97,6 +108,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
      */
     public void configure() {
         getCamelContext().getRegistry().bind("testResumeStrategy", 
testResumeStrategy);
+        getCamelContext().getRegistry().bind("resumeCache", cache);
 
         from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
                 .resumable("testResumeStrategy")
diff --git 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
index 18d7d9f9..9c060e6c 100644
--- 
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
+++ 
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
@@ -22,16 +22,20 @@ import java.io.File;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.support.resume.Resumables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class LargeDirectoryRouteBuilder extends RouteBuilder {
     private static final Logger LOG = 
LoggerFactory.getLogger(LargeDirectoryRouteBuilder.class);
-    private KafkaResumeStrategy<File, File> testResumeStrategy;
+    private final KafkaResumeStrategy<Resumable> testResumeStrategy;
+    private final ResumeCache<File> cache;
 
-    public LargeDirectoryRouteBuilder(KafkaResumeStrategy resumeStrategy) {
+    public LargeDirectoryRouteBuilder(KafkaResumeStrategy resumeStrategy, 
ResumeCache<File> cache) {
         this.testResumeStrategy = resumeStrategy;
+        this.cache = cache;
     }
 
     private void process(Exchange exchange) throws Exception {
@@ -48,6 +52,7 @@ public class LargeDirectoryRouteBuilder extends RouteBuilder {
      */
     public void configure() {
         getCamelContext().getRegistry().bind("testResumeStrategy", 
testResumeStrategy);
+        getCamelContext().getRegistry().bind("resumeCache", cache);
 
         from("file:{{input.dir}}?noop=true&recursive=true")
                 .resumable("testResumeStrategy")
diff --git 
a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
 
b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
index eedbd164..9b01173d 100644
--- 
a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
+++ 
b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
@@ -17,23 +17,17 @@
 
 package org.apache.camel.example.resume.file.offset.main;
 
-import java.io.File;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.caffeine.resume.single.CaffeineCache;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
-import 
org.apache.camel.component.file.consumer.adapters.DefaultGenericFileResumeAdapter;
-import 
org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
-import 
org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
-import org.apache.camel.example.resume.clients.kafka.FileDeserializer;
-import org.apache.camel.example.resume.clients.kafka.FileSerializer;
+import org.apache.camel.component.caffeine.resume.CaffeineCache;
 import 
org.apache.camel.example.resume.strategies.kafka.file.LargeFileRouteBuilder;
 import org.apache.camel.main.Main;
 import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.camel.resume.Resumable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 /**
  * A Camel Application
@@ -47,9 +41,9 @@ public class MainApp {
         Main main = new Main();
 
         CountDownLatch latch = new CountDownLatch(1);
-        SingleNodeKafkaResumeStrategy<File, File> resumeStrategy = 
getUpdatableConsumerResumeStrategy();
+        SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = 
getUpdatableConsumerResumeStrategy();
 
-        RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy, 
latch);
+        RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy, 
new CaffeineCache<>(100), latch);
         main.configure().addRoutesBuilder(routeBuilder);
 
         Executors.newSingleThreadExecutor().submit(() -> waitForStop(main, 
latch));
@@ -57,38 +51,27 @@ public class MainApp {
         main.run(args);
     }
 
-    private static SingleNodeKafkaResumeStrategy<File, File> 
getUpdatableConsumerResumeStrategy() {
+    private static SingleNodeKafkaResumeStrategy<Resumable> 
getUpdatableConsumerResumeStrategy() {
         String bootStrapAddress = System.getProperty("bootstrap.address", 
"localhost:9092");
         String kafkaTopic = System.getProperty("resume.type.kafka.topic", 
"offsets");
 
-        final DefaultConsumerPropertyFactory consumerPropertyFactory = new 
DefaultConsumerPropertyFactory(bootStrapAddress);
-
-        
consumerPropertyFactory.setKeyDeserializer(FileDeserializer.class.getName());
-        
consumerPropertyFactory.setValueDeserializer(LongDeserializer.class.getName());
+        final Properties consumerProperties = 
SingleNodeKafkaResumeStrategy.createConsumer(bootStrapAddress);
 
         // In this case, we want to consume only the most recent offset
-        consumerPropertyFactory.setOffsetReset("latest");
-
-        final DefaultProducerPropertyFactory producerPropertyFactory = new 
DefaultProducerPropertyFactory(bootStrapAddress);
-
-        
producerPropertyFactory.setKeySerializer(FileSerializer.class.getName());
-        
producerPropertyFactory.setValueSerializer(LongSerializer.class.getName());
-
-        CaffeineCache<File,Long> cache = new CaffeineCache<>(1);
+        
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"latest");
 
-        GenericFileResumeAdapter adapter = new 
DefaultGenericFileResumeAdapter(cache);
+        final Properties producerProperties = 
SingleNodeKafkaResumeStrategy.createProducer(bootStrapAddress);
 
-        return new SingleNodeKafkaResumeStrategy(kafkaTopic, cache, adapter, 
producerPropertyFactory.getProperties(),
-                consumerPropertyFactory.getProperties());
+        return new SingleNodeKafkaResumeStrategy(kafkaTopic, 
producerProperties, consumerProperties);
     }
 
     private static void waitForStop(Main main, CountDownLatch latch) {
         try {
             latch.await();
+            System.exit(0);
         } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            System.exit(1);
         }
-        main.stop();
     }
 
 
diff --git 
a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
 
b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
index 5686f89e..e0353eee 100644
--- 
a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
+++ 
b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
@@ -16,22 +16,18 @@
  */
 package org.apache.camel.example.resume.fileset.clusterized.main;
 
-import java.io.File;
+import java.util.Properties;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.caffeine.resume.multi.CaffeineCache;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
-import 
org.apache.camel.component.file.consumer.adapters.DefaultFileSetResumeAdapter;
 import org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService;
-import 
org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
-import 
org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
-import org.apache.camel.example.resume.clients.kafka.FileDeserializer;
-import org.apache.camel.example.resume.clients.kafka.FileSerializer;
 import 
org.apache.camel.example.resume.fileset.clusterized.strategies.ClusterizedLargeDirectoryRouteBuilder;
 import org.apache.camel.main.BaseMainSupport;
 import org.apache.camel.main.MainListener;
 import org.apache.camel.processor.resume.kafka.MultiNodeKafkaResumeStrategy;
+import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +56,7 @@ class ClusterizedListener implements MainListener {
             main.getCamelContext().addService(clusterService);
 
             LOG.trace("Creating the strategy");
-            MultiNodeKafkaResumeStrategy<File,File> resumeStrategy = 
getUpdatableConsumerResumeStrategyForSet();
+            MultiNodeKafkaResumeStrategy<Resumable> resumeStrategy = 
getUpdatableConsumerResumeStrategyForSet();
             main.getCamelContext().getRegistry().bind("testResumeStrategy", 
resumeStrategy);
 
             LOG.trace("Creating the route");
@@ -98,25 +94,14 @@ class ClusterizedListener implements MainListener {
         System.exit(0);
     }
 
-    private static MultiNodeKafkaResumeStrategy<File,File> 
getUpdatableConsumerResumeStrategyForSet() {
+    private static MultiNodeKafkaResumeStrategy<Resumable> 
getUpdatableConsumerResumeStrategyForSet() {
         String bootStrapAddress = System.getProperty("bootstrap.address", 
"localhost:9092");
         String kafkaTopic = System.getProperty("resume.type.kafka.topic", 
"offsets");
 
-        final DefaultConsumerPropertyFactory consumerPropertyFactory = new 
DefaultConsumerPropertyFactory(bootStrapAddress);
+        final Properties consumerProperties = 
SingleNodeKafkaResumeStrategy.createConsumer(bootStrapAddress);
+        
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
 
-        
consumerPropertyFactory.setKeyDeserializer(FileDeserializer.class.getName());
-        
consumerPropertyFactory.setValueDeserializer(FileDeserializer.class.getName());
-        consumerPropertyFactory.setOffsetReset("earliest");
-
-        final DefaultProducerPropertyFactory producerPropertyFactory = new 
DefaultProducerPropertyFactory(bootStrapAddress);
-
-        
producerPropertyFactory.setKeySerializer(FileSerializer.class.getName());
-        
producerPropertyFactory.setValueSerializer(FileSerializer.class.getName());
-
-        CaffeineCache<File, File> cache = new CaffeineCache<>(10000);
-        FileSetResumeAdapter adapter = new DefaultFileSetResumeAdapter(cache);
-
-        return new MultiNodeKafkaResumeStrategy(kafkaTopic, cache, adapter, 
producerPropertyFactory.getProperties(),
-                consumerPropertyFactory.getProperties());
+        final Properties producerProperties = 
SingleNodeKafkaResumeStrategy.createProducer(bootStrapAddress);
+        return new MultiNodeKafkaResumeStrategy(kafkaTopic, 
producerProperties, consumerProperties);
     }
 }
diff --git a/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile 
b/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
index 72c319c8..f054938a 100644
--- a/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
+++ b/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
@@ -22,9 +22,6 @@ COPY src/main/scripts/run.sh /deployments/run.sh
 ENV JAVA_HOME /etc/alternatives/jre
 ENV DATA_DIR /data/source
 
-RUN mkdir -p ${DATA_DIR} && \
-    cd ${DATA_DIR} && \
-    for dir in $(seq 1 5) ; do mkdir $dir && (cd $dir && (for file in $(seq 1 
100) ; do echo $RANDOM > $file ; done) ; cd ..) ; done && \
-    chmod +x /deployments/*.sh
+RUN chmod +x /deployments/*.sh
 WORKDIR /deployments/
 CMD [ "sh", "-c", "/deployments/run.sh" ]
diff --git 
a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
 
b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
index 1c0fc728..3f7e39f2 100644
--- 
a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
+++ 
b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
@@ -17,19 +17,15 @@
 
 package org.apache.camel.example.resume.fileset.main;
 
-import java.io.File;
+import java.util.Properties;
 
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.caffeine.resume.multi.CaffeineCache;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
-import 
org.apache.camel.component.file.consumer.adapters.DefaultFileSetResumeAdapter;
-import 
org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
-import 
org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
-import org.apache.camel.example.resume.clients.kafka.FileDeserializer;
-import org.apache.camel.example.resume.clients.kafka.FileSerializer;
+import org.apache.camel.component.caffeine.resume.CaffeineCache;
 import 
org.apache.camel.example.resume.strategies.kafka.fileset.LargeDirectoryRouteBuilder;
 import org.apache.camel.main.Main;
 import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 /**
  * A Camel Application
@@ -42,33 +38,22 @@ public class MainApp {
     public static void main(String... args) throws Exception {
         Main main = new Main();
 
-        SingleNodeKafkaResumeStrategy<File, File> resumeStrategy = 
getUpdatableConsumerResumeStrategyForSet();
-        RouteBuilder routeBuilder = new 
LargeDirectoryRouteBuilder(resumeStrategy);
+        SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = 
getUpdatableConsumerResumeStrategyForSet();
+        RouteBuilder routeBuilder = new 
LargeDirectoryRouteBuilder(resumeStrategy, new CaffeineCache<>(10000));
 
         main.configure().addRoutesBuilder(routeBuilder);
         main.run(args);
     }
 
-    private static SingleNodeKafkaResumeStrategy<File, File> 
getUpdatableConsumerResumeStrategyForSet() {
+    private static SingleNodeKafkaResumeStrategy<Resumable> 
getUpdatableConsumerResumeStrategyForSet() {
         String bootStrapAddress = System.getProperty("bootstrap.address", 
"localhost:9092");
         String kafkaTopic = System.getProperty("resume.type.kafka.topic", 
"offsets");
 
-        final DefaultConsumerPropertyFactory consumerPropertyFactory = new 
DefaultConsumerPropertyFactory(bootStrapAddress);
+        final Properties consumerProperties = 
SingleNodeKafkaResumeStrategy.createConsumer(bootStrapAddress);
+        
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
 
-        
consumerPropertyFactory.setKeyDeserializer(FileDeserializer.class.getName());
-        
consumerPropertyFactory.setValueDeserializer(FileDeserializer.class.getName());
-        consumerPropertyFactory.setOffsetReset("earliest");
-
-        final DefaultProducerPropertyFactory producerPropertyFactory = new 
DefaultProducerPropertyFactory(bootStrapAddress);
-
-        
producerPropertyFactory.setKeySerializer(FileSerializer.class.getName());
-        
producerPropertyFactory.setValueSerializer(FileSerializer.class.getName());
-
-        CaffeineCache<File, File> cache = new CaffeineCache<>(10000);
-        FileSetResumeAdapter fileSetResumeAdapter = new 
DefaultFileSetResumeAdapter(cache);
-
-        return new SingleNodeKafkaResumeStrategy<>(kafkaTopic, cache, 
fileSetResumeAdapter,
-                producerPropertyFactory.getProperties(), 
consumerPropertyFactory.getProperties());
+        final Properties producerProperties = 
SingleNodeKafkaResumeStrategy.createProducer(bootStrapAddress);
+        return new SingleNodeKafkaResumeStrategy<>(kafkaTopic, 
producerProperties, consumerProperties);
     }
 
 }
diff --git a/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh 
b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
index f060b489..fff449c1 100644
--- a/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
+++ b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
@@ -16,17 +16,27 @@
 #
 echo "The test will process the following directory tree:"
 
-sleep 2s
-tree ${DATA_DIR} | pv -q -L 512
-sleep 8s
+mkdir -p ${DATA_DIR}
 
 ITERATIONS=${1:-5}
 BATCH_SIZE=${2:-50}
+FILE_COUNT=${3:-100}
 
 for i in $(seq 0 ${ITERATIONS}) ; do
+  mkdir -p ${DATA_DIR}/${i}
+
   echo 
"********************************************************************************"
   echo "Running the iteration ${i} of ${ITERATIONS} with a batch of 
${BATCH_SIZE} files"
   echo 
"********************************************************************************"
+
+  echo "Appending ${FILE_COUNT} files to the data directory (files from the 
previous execution remain there)"
+  for file in $(seq 1 ${FILE_COUNT}) ; do
+    echo ${RANDOM} > ${DATA_DIR}/${i}/${file}
+  done
+
+  echo "Only the following files should be processed in this execution:"
+  tree ${DATA_DIR}/${i} | pv -q -L 1024
+
   java -Dinput.dir=${DATA_DIR} \
     -Doutput.dir=/tmp/out \
     -Dresume.type=kafka \

Reply via email to