This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
The following commit(s) were added to refs/heads/master by this push:
new 40f6ccd Added FTPS source connector example
40f6ccd is described below
commit 40f6ccda5fbb3b0f769e20766d89bb56e8ef1560
Author: Andrea Cosentino <[email protected]>
AuthorDate: Wed Nov 11 10:57:43 2020 +0100
Added FTPS source connector example
---
ftps/ftps-source/README.adoc | 242 +++++++++++++++++++++
.../config/CamelFtpsSourceConnector.properties | 32 +++
2 files changed, 274 insertions(+)
diff --git a/ftps/ftps-source/README.adoc b/ftps/ftps-source/README.adoc
new file mode 100644
index 0000000..ad6d101
--- /dev/null
+++ b/ftps/ftps-source/README.adoc
@@ -0,0 +1,242 @@
+# Camel-Kafka-connector FTPS Source
+
+This is an example for Camel-Kafka-connector FTPS Source
+
+## Standalone
+
+### What is needed
+
+- An FTPS server
+
+### Setting up FTPS Server
+
+We'll use the fauria/vsftpd docker image
+
+Run the following command:
+
+```
+> docker run -p 21:21 -p21100-21110:21100-21110 --env PASV_ADDRESS=127.0.0.1
loicmathieu/vsftpd ftps
+Launching vsftp on ftps protocol
+Activating passv on 127.0.0.1
+Generating self-signed certificate
+Generating a 2048 bit RSA private key
+..........................+++
+..................................+++
+writing new private key to '/etc/vsftpd/private/vsftpd.pem'
+```
+
+In another terminal
+
+```
+> docker ps
+CONTAINER ID IMAGE COMMAND CREATED
STATUS PORTS
NAMES
+ce99af9141c9 loicmathieu/vsftpd "/start.sh ftps" 7 seconds ago
Up 6 seconds 0.0.0.0:21->21/tcp, 20/tcp,
0.0.0.0:21100-21110->21100-21110/tcp confident_leavitt
+```
+
+take note of the container id. In our case it is ce99af9141c9
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh
$KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092
--replication-factor 1 --partitions 1 --topic mytopic
+```
+
+### Setting up the needed bits and running the example
+
+You'll need to setup the plugin.path property in your kafka
+
+Open the `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your choosen location
+
+You'll need to build your connector starting from an archetype:
+
+```
+> mvn archetype:generate
-DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes
-DarchetypeArtifactId=camel-kafka-connector-extensible-archetype
-DarchetypeVersion=0.6.0
+[INFO] Scanning for projects...
+[INFO]
+[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
+[INFO] Building Maven Stub Project (No POM) 1
+[INFO] --------------------------------[ pom ]---------------------------------
+[INFO]
+[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) >
generate-sources @ standalone-pom >>>
+[INFO]
+[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) <
generate-sources @ standalone-pom <<<
+[INFO]
+[INFO]
+[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @
standalone-pom ---
+[INFO] Generating project in Interactive mode
+[INFO] Archetype repository not defined. Using the one from
[org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.6.0]
found in catalog remote
+Define value for property 'groupId': org.apache.camel.kafkaconnector
+Define value for property 'artifactId': ftps-extended
+Define value for property 'version' 1.0-SNAPSHOT: : 0.6.0
+Define value for property 'package' org.apache.camel.kafkaconnector: :
+Define value for property 'camel-kafka-connector-name':
camel-ftps-kafka-connector
+[INFO] Using property: camel-kafka-connector-version = 0.6.0
+Confirm properties configuration:
+groupId: org.apache.camel.kafkaconnector
+artifactId: ftps-extended
+version: 0.6.0
+package: com.github.oscerd
+camel-kafka-connector-name: camel-ftps-kafka-connector
+camel-kafka-connector-version: 0.6.0
+ Y: : Y
+[INFO]
----------------------------------------------------------------------------
+[INFO] Using following parameters for creating project from Archetype:
camel-kafka-connector-extensible-archetype:0.6.0
+[INFO]
----------------------------------------------------------------------------
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: artifactId, Value: ftps-extended
+[INFO] Parameter: version, Value: 0.6.0
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: version, Value: 0.6.0
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: camel-kafka-connector-name, Value: camel-ftps-kafka-connector
+[INFO] Parameter: camel-kafka-connector-version, Value: 0.6.0
+[INFO] Parameter: artifactId, Value: ftps-extended
+[INFO] Project created from Archetype in dir:
/home/workspace/miscellanea/ftps-extended
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD SUCCESS
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time: 24.590 s
+[INFO] Finished at: 2020-11-05T07:45:43+01:00
+[INFO] ------------------------------------------------------------------------
+> cd /home/workspace/miscellanea/ftps-extended
+```
+
+We'll need to add a little transform for this example. So import the
ftp-extended project in your IDE and create a class in the only package there
+
+```
+package org.apache.camel.kafkaconnector;
+
+import java.util.Map;
+
+import org.apache.camel.component.file.remote.RemoteFile;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RemoteFileTransforms <R extends ConnectRecord<R>> implements
Transformation<R> {
+ public static final String FIELD_KEY_CONFIG = "key";
+ public static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null,
ConfigDef.Importance.MEDIUM,
+ "Transforms Remote File to String");
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoteFileTransforms.class);
+
+ @Override
+ public R apply(R r) {
+ Object value = r.value();
+
+ if (r.value() instanceof RemoteFile) {
+ LOG.debug("Converting record from RemoteFile to text");
+ RemoteFile message = (RemoteFile) r.value();
+
+ LOG.debug("Received text: {}", message.getBody());
+
+ return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
+ SchemaHelper.buildSchemaBuilderForType(message.getBody()),
message.getBody(), r.timestamp());
+
+ } else {
+ LOG.debug("Unexpected message type: {}", r.value().getClass());
+
+ return r;
+ }
+ }
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> map) {
+
+ }
+}
+```
+
+Now we need to build the connector:
+
+```
+> mvn clean package
+```
+
+In this example we'll use `/home/oscerd/connectors/` as plugin.path, but we'll
need the generated zip from the previois build
+
+```
+> cd /home/oscerd/connectors/
+> cp
/home/workspace/miscellanea/ftps-extended/target/ftps-extended-0.6.0-package.zip
.
+> unzip ftps-extended-0.6.0-package.zip
+```
+
+Now it's time to setup the connector
+
+Open the FTP source configuration file
+
+```
+name=CamelFtpSourceConnector
+connector.class=org.apache.camel.kafkaconnector.ftp.CamelFtpSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+transforms=RemoteTransformer
+transforms.RemoteTransformer.type=org.apache.camel.kafkaconnector.RemoteFileTransforms
+
+topics=mytopic
+
+camel.source.path.host=127.0.0.1
+camel.source.path.port=21
+camel.source.endpoint.recursive=true
+camel.source.endpoint.passiveMode=true
+camel.source.endpoint.username=guest
+camel.source.endpoint.password=guest
+```
+
+Now you can run the example
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh
$KAFKA_HOME/config/connect-standalone.properties
config/CamelFtpsSourceConnector.properties
+```
+
+Now we need to connect to the ftp server and add some stuff to the demos folder
+
+```
+> docker exec -it ce99af9141c9 bash
+[root@ce99af9141c9 /]# cd /home/
+[root@ce99af9141c9 home]# ls -la
+total 0
+drwxr-xr-x. 1 root root 20 Jun 17 2019 .
+drwxr-xr-x. 1 root root 40 Nov 11 09:28 ..
+drwx------. 2 guest guest 62 Nov 11 09:28 guest
+drwxr-xr-x. 1 ftp ftp 6 Jun 17 2019 vsftpd
+[root@ce99af9141c9 home]# cd guest/
+[root@ce99af9141c9 guest]# su guest
+[guest@ce99af9141c9 ~]$ ls
+[guest@ce99af9141c9 ~]$ cat >> test.txt << 'END'
+> Hello CKC
+> END
+[guest@ce99af9141c9 ~]$ ls
+test.txt
+```
+
+In another terminal, using kafkacat, you should be able to see the headers.
+
+```
+> ./kafkacat -b localhost:9092 -t mytopic -f 'Headers: %h: Message value: %s\n'
+% Auto-selecting Consumer mode (use -P or -C to override)
+Headers: Headers:
CamelHeader.CamelFileAbsolute=false,CamelHeader.CamelFileAbsolutePath=test.txt,CamelHeader.CamelFileHost=127.0.0.1,CamelHeader.CamelFileLastModified=1605083400000,CamelHeader.CamelFileLength=5,CamelHeader.CamelFileName=test.txt,CamelHeader.CamelFileNameConsumed=test.txt,CamelHeader.CamelFileNameOnly=test.txt,CamelHeader.CamelFileParent=/,CamelHeader.CamelFilePath=/test.txt,CamelHeader.CamelFileRelativePath=test.txt,CamelHeader.CamelFtpReplyCode=226,CamelHeader.CamelFtpR
[...]
+,CamelProperty.CamelBatchSize=1,CamelProperty.CamelUnitOfWorkProcessSync=true,CamelProperty.CamelBatchComplete=true,CamelProperty.CamelBatchIndex=0,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000:
Message value: Hello CKC
+% Reached end of topic mytopic [0] at offset 1
+```
+
diff --git a/ftps/ftps-source/config/CamelFtpsSourceConnector.properties
b/ftps/ftps-source/config/CamelFtpsSourceConnector.properties
new file mode 100644
index 0000000..6e85483
--- /dev/null
+++ b/ftps/ftps-source/config/CamelFtpsSourceConnector.properties
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelFtpSourceConnector
+connector.class=org.apache.camel.kafkaconnector.ftp.CamelFtpSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+transforms=RemoteTransformer
+transforms.RemoteTransformer.type=org.apache.camel.kafkaconnector.RemoteFileTransforms
+
+topics=mytopic
+
+camel.source.path.host=127.0.0.1
+camel.source.path.port=21
+camel.source.endpoint.recursive=true
+camel.source.endpoint.passiveMode=true
+camel.source.endpoint.username=guest
+camel.source.endpoint.password=guest