Repository: nifi
Updated Branches:
  refs/heads/master 7a165b62c -> 56ad22aea


NIFI-900: Created Processors for interacting with Microsoft Azure EventHubs

Reviewed (with amendments needed for clean merge, whitespace and NOTICEs) by 
Tony Kurc (tk...@apache.org)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/56ad22ae
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/56ad22ae
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/56ad22ae

Branch: refs/heads/master
Commit: 56ad22aea6db7d401a72857b1356dfba85f274b6
Parents: 7a165b6
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Nov 11 23:06:04 2015 -0500
Committer: Tony Kurc <trk...@gmail.com>
Committed: Wed Nov 11 23:06:04 2015 -0500

----------------------------------------------------------------------
 nifi-assembly/NOTICE                            |   7 +
 nifi-assembly/pom.xml                           |   5 +
 .../nifi-azure-bundle/nifi-azure-nar/pom.xml    |  37 +++
 .../src/main/resources/META-INF/NOTICE          |  34 +++
 .../nifi-azure-processors/pom.xml               |  60 +++++
 .../azure/eventhub/GetAzureEventHub.java        | 254 +++++++++++++++++++
 .../azure/eventhub/PutAzureEventHub.java        | 186 ++++++++++++++
 .../org.apache.nifi.processor.Processor         |  16 ++
 nifi-nar-bundles/nifi-azure-bundle/pom.xml      |  35 +++
 nifi-nar-bundles/pom.xml                        |   7 +-
 pom.xml                                         |   6 +
 11 files changed, 644 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 02c9d7f..5e18035 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -765,6 +765,13 @@ The following binary components are provided under the 
Apache Software License v
         is a distributed tracing system that is Apache 2.0 Licensed.
         Copyright 2012 Twitter, Inc.
 
