This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new b480ba8  Added an AWS2-Kinesis example for source connector
b480ba8 is described below

commit b480ba82e640298957ae062b110430bca823baf4
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Jan 14 19:03:42 2021 +0100

    Added an AWS2-Kinesis example for source connector
---
 aws2-kinesis/aws2-kinesis-source/README.adoc       | 188 +++++++++++++++++++++
 .../CamelAWS2KinesisSourceConnector.properties     |  29 ++++
 2 files changed, 217 insertions(+)

diff --git a/aws2-kinesis/aws2-kinesis-source/README.adoc b/aws2-kinesis/aws2-kinesis-source/README.adoc
new file mode 100644
index 0000000..2afe09e
--- /dev/null
+++ b/aws2-kinesis/aws2-kinesis-source/README.adoc
@@ -0,0 +1,188 @@
+# Camel-Kafka-connector AWS2 Kinesis Source
+
+This is an example of the Camel-Kafka-connector AWS2-Kinesis source connector.
+
+## Standalone
+
+### What is needed
+
+- An AWS Kinesis stream
+- Some work on AWS console
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+### Setting up the needed bits and running the example
+
+You'll need to set up the `plugin.path` property in your Kafka installation.
+
+Open `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your chosen location (for example, a `connectors` folder in your home directory).
+
+You'll need to build your connector starting from an archetype:
+
+```
+> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.7.0
+[INFO] Scanning for projects...
+[INFO] 
+[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
+[INFO] Building Maven Stub Project (No POM) 1
+[INFO] --------------------------------[ pom ]---------------------------------
+[INFO] 
+[INFO] >>> maven-archetype-plugin:3.2.0:generate (default-cli) > generate-sources @ standalone-pom >>>
+[INFO] 
+[INFO] <<< maven-archetype-plugin:3.2.0:generate (default-cli) < generate-sources @ standalone-pom <<<
+[INFO] 
+[INFO] 
+[INFO] --- maven-archetype-plugin:3.2.0:generate (default-cli) @ standalone-pom ---
+[INFO] Generating project in Interactive mode
+[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.7.0] found in catalog remote
+Define value for property 'groupId': org.apache.camel.kafkaconnector
+Define value for property 'artifactId': aws2-kinesis-extended
+Define value for property 'version' 1.0-SNAPSHOT: : 0.7.0
+Define value for property 'package' org.apache.camel.kafkaconnector: : 
+Define value for property 'camel-kafka-connector-name': camel-aws2-kinesis-kafka-connector
+[INFO] Using property: camel-kafka-connector-version = 0.7.0
+Confirm properties configuration:
+groupId: org.apache.camel.kafkaconnector
+artifactId: aws2-kinesis-extended
+version: 0.7.0
+package: org.apache.camel.kafkaconnector
+camel-kafka-connector-name: camel-aws2-kinesis-kafka-connector
+camel-kafka-connector-version: 0.7.0
+ Y: : Y
+[INFO] ----------------------------------------------------------------------------
+[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.7.0
+[INFO] ----------------------------------------------------------------------------
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: artifactId, Value: aws2-kinesis-extended
+[INFO] Parameter: version, Value: 0.7.0
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: version, Value: 0.7.0
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: camel-kafka-connector-name, Value: camel-aws2-kinesis-kafka-connector
+[INFO] Parameter: camel-kafka-connector-version, Value: 0.7.0
+[INFO] Parameter: artifactId, Value: aws2-kinesis-extended
+[INFO] Project created from Archetype in dir: /home/oscerd/workspace/miscellanea/aws2-kinesis-extended
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD SUCCESS
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time:  01:01 min
+[INFO] Finished at: 2021-01-14T14:15:24+01:00
+[INFO] ------------------------------------------------------------------------
+> cd /home/oscerd/workspace/miscellanea/aws2-kinesis-extended
+```
+
+We'll need to add a little transform for this example, so import the aws2-kinesis-extended project into your IDE and create a class in the only package there. Once the class is in place, build the project and make the packaged connector available under the `plugin.path` location configured above.
+
+```
+package org.apache.camel.kafkaconnector;
+
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+public class KinesisRecordDataTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Transforms Data to String");
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordDataTransforms.class);
+
+    @Override
+    public R apply(R r) {
+        Object value = r.value();
+
+        if (value instanceof Record) {
+            LOG.debug("Converting record from Data to String");
+            Record message = (Record) r.value();
+
+            String payload = new String(message.data().asByteArray());
+            return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
+                    SchemaHelper.buildSchemaBuilderForType(payload), payload, r.timestamp());
+
+        } else {
+            LOG.debug("Unexpected message type: {}", r.value().getClass());
+
+            return r;
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}
+```
+
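+The transform unwraps the Kinesis `Record` carried in the source record and replaces it with its data payload as a plain string. As a quick, hypothetical local check (not part of the connector flow, and the class name below is just illustrative), you can feed it a `SourceRecord` whose value is a Kinesis `Record` and print the converted payload:
+
+```
+package org.apache.camel.kafkaconnector;
+
+import java.util.Collections;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+public class KinesisRecordDataTransformsCheck {
+    public static void main(String[] args) {
+        // Build a Kinesis record carrying a small UTF-8 payload
+        Record kinesisRecord = Record.builder()
+                .data(SdkBytes.fromUtf8String("Kinesis Event 1"))
+                .build();
+
+        // Wrap it in a SourceRecord, as the source connector would
+        SourceRecord in = new SourceRecord(
+                Collections.emptyMap(), Collections.emptyMap(),
+                "mytopic", null, kinesisRecord);
+
+        KinesisRecordDataTransforms<SourceRecord> transform = new KinesisRecordDataTransforms<>();
+        SourceRecord out = transform.apply(in);
+
+        System.out.println(out.value()); // prints: Kinesis Event 1
+    }
+}
+```
+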
+On the AWS console, create a Kinesis data stream named streamTest.
+
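+If you prefer code over the console, the stream can also be created programmatically. Here is a minimal sketch using the AWS SDK v2 for Java, assuming the default credentials chain; the class name and region are just illustrative:
+
+```
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+
+public class CreateStreamExample {
+    public static void main(String[] args) {
+        // The client picks up credentials from the default AWS provider chain
+        try (KinesisClient client = KinesisClient.builder()
+                .region(Region.EU_WEST_1)
+                .build()) {
+            // Create the data stream used by this example, with a single shard
+            client.createStream(CreateStreamRequest.builder()
+                    .streamName("streamTest")
+                    .shardCount(1)
+                    .build());
+        }
+    }
+}
+```
+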
+Now it's time to set up the connector.
+
+Open the AWS2 Kinesis configuration file
+
+```
+name=CamelAws2-kinesisSourceConnector
+connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSourceConnector
+tasks.max=1
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+transforms=KinesisRecordDataTransforms
+transforms.KinesisRecordDataTransforms.type=org.apache.camel.kafkaconnector.KinesisRecordDataTransforms
+
+topics=mytopic
+camel.source.path.streamName=streamTest
+
+camel.source.endpoint.accessKey=xxxx
+camel.source.endpoint.secretKey=yyyy
+camel.source.endpoint.region=region
+```
+
+and add the correct credentials for AWS.
+
+Now you can run the example
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2KinesisSourceConnector.properties
+```
+
+Now send a record to the streamTest Kinesis stream with 'Kinesis Event 1' as data, and a second record with 'Kinesis Event 2' as data.
+
+As an example, you can use KinesisComponentIntegrationTest.java from the main Camel repository.
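+
+Alternatively, here is a minimal sketch that publishes the two sample records with the AWS SDK v2 for Java, again assuming the default credentials chain; the class name and region are just illustrative:
+
+```
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+
+public class PutRecordsExample {
+    public static void main(String[] args) {
+        try (KinesisClient client = KinesisClient.builder()
+                .region(Region.EU_WEST_1)
+                .build()) {
+            // Send the two test payloads the README expects to see on the Kafka topic
+            for (String data : new String[] {"Kinesis Event 1", "Kinesis Event 2"}) {
+                client.putRecord(PutRecordRequest.builder()
+                        .streamName("streamTest")
+                        .partitionKey("partition-1")
+                        .data(SdkBytes.fromUtf8String(data))
+                        .build());
+            }
+        }
+    }
+}
+```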
+
+In a different terminal, run the consumer with kafkacat
+
+```
+./kafkacat -b localhost:9092 -t mytopic
+% Auto-selecting Consumer mode (use -P or -C to override)
+{"schema":{"type":"string","optional":false},"payload":"Kinesis Event 1."}
+{"schema":{"type":"string","optional":false},"payload":"Kinesis Event 2."}
+% Reached end of topic mytopic [0] at offset 2
+```
+
diff --git a/aws2-kinesis/aws2-kinesis-source/config/CamelAWS2KinesisSourceConnector.properties b/aws2-kinesis/aws2-kinesis-source/config/CamelAWS2KinesisSourceConnector.properties
new file mode 100644
index 0000000..262ff9c
--- /dev/null
+++ b/aws2-kinesis/aws2-kinesis-source/config/CamelAWS2KinesisSourceConnector.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelAws2-kinesisSourceConnector
+connector.class=org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+transforms=KinesisRecordDataTransforms
+transforms.KinesisRecordDataTransforms.type=org.apache.camel.kafkaconnector.KinesisRecordDataTransforms
+
+topics=mytopic
+camel.source.path.streamName=streamTest
+
+camel.source.endpoint.accessKey=xxxx
+camel.source.endpoint.secretKey=yyyy
+camel.source.endpoint.region=region
