NIFI-365: Initial implementation of spark receiver

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8506a0ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8506a0ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8506a0ce

Branch: refs/heads/NIFI-360
Commit: 8506a0ce4ae71d2a103e11d6b8bf7e5832c81e09
Parents: 7ab4392
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Feb 18 21:19:58 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Feb 18 21:19:58 2015 -0500

----------------------------------------------------------------------
 nifi/nifi-external/README.md                    |  19 ++
 nifi/nifi-external/nifi-spark-receiver/pom.xml  |  38 ++++
 .../org/apache/nifi/spark/NiFiDataPacket.java   |  40 ++++
 .../org/apache/nifi/spark/NiFiReceiver.java     | 198 +++++++++++++++++++
 nifi/nifi-external/pom.xml                      |  29 +++
 nifi/pom.xml                                    |   1 +
 6 files changed, 325 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/README.md
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/README.md b/nifi/nifi-external/README.md
new file mode 100644
index 0000000..649ad17
--- /dev/null
+++ b/nifi/nifi-external/README.md
@@ -0,0 +1,19 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# nifi-external
+
+The nifi-external module is a location where components can be developed by 
the NiFi team
+that are not intended to be used directly by NiFi but are to be used within 
other frameworks
+in order to integrate with NiFi.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/nifi-spark-receiver/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/pom.xml 
b/nifi/nifi-external/nifi-spark-receiver/pom.xml
new file mode 100644
index 0000000..b21d554
--- /dev/null
+++ b/nifi/nifi-external/nifi-spark-receiver/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!--
+      The parent is the aggregator one directory up (nifi-external), so the
+      default relativePath (../pom.xml) resolves to the declared parent.
+    -->
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-external</artifactId>
+        <version>0.0.2-incubating-SNAPSHOT</version>
+    </parent>
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-spark-receiver</artifactId>
+
+    <dependencies>
+        <!-- Spark Streaming API that provides the Receiver base class -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.10</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <!-- NiFi site-to-site client used to pull data from a NiFi Output Port -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+            <version>0.0.2-incubating-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
 