+    (ASLv2) Apache Qpid AMQP 1.0 Client
+      The following NOTICE information applies:
+        Copyright 2006-2015 The Apache Software Foundation
+
+    (ASLv2) EventHubs Client 
(com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - 
https://github.com/hdinsight/eventhubs-client/)
+
+
 ************************
 Common Development and Distribution License 1.1
 ************************

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 8e4a175..961349f 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -242,6 +242,11 @@ language governing permissions and limitations under the 
License. -->
             <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-azure-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 
     <properties>

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
new file mode 100644
index 0000000..26d79d2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-azure-bundle</artifactId>
+        <version>0.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-azure-nar</artifactId>
+    <version>0.4.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-azure-processors</artifactId>
+            <version>0.4.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..1d5a375
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,34 @@
+nifi-azure-nar
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2014 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache Qpid AMQP 1.0 Client
+    The following NOTICE information applies:
+      Copyright 2006-2015 The Apache Software Foundation
+
+  (ASLv2) EventHubs Client 
(com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - 
https://github.com/hdinsight/eventhubs-client/)
+
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+The following binary components are provided under the Common Development and 
Distribution License 1.1. See project link for details.
+
+    (CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 
- http://java.net/projects/jms-spec/pages/Home)

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
new file mode 100644
index 0000000..c34161c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+       license agreements. See the NOTICE file distributed with this work for 
additional
+       information regarding copyright ownership. The ASF licenses this file to
+       You under the Apache License, Version 2.0 (the "License"); you may not 
use
+       this file except in compliance with the License. You may obtain a copy 
of
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required
+       by applicable law or agreed to in writing, software distributed under 
the
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS
+       OF ANY KIND, either express or implied. See the License for the specific
+       language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-azure-bundle</artifactId>
+               <version>0.4.0-SNAPSHOT</version>
+       </parent>
+
+       <artifactId>nifi-azure-processors</artifactId>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-processor-utils</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>javax.jms</groupId>
+                       <artifactId>javax.jms-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>com.microsoft.eventhubs.client</groupId>
+                       <artifactId>eventhubs-client</artifactId>
+                       <version>0.9.1</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-mock</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-simple</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
new file mode 100644
index 0000000..2c62df6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import com.microsoft.eventhubs.client.ConnectionStringBuilder;
+import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
+import com.microsoft.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.EventHubMessage;
+import com.microsoft.eventhubs.client.IEventHubFilter;
+import com.microsoft.eventhubs.client.ResilientEventHubReceiver;
+
+@Tags({ "azure", "microsoft", "cloud", "eventhub", "events", "streaming", 
"streams" })
+@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, 
writing the contents of the Azure message to the content of the FlowFile")
+@WritesAttributes({
+    @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = 
"The time (in milliseconds since epoch, UTC) at which the message was enqueued 
in the Azure Event Hub"),
+    @WritesAttribute(attribute = "eventhub.offset", description = "The offset 
into the partition at which the message was stored"),
+    @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure 
Sequence number associated with the message"),
+    @WritesAttribute(attribute = "eventhub.name", description = "The name of 
the Event Hub from which the message was pulled"),
+    @WritesAttribute(attribute = "eventhub.partition", description = "The name 
of the Azure Partition from which the message was pulled")
+})
+public class GetAzureEventHub extends AbstractProcessor {
+
+    static final PropertyDescriptor EVENT_HUB_NAME = new 
PropertyDescriptor.Builder()
+        .name("Event Hub Name")
+        .description("The name of the Azure Event Hub to pull messages from")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(true)
+        .build();
+    static final PropertyDescriptor NAMESPACE = new 
PropertyDescriptor.Builder()
+        .name("Event Hub Namespace")
+        .description("The Azure Namespace that the Event Hub is assigned to. 
This is generally equal to <Event Hub Name>-ns")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+    static final PropertyDescriptor ACCESS_POLICY = new 
PropertyDescriptor.Builder()
+        .name("Shared Access Policy Name")
+        .description("The name of the Event Hub Shared Access Policy. This 
Policy must have Listen permissions.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+    static final PropertyDescriptor POLICY_PRIMARY_KEY = new 
PropertyDescriptor.Builder()
+        .name("Shared Access Policy Primary Key")
+        .description("The primary key of the Event Hub Shared Access Policy")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .sensitive(true)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor NUM_PARTITIONS = new 
PropertyDescriptor.Builder()
+        .name("Number of Event Hub Partitions")
+        .description("The number of partitions that the Event Hub has. Only 
this number of partitions will be used, "
+            + "so it is important to ensure that if the number of partitions 
changes that this value be updated. Otherwise, some messages may not be 
consumed.")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+    static final PropertyDescriptor CONSUMER_GROUP = new 
PropertyDescriptor.Builder()
+        .name("Event Hub Consumer Group")
+        .description("The name of the Event Hub Consumer Group to use when 
pulling events")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("$Default")
+        .required(true)
+        .build();
+    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("The number of FlowFiles to pull in a single JMS session")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("10")
+        .required(true)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("Any FlowFile that is successfully received from the 
Azure Event Hub will be transferred to this Relationship.")
+        .build();
+
+    private final ConcurrentMap<String, ResilientEventHubReceiver> 
partitionToReceiverMap = new ConcurrentHashMap<>();
+    private volatile BlockingQueue<String> partitionNames = new 
LinkedBlockingQueue<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(EVENT_HUB_NAME);
+        properties.add(NAMESPACE);
+        properties.add(ACCESS_POLICY);
+        properties.add(POLICY_PRIMARY_KEY);
+        properties.add(NUM_PARTITIONS);
+        properties.add(CONSUMER_GROUP);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    private ResilientEventHubReceiver getReceiver(final ProcessContext 
context, final String partitionId) throws EventHubException {
+        ResilientEventHubReceiver existingReceiver = 
partitionToReceiverMap.get(partitionId);
+        if (existingReceiver != null) {
+            return existingReceiver;
+        }
+
+        // we want to avoid allowing multiple threads to create Receivers 
simultaneously because that could result in
+        // having multiple Receivers for the same partition. So if the map 
does not contain a receiver for this partition,
+        // we will enter a synchronized block and check again (because once we 
enter the synchronized block, we know that no
+        // other thread is creating a client). If within the synchronized 
block, we still do not have an entry in the map,
+        // it is up to use to create the receiver, initialize it, and then put 
it into the map.
+        // We do not use the putIfAbsent method in order to do a CAS operation 
here because we want to also initialize the
+        // receiver if and only if it is not present in the map. As a result, 
we need to initialize the receiver and add it
+        // to the map atomically. Hence, the synchronized block.
+        synchronized (this) {
+            existingReceiver = partitionToReceiverMap.get(partitionId);
+            if (existingReceiver != null) {
+                return existingReceiver;
+            }
+
+            final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
+            final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+            final String namespace = context.getProperty(NAMESPACE).getValue();
+            final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
+            final String consumerGroupName = 
context.getProperty(CONSUMER_GROUP).getValue();
+
+            final String connectionString = new 
ConnectionStringBuilder(policyName, policyKey, namespace).getConnectionString();
+            final IEventHubFilter filter = new 
EventHubEnqueueTimeFilter(System.currentTimeMillis());
+            final ResilientEventHubReceiver receiver = new 
ResilientEventHubReceiver(connectionString, eventHubName, partitionId, 
consumerGroupName, -1, filter);
+            receiver.initialize();
+
+            partitionToReceiverMap.put(partitionId, receiver);
+            return receiver;
+        }
+    }
+
+
+    @OnStopped
+    public void tearDown() {
+        for (final ResilientEventHubReceiver receiver : 
partitionToReceiverMap.values()) {
+            receiver.close();
+        }
+
+        partitionToReceiverMap.clear();
+    }
+
+    @OnScheduled
+    public void setupPartitions(final ProcessContext context) {
+        final BlockingQueue<String> partitionNames = new 
LinkedBlockingQueue<>();
+        for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); 
i++) {
+            partitionNames.add(String.valueOf(i));
+        }
+        this.partitionNames = partitionNames;
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final BlockingQueue<String> partitionIds = this.partitionNames;
+        final String partitionId = partitionIds.poll();
+        if (partitionId == null) {
+            getLogger().debug("No partitions available");
+            return;
+        }
+
+        final StopWatch stopWatch = new StopWatch(true);
+        try {
+            final ResilientEventHubReceiver receiver;
+            try {
+                receiver = getReceiver(context, partitionId);
+            } catch (final EventHubException e) {
+                throw new ProcessException(e);
+            }
+
+            final EventHubMessage message = 
EventHubMessage.parseAmqpMessage(receiver.receive(100L));
+            if (message == null) {
+                return;
+            }
+
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("eventhub.enqueued.timestamp", 
String.valueOf(message.getEnqueuedTimestamp()));
+            attributes.put("eventhub.offset", message.getOffset());
+            attributes.put("eventhub.sequence", 
String.valueOf(message.getSequence()));
+            attributes.put("eventhub.name", 
context.getProperty(EVENT_HUB_NAME).getValue());
+            attributes.put("eventhub.partition", partitionId);
+
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException 
{
+                    out.write(message.getData());
+                }
+            });
+
+            session.transfer(flowFile, REL_SUCCESS);
+
+            final String namespace = context.getProperty(NAMESPACE).getValue();
+            final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
+            final String consumerGroup = 
context.getProperty(CONSUMER_GROUP).getValue();
+            final String transitUri = "amqps://" + namespace + 
".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + 
consumerGroup + "/Partitions/" + partitionId;
+            session.getProvenanceReporter().receive(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        } finally {
+            partitionIds.offer(partitionId);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
new file mode 100644
index 0000000..82fff27
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+
+import com.microsoft.eventhubs.client.EventHubClient;
+import com.microsoft.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.EventHubSender;
+
+
+@SupportsBatching
+@Tags({ "microsoft", "azure", "cloud", "eventhub", "events", "streams", 
"streaming" })
+@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure 
Event Hub. Note: the content of the FlowFile will be buffered into memory 
before being sent, "
+    + "so care should be taken to avoid sending FlowFiles to this Processor 
that exceed the amount of Java Heap Space available.")
+public class PutAzureEventHub extends AbstractProcessor {
+
+    static final AllowableValue DELIVERY_MODE_PERSISTENT = new 
AllowableValue(String.valueOf(DeliveryMode.PERSISTENT), "Persistent", "This 
mode indicates that the Event Hub "
+        + "server must persist the message to a reliable storage mechanism 
before the FlowFile is routed to 'success', in order to ensure that the data is 
not lost.");
+
+    static final AllowableValue DELIVERY_MODE_NON_PERSISTENT = new 
AllowableValue(String.valueOf(DeliveryMode.NON_PERSISTENT), "Non-Persistent",
+        "This mode indicates that the Event Hub server does not have to 
persist the message to a reliable storage mechanism before the FlowFile is 
routed to 'success'. "
+            + "This delivery mode may offer higher throughput but may result 
in message loss if the server crashes or is restarted.");
+
+    static final PropertyDescriptor EVENT_HUB_NAME = new 
PropertyDescriptor.Builder()
+        .name("Event Hub Name")
+        .description("The name of the Azure Event Hub to send to")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(true)
+        .build();
+    static final PropertyDescriptor NAMESPACE = new 
PropertyDescriptor.Builder()
+        .name("Event Hub Namespace")
+        .description("The Azure Namespace that the Event Hub is assigned to. 
This is generally equal to <Event Hub Name>-ns")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+    static final PropertyDescriptor ACCESS_POLICY = new 
PropertyDescriptor.Builder()
+        .name("Shared Access Policy Name")
+        .description("The name of the Event Hub Shared Access Policy. This 
Policy must have Send permissions.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+    static final PropertyDescriptor POLICY_PRIMARY_KEY = new 
PropertyDescriptor.Builder()
+        .name("Shared Access Policy Primary Key")
+        .description("The primary key of the Event Hub Shared Access Policy")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .sensitive(true)
+        .required(true)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("Any FlowFile that is successfully sent to the Azure 
Event Hub will be transferred to this Relationship.")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("Any FlowFile that could not be sent to the Azure Event 
Hub will be transferred to this Relationship.")
+        .build();
+
+    private volatile BlockingQueue<EventHubSender> senderQueue = new 
LinkedBlockingQueue<>();
+
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(EVENT_HUB_NAME);
+        properties.add(NAMESPACE);
+        properties.add(ACCESS_POLICY);
+        properties.add(POLICY_PRIMARY_KEY);
+        return properties;
+    }
+
+
+    @OnScheduled
+    public final void setupClient(final ProcessContext context) throws 
EventHubException {
+        final String policyName = 
context.getProperty(ACCESS_POLICY).getValue();
+        final String policyKey = 
context.getProperty(POLICY_PRIMARY_KEY).getValue();
+        final String namespace = context.getProperty(NAMESPACE).getValue();
+        final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).getValue();
+
+        final EventHubClient client = EventHubClient.create(policyName, 
policyKey, namespace, eventHubName);
+
+        final int numThreads = context.getMaxConcurrentTasks();
+        senderQueue = new LinkedBlockingQueue<>(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            final EventHubSender sender = client.createPartitionSender(null);
+            senderQueue.offer(sender);
+        }
+    }
+
+    @OnStopped
+    public void tearDown() {
+        EventHubSender sender;
+        while ((sender = senderQueue.poll()) != null) {
+            sender.close();
+        }
+    }
+
+    /**
+     * Sends the content of a single incoming FlowFile to the configured Azure Event Hub.
+     * <p>
+     * The FlowFile content is read fully into a byte array and passed to an
+     * {@link EventHubSender} borrowed from {@code senderQueue}; the sender is always
+     * returned to the queue afterwards. When the send succeeds, a provenance send event
+     * is reported and the FlowFile is routed to {@code REL_SUCCESS}. When the send raises
+     * an {@link EventHubException}, the error is logged and the FlowFile is penalized and
+     * routed to {@code REL_FAILURE}.
+     *
+     * @param context supplies the Namespace and Event Hub Name property values used to build the provenance transit URI
+     * @param session session from which the FlowFile is obtained and through which it is transferred
+     * @throws ProcessException if no sender can be obtained from {@code senderQueue}
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final StopWatch stopWatch = new StopWatch(true);
+
+        // poll() never blocks; it returns null when the queue is empty. Fail with a clear
+        // message here instead of hitting a NullPointerException on send(), followed by a
+        // second one from offer(null) in the finally block.
+        final EventHubSender sender = senderQueue.poll();
+        if (sender == null) {
+            throw new ProcessException("No EventHubSender is available in the sender pool; cannot send " + flowFile);
+        }
+
+        try {
+            // The whole FlowFile content is buffered in memory before it is sent.
+            final byte[] buffer = new byte[(int) flowFile.getSize()];
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    StreamUtils.fillBuffer(in, buffer);
+                }
+            });
+
+            try {
+                sender.send(buffer);
+            } catch (final EventHubException ehe) {
+                getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[] { flowFile, ehe }, ehe);
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                return;
+            }
+
+            final String namespace = context.getProperty(NAMESPACE).getValue();
+            final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
+            session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, REL_SUCCESS);
+        } finally {
+            // Return the borrowed sender so that later invocations can reuse it.
+            senderQueue.offer(sender);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..178e52c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
+org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-nar-bundles/nifi-azure-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
new file mode 100644
index 0000000..3978748
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.4.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-azure-bundle</artifactId>
+    <version>0.4.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-azure-processors</module>
+        <module>nifi-azure-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 510935d..4c0925f 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -42,12 +42,13 @@
         <module>nifi-language-translation-bundle</module>
         <module>nifi-mongodb-bundle</module>
         <module>nifi-flume-bundle</module>
-               <module>nifi-hbase-bundle</module>
+       <module>nifi-hbase-bundle</module>
         <module>nifi-ambari-bundle</module>
         <module>nifi-image-bundle</module>
         <module>nifi-avro-bundle</module>
         <module>nifi-couchbase-bundle</module>
-  </modules>
+        <module>nifi-azure-bundle</module>
+    </modules>
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -132,4 +133,4 @@
             </dependency>
         </dependencies>
     </dependencyManagement>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/56ad22ae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cf6f018..8a8cdb0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -933,6 +933,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-azure-nar</artifactId>
+                <version>0.4.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>
                 <version>0.4.0-SNAPSHOT</version>
             </dependency>

Reply via email to