This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4f3ac36 Implement Cassandra Sink (#1534) 4f3ac36 is described below commit 4f3ac369ca31888508b791ccd34d8821d8bb9157 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue Apr 10 15:27:59 2018 -0700 Implement Cassandra Sink (#1534) * Added Cassandra Sink Connector --- pom.xml | 1 + .../org/apache/pulsar/common/util/KeyValue.java | 48 ++++++++++ pulsar-connect/cassandra/pom.xml | 67 +++++++++++++ .../pulsar/connect/cassandra/CassandraSink.java | 105 +++++++++++++++++++++ .../connect/cassandra/CassandraSinkConfig.java | 57 +++++++++++ pulsar-connect/pom.xml | 1 + 6 files changed, 279 insertions(+) diff --git a/pom.xml b/pom.xml index 2ea3f52..77b0fa3 100644 --- a/pom.xml +++ b/pom.xml @@ -142,6 +142,7 @@ flexible messaging model and an intuitive client API.</description> <sketches.version>0.8.3</sketches.version> <jctools.version>2.1.1</jctools.version> <hbc-core.version>2.2.0</hbc-core.version> + <cassandra-driver-core.version>3.4.0</cassandra-driver-core.version> <!-- test dependencies --> <disruptor.version>3.4.0</disruptor.version> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyValue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyValue.java new file mode 100644 index 0000000..4c7be8e --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyValue.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +/** + * A simple KeyValue class + */ +public class KeyValue<K, V> { + private K key; + private V value; + + public KeyValue(K key, V value) { + setKey(key); + setValue(value); + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + + public void setKey(K key) { + this.key = key; + } + + public void setValue(V value) { + this.value = value; + } +} \ No newline at end of file diff --git a/pulsar-connect/cassandra/pom.xml b/pulsar-connect/cassandra/pom.xml new file mode 100644 index 0000000..fe77e78 --- /dev/null +++ b/pulsar-connect/cassandra/pom.xml @@ -0,0 +1,67 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-connect</artifactId> + <version>2.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-connect-cassandra</artifactId> + <name>Pulsar Connect :: Cassandra</name> + + <dependencies> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-connect-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>${cassandra-driver-core.version}</version> + </dependency> + + </dependencies> + +</project> diff --git a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java new file mode 100644 index 0000000..1ed43a8 --- /dev/null +++ b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.connect.cassandra; + +import com.datastax.driver.core.*; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.pulsar.common.util.KeyValue; +import org.apache.pulsar.connect.core.Sink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Simple Cassandra sink + * Takes in a KeyValue and writes it to a predefined keyspace/columnfamily/columnname. + */ +public class CassandraSink<K, V> implements Sink<KeyValue<K, V>> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class); + + // ----- Runtime fields + private Cluster cluster; + private Session session; + CassandraSinkConfig cassandraSinkConfig; + private PreparedStatement statement; + + @Override + public void open(Map<String, String> config) throws Exception { + cassandraSinkConfig = CassandraSinkConfig.load(config); + if (cassandraSinkConfig.getRoots() == null + || cassandraSinkConfig.getKeyspace() == null + || cassandraSinkConfig.getKeyname() == null + || cassandraSinkConfig.getColumnFamily() == null + || cassandraSinkConfig.getColumnName() == null) { + throw new IllegalArgumentException("Required property not set."); + } + createClient(cassandraSinkConfig.getRoots()); + statement = session.prepare("INSERT INTO " + cassandraSinkConfig.getColumnFamily() + " (" + + cassandraSinkConfig.getKeyname() + ", " + cassandraSinkConfig.getColumnName() + ") VALUES (?, ?)"); + } + + @Override + public void close() throws Exception { + session.close(); + cluster.close(); + } + + @Override + public CompletableFuture<Void> write(KeyValue<K, V> tuple) { + BoundStatement bound = statement.bind(tuple.getKey(), tuple.getValue()); + ResultSetFuture future = session.executeAsync(bound); + CompletableFuture<Void> completable = new CompletableFuture<Void>(); + Futures.addCallback(future, + new FutureCallback<ResultSet>() { + @Override + public void onSuccess(ResultSet result) { + completable.complete(null); + } + + @Override + public void onFailure(Throwable t) { + completable.completeExceptionally(t); + } + }); + return completable; + } + + private void createClient(String roots) { + String[] hosts = roots.split(","); + if (hosts.length <= 0) { + throw new RuntimeException("Invalid cassandra roots"); + } + Cluster.Builder b = Cluster.builder(); + for (int i = 0; i < hosts.length; ++i) { + String[] hostPort = hosts[i].split(":"); + b.addContactPoint(hostPort[0]); + if (hostPort.length > 1) { + b.withPort(Integer.valueOf(hostPort[1])); + } + } + cluster = b.build(); + session = cluster.connect(); + session.execute("USE " + cassandraSinkConfig.getKeyspace()); + } +} \ No newline at end of file diff --git a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java new file mode 100644 index 0000000..b21c95e --- /dev/null +++ b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.connect.cassandra; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import lombok.*; +import lombok.experimental.Accessors; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public class CassandraSinkConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private String roots; + private String keyspace; + private String keyname; + private String columnFamily; + private String columnName; + + public static CassandraSinkConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), CassandraSinkConfig.class); + } + + public static CassandraSinkConfig load(Map<String, String> map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), CassandraSinkConfig.class); + } +} \ No newline at end of file diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml index bf1303f..702d0ad 100644 --- a/pulsar-connect/pom.xml +++ b/pulsar-connect/pom.xml @@ -34,6 +34,7 @@ <modules> <module>core</module> <module>twitter</module> + <module>cassandra</module> </modules> </project> -- To stop receiving notification emails like this one, please contact si...@apache.org.