b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
new file mode 100644
index 0000000..2f08dc5
--- /dev/null
+++ 
b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.spark;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's
+ * content and its attributes so that they can be processed by Spark
+ * </p>
+ */
+public interface NiFiDataPacket {
+
+       /**
+        * Returns the contents of a NiFi FlowFile
+        * @return the content of the FlowFile as a byte array
+        */
+       byte[] getContent();
+
+       /**
+        * Returns a Map of attributes that are associated with the NiFi FlowFile
+        * @return the attributes of the FlowFile, as a Map of String keys to String values
+        */
+       Map<String, String> getAttributes();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
 
b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
new file mode 100644
index 0000000..9f31062
--- /dev/null
+++ 
b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.spark;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+
+/**
+ * <p>
+ * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to 
pull data 
+ * from Apache NiFi so that it can be processed by Spark Streaming. The NiFi 
Receiver connects 
+ * to the NiFi instance provided in the config and requests data from
+ * the OutputPort that is named. In NiFi, when an OutputPort is added to the 
root process group,
+ * it acts as a queue of data for remote clients. This receiver is then able 
to pull that data
+ * from NiFi reliably.
+ * </p>
+ * 
+ * <p>
+ * It is important to note that if pulling data from a NiFi cluster, the URL 
that should be used
+ * is that of the NiFi Cluster Manager. The Receiver will automatically handle 
determining the nodes
+ * in that cluster and pull from those nodes as appropriate.
+ * </p>
+ * 
+ * <p>
+ * In order to use the NiFiReceiver, you will need to first build a {@link 
SiteToSiteClientConfig} to provide 
+ * to the constructor. This can be achieved by using the {@link 
SiteToSiteClient.Builder}.
+ * Below is an example snippet of driver code to pull data from NiFi that is 
running on localhost:8080. This
+ * example assumes that NiFi exposes an OutputPort on the root group named 
"Data For Spark".
+ * Additionally, it assumes that the data that it will receive from this 
OutputPort is text
+ * data, as it will map the byte array received from NiFi to a UTF-8 Encoded 
string.
+ * </p>
+ * 
+ * <code>
+ * <pre>
+ * Pattern SPACE = Pattern.compile(" ");
+ * 
+ * // Build a Site-to-site client config
+ * SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+ *   .setUrl("http://localhost:8080/nifi")
+ *   .setPortName("Data For Spark")
+ *   .buildConfig();
+ * 
+ * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming 
example");
+ * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new 
Duration(1000L));
+ * 
+ * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can 
pull data from 
+ * // specified Port
+ * JavaReceiverInputDStream<NiFiDataPacket> packetStream = 
+ *     ssc.receiverStream(new NiFiReceiver(clientConfig, 
StorageLevel.MEMORY_ONLY()));
+ * 
+ * // Map the data from NiFi to text, ignoring the attributes
+ * JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, 
String>() {
+ *   public String call(final NiFiDataPacket dataPacket) throws Exception {
+ *     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
+ *   }
+ * });
+ * 
+ * // Split the words by spaces
+ * JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, 
String>() {
+ *   public Iterable<String> call(final String text) throws Exception {
+ *     return Arrays.asList(SPACE.split(text));
+ *   }
+ * });
+ *         
+ * // Map each word to the number 1, then aggregate by key
+ * JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ *   new PairFunction<String, String, Integer>() {
+ *     public Tuple2<String, Integer> call(String s) {
+ *       return new Tuple2<String, Integer>(s, 1);
+ *     }
+ *   }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ *     public Integer call(Integer i1, Integer i2) {
+ *       return i1 + i2;
+ *     }
+ *    }
+ *  );
+ * 
+ * // print the results
+ * wordCounts.print();
+ * ssc.start();
+ * ssc.awaitTermination();
+ * </pre>
+ * </code>
+ */
+public class NiFiReceiver extends Receiver<NiFiDataPacket> {
+       private static final long serialVersionUID = 3067274587595578836L;
+       private final SiteToSiteClientConfig clientConfig;
+       
+       public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final 
StorageLevel storageLevel) {
+               super(storageLevel);
+               this.clientConfig = clientConfig;
+       }
+       
+       @Override
+       public void onStart() {
+               final Thread thread = new Thread(new ReceiveRunnable());
+               thread.setDaemon(true);
+               thread.setName("NiFi Receiver");
+               thread.start();
+       }
+
+       @Override
+       public void onStop() {
+       }
+
+       class ReceiveRunnable implements Runnable {
+               public ReceiveRunnable() {
+               }
+               
+               public void run() {
+                       try {
+                               final SiteToSiteClient client = new 
SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+                               try {
+                                       while ( !isStopped() ) {
+                                               final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
+                                               DataPacket dataPacket = 
transaction.receive();
+                                               if ( dataPacket == null ) {
+                                                       transaction.confirm();
+                                                       transaction.complete();
+                                                       
+                                                       // no data available. 
Wait a bit and try again
+                                                       try {
+                                                               
Thread.sleep(1000L);
+                                                       } catch 
(InterruptedException e) {}
+                                                       
+                                                       continue;
+                                               }
+       
+                                               final List<NiFiDataPacket> 
dataPackets = new ArrayList<NiFiDataPacket>();
+                                               do {
+                                                       // Read the data into a 
byte array and wrap it along with the attributes
+                                                       // into a 
NiFiDataPacket.
+                                                       final InputStream 
inStream = dataPacket.getData();
+                                                       final byte[] data = new 
byte[(int) dataPacket.getSize()];
+                                                       
StreamUtils.fillBuffer(inStream, data);
+                                                       
+                                                       final Map<String, 
String> attributes = dataPacket.getAttributes();
+                                                       final NiFiDataPacket 
NiFiDataPacket = new NiFiDataPacket() {
+                                                               public byte[] 
getContent() {
+                                                                       return 
data;
+                                                               }
+
+                                                               public 
Map<String, String> getAttributes() {
+                                                                       return 
attributes;
+                                                               }
+                                                       };
+                                                       
+                                                       
dataPackets.add(NiFiDataPacket);
+                                                       dataPacket = 
transaction.receive();
+                                               } while ( dataPacket != null );
+
+                                               // Confirm transaction to 
verify the data
+                                               transaction.confirm();
+                                               
+                                               store(dataPackets.iterator());
+                                               
+                                               transaction.complete();
+                                       }
+                               } finally {
+                                       try {
+                                               client.close();
+                                       } catch (final IOException ioe) {
+                                               reportError("Failed to close 
client", ioe);
+                                       }
+                               }
+                       } catch (final IOException ioe) {
+                               restart("Failed to receive data from NiFi", 
ioe);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/nifi-external/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/pom.xml b/nifi/nifi-external/pom.xml
new file mode 100644
index 0000000..878098f
--- /dev/null
+++ b/nifi/nifi-external/pom.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi</artifactId>
+        <version>0.0.2-incubating-SNAPSHOT</version>
+    </parent>
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-external</artifactId>
+    <!-- Aggregator POM: it only groups the modules listed below -->
+    <packaging>pom</packaging>
+    <modules>
+        <module>nifi-spark-receiver</module>
+    </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8506a0ce/nifi/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/pom.xml b/nifi/pom.xml
index 6297161..5881db7 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -65,6 +65,7 @@
         <module>nifi-assembly</module>
         <module>nifi-docs</module>
         <module>nifi-maven-archetypes</module>
+               <module>nifi-external</module>
     </modules>
     <scm>
         
<connection>scm:git:git://git.apache.org/incubator-nifi.git</connection>

Reply via email to