NIFI-365: Initial implementation of spark receiver
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8506a0ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8506a0ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8506a0ce Branch: refs/heads/NIFI-360 Commit: 8506a0ce4ae71d2a103e11d6b8bf7e5832c81e09 Parents: 7ab4392 Author: Mark Payne <marka...@hotmail.com> Authored: Wed Feb 18 21:19:58 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Feb 18 21:19:58 2015 -0500 ---------------------------------------------------------------------- nifi/nifi-external/README.md | 19 ++ nifi/nifi-external/nifi-spark-receiver/pom.xml | 38 ++++ .../org/apache/nifi/spark/NiFiDataPacket.java | 40 ++++ .../org/apache/nifi/spark/NiFiReceiver.java | 198 +++++++++++++++++++ nifi/nifi-external/pom.xml | 29 +++ nifi/pom.xml | 1 + 6 files changed, 325 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/README.md ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/README.md b/nifi/nifi-external/README.md new file mode 100644 index 0000000..649ad17 --- /dev/null +++ b/nifi/nifi-external/README.md @@ -0,0 +1,19 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +# nifi-external + +The nifi-external module is a location where components can be developed by the NiFi team +that are not intended to be used directly by NiFi but are to be used within other frameworks +in order to integrate with NiFi. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/nifi-spark-receiver/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-spark-receiver/pom.xml b/nifi/nifi-external/nifi-spark-receiver/pom.xml new file mode 100644 index 0000000..b21d554 --- /dev/null +++ b/nifi/nifi-external/nifi-spark-receiver/pom.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi</artifactId>
+        <version>0.0.2-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-spark-receiver</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.10</artifactId>
+            <version>1.2.0</version>
+            <scope>provided</scope> <!-- expected to be supplied by the Spark runtime -->
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
new file mode 100644
index 0000000..2f08dc5
--- /dev/null
+++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.spark; + +import java.util.Map; + +/** + * <p> + * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's + * content and its attributes so that they can be processed by Spark + * </p> + */ +public interface NiFiDataPacket { + + /** + * Returns the contents of a NiFi FlowFile + * @return + */ + byte[] getContent(); + + /** + * Returns a Map of attributes that are associated with the NiFi FlowFile + * @return + */ + Map<String, String> getAttributes(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java new file mode 100644 index 0000000..9f31062 --- /dev/null +++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.spark; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.receiver.Receiver; + + +/** + * <p> + * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to pull data + * from Apache NiFi so that it can be processed by Spark Streaming. The NiFi Receiver connects + * to NiFi instance provided in the config and requests data from + * the OutputPort that is named. In NiFi, when an OutputPort is added to the root process group, + * it acts as a queue of data for remote clients. This receiver is then able to pull that data + * from NiFi reliably. + * </p> + * + * <p> + * It is important to note that if pulling data from a NiFi cluster, the URL that should be used + * is that of the NiFi Cluster Manager. The Receiver will automatically handle determining the nodes + * in that cluster and pull from those nodes as appropriate. + * </p> + * + * <p> + * In order to use the NiFiReceiver, you will need to first build a {@link SiteToSiteClientConfig} to provide + * to the constructor. 
This can be achieved by using the {@link SiteToSiteClient.Builder}. + * Below is an example snippet of driver code to pull data from NiFi that is running on localhost:8080. This + * example assumes that NiFi exposes and OutputPort on the root group named "Data For Spark". + * Additionally, it assumes that the data that it will receive from this OutputPort is text + * data, as it will map the byte array received from NiFi to a UTF-8 Encoded string. + * </p> + * + * <code> + * <pre> + * Pattern SPACE = Pattern.compile(" "); + * + * // Build a Site-to-site client config + * SiteToSiteClientConfig config = new SiteToSiteClient.Builder() + * .setUrl("http://localhost:8080/nifi") + * .setPortName("Data For Spark") + * .buildConfig(); + * + * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example"); + * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L)); + * + * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from + * // specified Port + * JavaReceiverInputDStream<NiFiDataPacket> packetStream = + * ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY())); + * + * // Map the data from NiFi to text, ignoring the attributes + * JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() { + * public String call(final NiFiDataPacket dataPacket) throws Exception { + * return new String(dataPacket.getContent(), StandardCharsets.UTF_8); + * } + * }); + * + * // Split the words by spaces + * JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() { + * public Iterable<String> call(final String text) throws Exception { + * return Arrays.asList(SPACE.split(text)); + * } + * }); + * + * // Map each word to the number 1, then aggregate by key + * JavaPairDStream<String, Integer> wordCounts = words.mapToPair( + * new PairFunction<String, String, Integer>() { + * public Tuple2<String, Integer> call(String s) { + * return new 
Tuple2<String, Integer>(s, 1); + * } + * }).reduceByKey(new Function2<Integer, Integer, Integer>() { + * public Integer call(Integer i1, Integer i2) { + * return i1 + i2; + * } + * } + * ); + * + * // print the results + * wordCounts.print(); + * ssc.start(); + * ssc.awaitTermination(); + * </pre> + * </code> + */ +public class NiFiReceiver extends Receiver<NiFiDataPacket> { + private static final long serialVersionUID = 3067274587595578836L; + private final SiteToSiteClientConfig clientConfig; + + public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) { + super(storageLevel); + this.clientConfig = clientConfig; + } + + @Override + public void onStart() { + final Thread thread = new Thread(new ReceiveRunnable()); + thread.setDaemon(true); + thread.setName("NiFi Receiver"); + thread.start(); + } + + @Override + public void onStop() { + } + + class ReceiveRunnable implements Runnable { + public ReceiveRunnable() { + } + + public void run() { + try { + final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); + try { + while ( !isStopped() ) { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + DataPacket dataPacket = transaction.receive(); + if ( dataPacket == null ) { + transaction.confirm(); + transaction.complete(); + + // no data available. Wait a bit and try again + try { + Thread.sleep(1000L); + } catch (InterruptedException e) {} + + continue; + } + + final List<NiFiDataPacket> dataPackets = new ArrayList<NiFiDataPacket>(); + do { + // Read the data into a byte array and wrap it along with the attributes + // into a NiFiDataPacket. 
+ final InputStream inStream = dataPacket.getData(); + final byte[] data = new byte[(int) dataPacket.getSize()]; + StreamUtils.fillBuffer(inStream, data); + + final Map<String, String> attributes = dataPacket.getAttributes(); + final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() { + public byte[] getContent() { + return data; + } + + public Map<String, String> getAttributes() { + return attributes; + } + }; + + dataPackets.add(NiFiDataPacket); + dataPacket = transaction.receive(); + } while ( dataPacket != null ); + + // Confirm transaction to verify the data + transaction.confirm(); + + store(dataPackets.iterator()); + + transaction.complete(); + } + } finally { + try { + client.close(); + } catch (final IOException ioe) { + reportError("Failed to close client", ioe); + } + } + } catch (final IOException ioe) { + restart("Failed to receive data from NiFi", ioe); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/pom.xml b/nifi/nifi-external/pom.xml new file mode 100644 index 0000000..878098f --- /dev/null +++ b/nifi/nifi-external/pom.xml @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi</artifactId>
+        <version>0.0.2-incubating-SNAPSHOT</version>
+    </parent>
+    <!-- groupId and version are inherited from the parent; this pom only aggregates the modules below -->
+    <artifactId>nifi-external</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>nifi-spark-receiver</module>
+    </modules>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/pom.xml b/nifi/pom.xml
index 6297161..5881db7 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -65,6 +65,7 @@
         <module>nifi-assembly</module>
         <module>nifi-docs</module>
         <module>nifi-maven-archetypes</module>
+        <module>nifi-external</module>
     </modules>
     <scm>
         <connection>scm:git:git://git.apache.org/incubator-nifi.git</connection>