[FLINK-2740] Adding flink-connector-nifi module with NiFiSource and NiFiSink

This closes #1198


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92fb06a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92fb06a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92fb06a1

Branch: refs/heads/master
Commit: 92fb06a1384cec0e659fcb7da4cd4c88394b27b1
Parents: 5466824
Author: Bryan Bende <bbe...@hw11977.home>
Authored: Wed Sep 30 09:15:44 2015 -0400
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Sun Oct 4 19:17:09 2015 -0500

----------------------------------------------------------------------
 .../flink-connector-nifi/pom.xml                |  94 ++++++++++++
 .../connectors/nifi/NiFiDataPacket.java         |  39 +++++
 .../connectors/nifi/NiFiDataPacketBuilder.java  |  34 +++++
 .../streaming/connectors/nifi/NiFiSink.java     |  74 ++++++++++
 .../streaming/connectors/nifi/NiFiSource.java   | 146 +++++++++++++++++++
 .../connectors/nifi/StandardNiFiDataPacket.java |  46 ++++++
 .../nifi/examples/NiFiSinkTopologyExample.java  |  55 +++++++
 .../examples/NiFiSourceTopologyExample.java     |  58 ++++++++
 .../src/test/resources/NiFi_Flink.xml           |  16 ++
 .../flink-streaming-connectors/pom.xml          |   1 +
 10 files changed, 563 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
new file mode 100644
index 0000000..9168822
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-streaming-connectors-parent</artifactId>
+               <version>0.10-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-nifi</artifactId>
+       <name>flink-connector-nifi</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <nifi.version>0.3.0</nifi.version>
+       </properties>
+
+       <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+            <version>${nifi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <rerunFailingTestsCount>3</rerunFailingTestsCount>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                               <configuration>
+                                       <rerunFailingTestsCount>3</rerunFailingTestsCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
new file mode 100644
index 0000000..c8ceb57
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * The NiFiDataPacket provides a wrapper around a NiFi FlowFile. It exposes both
+ * a FlowFile's content and its attributes so that they can be processed by Flink.
+ * </p>
+ */
+public interface NiFiDataPacket {
+
+       /**
+        * @return the contents of a NiFi FlowFile
+        */
+       byte[] getContent();
+
+       /**
+        * @return a Map of attributes that are associated with the NiFi FlowFile
+        */
+       Map<String, String> getAttributes();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000..9bb521b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * A function that can create a NiFiDataPacket from an incoming instance of the given type.
+ *
+ * @param <T> the type of element from which a NiFiDataPacket is created
+ */
+public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
+
+       NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
+
+}
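
As a usage sketch (not part of this change): a NiFiDataPacketBuilder can also be written as a standalone serializable class instead of the anonymous class used in NiFiSinkTopologyExample further down. The class name StringNiFiDataPacketBuilder and the "flink.subtask" attribute key below are illustrative assumptions only, chosen to show how FlowFile attributes can be derived from the runtime context.

package org.apache.flink.streaming.connectors.nifi.examples;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical builder that turns a String record into a NiFiDataPacket,
 * attaching the producing subtask index as a FlowFile attribute.
 */
public class StringNiFiDataPacketBuilder implements NiFiDataPacketBuilder<String> {

	@Override
	public NiFiDataPacket createNiFiDataPacket(String value, RuntimeContext ctx) {
		Map<String, String> attributes = new HashMap<>();
		// "flink.subtask" is an illustrative attribute name, not defined by the connector
		attributes.put("flink.subtask", String.valueOf(ctx.getIndexOfThisSubtask()));
		return new StandardNiFiDataPacket(value.getBytes(StandardCharsets.UTF_8), attributes);
	}
}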

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
new file mode 100644
index 0000000..abc6b35
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+/**
+ * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires
+ * a NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
+ */
+public class NiFiSink<T> extends RichSinkFunction<T> {
+
+       private SiteToSiteClient client;
+       private SiteToSiteClientConfig clientConfig;
+       private NiFiDataPacketBuilder<T> builder;
+
+       /**
+        * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
+        *
+        * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+        * @param builder a builder to produce NiFiDataPackets from incoming data
+        */
+       public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
+               this.clientConfig = clientConfig;
+               this.builder = builder;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+       }
+
+       @Override
+       public void invoke(T value) throws Exception {
+               final NiFiDataPacket niFiDataPacket = builder.createNiFiDataPacket(value, getRuntimeContext());
+
+               final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+               if (transaction == null) {
+                       throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+               }
+
+               transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
+               transaction.confirm();
+               transaction.complete();
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               client.close();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
new file mode 100644
index 0000000..a213bb4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
+ * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
+ */
+public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
+
+       private static final long DEFAULT_WAIT_TIME_MS = 1000;
+
+       private long waitTimeMs;
+       private SiteToSiteClient client;
+       private SiteToSiteClientConfig clientConfig;
+       private transient volatile boolean running;
+
+       /**
+        * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
+        *
+        * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+        */
+       public NiFiSource(SiteToSiteClientConfig clientConfig) {
+               this(clientConfig, DEFAULT_WAIT_TIME_MS);
+       }
+
+       /**
+        * Constructs a new NiFiSource using the given client config and wait time.
+        *
+        * @param clientConfig the configuration for building a NiFi SiteToSiteClient
+        * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
+        */
+       public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
+               this.clientConfig = clientConfig;
+               this.waitTimeMs = waitTimeMs;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+               running = true;
+       }
+
+       @Override
+       public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
+               try {
+                       while (running) {
+                               final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+                               if (transaction == null) {
+                                       LOG.warn("A transaction could not be created, waiting and will try again...");
+                                       try {
+                                               Thread.sleep(waitTimeMs);
+                                       } catch (InterruptedException e) {
+
+                                       }
+                                       continue;
+                               }
+
+                               DataPacket dataPacket = transaction.receive();
+                               if (dataPacket == null) {
+                                       transaction.confirm();
+                                       transaction.complete();
+
+                                       LOG.debug("No data available to pull, waiting and will try again...");
+                                       try {
+                                               Thread.sleep(waitTimeMs);
+                                       } catch (InterruptedException e) {
+
+                                       }
+                                       continue;
+                               }
+
+                               final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
+                               do {
+                                       // Read the data into a byte array and wrap it along with the attributes
+                                       // into a NiFiDataPacket.
+                                       final InputStream inStream = dataPacket.getData();
+                                       final byte[] data = new byte[(int) dataPacket.getSize()];
+                                       StreamUtils.fillBuffer(inStream, data);
+
+                                       final Map<String, String> attributes = dataPacket.getAttributes();
+
+                                       niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
+                                       dataPacket = transaction.receive();
+                               } while (dataPacket != null);
+
+                               // Confirm transaction to verify the data
+                               transaction.confirm();
+
+                               for (NiFiDataPacket dp : niFiDataPackets) {
+                                       ctx.collect(dp);
+                               }
+
+                               transaction.complete();
+                       }
+               } finally {
+                       ctx.close();
+               }
+       }
+
+       @Override
+       public void cancel() {
+               running = false;
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               client.close();
+       }
+
+}
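
As a usage sketch (not part of this change): the NiFiSourceTopologyExample further down only reads the packet content, but a job can just as easily keep the FlowFile attributes alongside it. The class name NiFiAttributeTopologySketch, the URL and port name, and the "filename" attribute key below are assumptions for illustration, not guarantees about a particular NiFi flow.

package org.apache.flink.streaming.connectors.nifi.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

import java.nio.charset.Charset;

/**
 * Hypothetical topology that keeps a FlowFile attribute alongside the content.
 */
public class NiFiAttributeTopologySketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Assumed local NiFi instance and output port name
		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
				.url("http://localhost:8080/nifi")
				.portName("Data for Flink")
				.buildConfig();

		env.addSource(new NiFiSource(clientConfig))
				// Pair the (assumed) "filename" FlowFile attribute with the textual content
				.map(new MapFunction<NiFiDataPacket, Tuple2<String, String>>() {
					@Override
					public Tuple2<String, String> map(NiFiDataPacket packet) throws Exception {
						String filename = packet.getAttributes().get("filename");
						String content = new String(packet.getContent(), Charset.defaultCharset());
						return new Tuple2<>(filename, content);
					}
				})
				.print();

		env.execute();
	}
}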

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
new file mode 100644
index 0000000..5ad4bae
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.nifi;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An implementation of NiFiDataPacket.
+ */
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
+       private static final long serialVersionUID = 6364005260220243322L;
+
+       private final byte[] content;
+       private final Map<String, String> attributes;
+
+       public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
+               this.content = content;
+               this.attributes = attributes;
+       }
+
+       @Override
+       public byte[] getContent() {
+               return content;
+       }
+
+       @Override
+       public Map<String, String> getAttributes() {
+               return attributes;
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
new file mode 100644
index 0000000..572f949
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
+import org.apache.flink.streaming.connectors.nifi.NiFiSink;
+import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.util.HashMap;
+
+/**
+ * An example topology that sends data to a NiFi input port named "Data from Flink".
+ */
+public class NiFiSinkTopologyExample {
+
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+                               .url("http://localhost:8080/nifi")
+                               .portName("Data from Flink")
+                               .buildConfig();
+
+               DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q")
+                               .addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
+                                       @Override
+                                       public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
+                                               return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
+                                       }
+                               }));
+
+               env.execute();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
new file mode 100644
index 0000000..79c9a1c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
@@ -0,0 +1,58 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.nifi.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
+import org.apache.flink.streaming.connectors.nifi.NiFiSource;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.nio.charset.Charset;
+
+/**
+ * An example topology that pulls data from a NiFi output port named "Data for Flink".
+ */
+public class NiFiSourceTopologyExample {
+
+       public static void main(String[] args) throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+                               .url("http://localhost:8080/nifi")
+                               .portName("Data for Flink")
+                               .requestBatchCount(5)
+                               .buildConfig();
+
+               SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
+               DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
+
+               DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
+                       @Override
+                       public String map(NiFiDataPacket value) throws Exception {
+                               return new String(value.getContent(), Charset.defaultCharset());
+                       }
+               });
+
+               dataStream.print();
+               env.execute();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
new file mode 100644
index 0000000..d373d63
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<template><description></description><name>NiFi_Flink</name><snippet><connections><id>34acfdda-dd21-48c0-8779-95d0e258f5cb</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0
 
MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></destination><flowFileExpiration>0
 
sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>769242e5-ee04-4656-a684-ca661a18eed6</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>59574e3b-1ba7-4343-b265-af1b67923a85</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0
 MB</backPressureDataSizeThreshold><backPressureObjectThresh
 
old>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>48042218-a51e-45c7-bd30-2290bba8b191</id><type>OUTPUT_PORT</type></destination><flowFileExpiration>0
 
sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>46c9343f-f732-4e2d-98e1-13caab5d2f5e</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0
 
MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><type>PROCESSOR</type></destination><flowFileExpiration>0
 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><source><groupI
 
d>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><type>INPUT_PORT</type></source><zIndex>0</zIndex></connections><inputPorts><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>395.0</x><y>520.0</y></position><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data
 from 
Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>INPUT_PORT</type></inputPorts><outputPorts><id>48042218-a51e-45c7-bd30-2290bba8b191</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1616.0</x><y>259.0</y></position><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data
 for 
Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>OUTPUT_PORT</type></outputPorts><processors><id>769242e5-ee04-4656-a684-ca661a18eed6</id><parentGroupId>0f854f2b-239f-45f0-bfed-48
 
b5b23f7928</parentGroupId><position><x>389.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0
 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * 
?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>File 
Size</key><value><description>The size of the file that will be 
used</description><displayName>File 
Size</displayName><dynamic>false</dynamic><name>File 
Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Batch
 Size</key><value><defaultValue>1</defaultValue><description>The number of 
FlowFiles to be transferr
 ed in each invocation</description><displayName>Batch 
Size</displayName><dynamic>false</dynamic><name>Batch 
Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Data
 
Format</key><value><allowableValues><displayName>Binary</displayName><value>Binary</value></allowableValues><allowableValues><displayName>Text</displayName><value>Text</value></allowableValues><defaultValue>Binary</defaultValue><description>Specifies
 whether the data should be Text or Binary</description><displayName>Data 
Format</displayName><dynamic>false</dynamic><name>Data 
Format</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Unique
 
FlowFiles</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If
 true, ea
 ch FlowFile that is generated will be unique. If false, a random value will be 
generated and all FlowFiles will get the same content but this offers much 
higher throughput</description><displayName>Unique 
FlowFiles</displayName><dynamic>false</dynamic><name>Unique 
FlowFiles</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30
 sec</penaltyDuration><properties><entry><key>File Size</key><value>1 
b</value></entry><entry><key>Batch 
Size</key><value>1</value></entry><entry><key>Data 
Format</key><value>Binary</value></entry><entry><key>Unique 
FlowFiles</key><value>false</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>2
 
sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1
 
sec</yieldDuration></config><name>GenerateFlowFile</name><relationships><autoTerminate>false</autoTerminate><description></des
 
cription><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>826.0</x><y>499.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0
 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * 
?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Log 
Level</key><value><al
 
lowableValues><displayName>trace</displayName><value>trace</value></allowableValues><allowableValues><displayName>debug</displayName><value>debug</value></allowableValues><allowableValues><displayName>info</displayName><value>info</value></allowableValues><allowableValues><displayName>warn</displayName><value>warn</value></allowableValues><allowableValues><displayName>error</displayName><value>error</value></allowableValues><defaultValue>info</defaultValue><description>The
 Log Level to use when logging the Attributes</description><displayName>Log 
Level</displayName><dynamic>false</dynamic><name>Log 
Level</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log
 
Payload</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If
 true, the FlowFile's p
 ayload will be logged, in addition to its attributes; otherwise, just the 
Attributes will be logged.</description><displayName>Log 
Payload</displayName><dynamic>false</dynamic><name>Log 
Payload</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes
 to Log</key><value><description>A comma-separated list of Attributes to Log. 
If not specified, all attributes will be 
logged.</description><displayName>Attributes to 
Log</displayName><dynamic>false</dynamic><name>Attributes to 
Log</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes
 to Ignore</key><value><description>A comma-separated list of Attributes to 
ignore. If not specified, no attributes will be 
ignored.</description><displayName>Attributes to 
Ignore</displayName><dynamic>false</dynamic><name>Attributes to 
Ignore</name><required>false</required><sensitive>false</sensitive><supportsEl>
 false</supportsEl></value></entry><entry><key>Log 
prefix</key><value><description>Log prefix appended to the log lines. It helps 
to distinguish the output of multiple LogAttribute 
processors.</description><displayName>Log 
prefix</displayName><dynamic>false</dynamic><name>Log 
prefix</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30
 sec</penaltyDuration><properties><entry><key>Log 
Level</key></entry><entry><key>Log 
Payload</key><value>true</value></entry><entry><key>Attributes to 
Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log 
prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0
 
sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1
 
sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><description>All
 FlowFil
 es are routed to this 
relationship</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1000.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0
 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * 
?</value></entry></defaultSchedulingPeriod><descriptors><e
 ntry><key>Regular 
Expression</key><value><defaultValue>(?s:^.*$)</defaultValue><description>The 
Regular Expression to search for in the FlowFile 
content</description><displayName>Regular 
Expression</displayName><dynamic>false</dynamic><name>Regular 
Expression</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Replacement
 Value</key><value><defaultValue>$1</defaultValue><description>The value to 
replace the regular expression with. Back-references to Regular Expression 
capturing groups are supported, but back-references that reference capturing 
groups that do not exist in the regular expression will be treated as literal 
value.</description><displayName>Replacement 
Value</displayName><dynamic>false</dynamic><name>Replacement 
Value</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Character
 Set</key><value><defaultValue>UTF-8</defaultValue><description>The
  Character Set in which the file is 
encoded</description><displayName>Character 
Set</displayName><dynamic>false</dynamic><name>Character 
Set</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Maximum
 Buffer Size</key><value><defaultValue>1 
MB</defaultValue><description>Specifies the maximum amount of data to buffer 
(per file or per line, depending on the Evaluation Mode) in order to apply the 
regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the 
FlowFile is larger than this value, the FlowFile will be routed to 'failure'. 
In 'Line-by-Line' Mode, if a single line is larger than this value, the 
FlowFile will be routed to 'failure'. A default value of 1 MB is provided, 
primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB 
or 16 KB is suggested. This value is ignored and the buffer is not used if 
'Regular Expression' is set to '.*'</description><displayName>Maximum Buffer 
 Size</displayName><dynamic>false</dynamic><name>Maximum Buffer 
Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Evaluation
 
Mode</key><value><allowableValues><displayName>Line-by-Line</displayName><value>Line-by-Line</value></allowableValues><allowableValues><displayName>Entire
 text</displayName><value>Entire 
text</value></allowableValues><defaultValue>Entire 
text</defaultValue><description>Evaluate the 'Regular Expression' against each 
line (Line-by-Line) or buffer the entire file into memory (Entire Text) and 
then evaluate the 'Regular Expression'.</description><displayName>Evaluation 
Mode</displayName><dynamic>false</dynamic><name>Evaluation 
Mode</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30
 sec</penaltyDuration><properties><entry><key>Regular 
Expression</key><value>(?s:^.*$)</value><
 /entry><entry><key>Replacement Value</key><value>blah 
blah</value></entry><entry><key>Character 
Set</key><value>UTF-8</value></entry><entry><key>Maximum Buffer 
Size</key><value>1 MB</value></entry><entry><key>Evaluation 
Mode</key><value>Entire 
text</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0
 
sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1
 
sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><description>FlowFiles
 that could not be updated are routed to this 
relationship</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles
 that have been successfully updated are routed to this relationship, as well 
as FlowFiles whose content does not match the given Regular 
Expression</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supports
 
EventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.ReplaceText</type></processors></snippet><timestamp>09/30/2015
 09:10:38 EDT</timestamp></template>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/92fb06a1/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 142d7c9..822ca26 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -41,6 +41,7 @@ under the License.
                <module>flink-connector-elasticsearch</module>
                <module>flink-connector-rabbitmq</module>
                <module>flink-connector-twitter</module>
+               <module>flink-connector-nifi</module>
        </modules>
 
        <!-- See main pom.xml for explanation of profiles -->
