Re: Apache NiFi Information

2016-01-21 Thread Joe Witt
Yashwant

Thank you for your interest in Apache NiFi.  From the questions you
have at this stage it sounds like you are in the very early stages of
learning about NiFi.  Please take a look at the following starting
points:

NiFi Website [1]
NiFi Documentation [2]
NiFi Videos/Talks [3]

[1] https://nifi.apache.org/
[2] https://nifi.apache.org/docs.html
[3] https://nifi.apache.org/videos.html

Thanks
Joe

On Thu, Jan 21, 2016 at 8:28 AM, Yashwant Chandrakar
 wrote:
> Hi Team,
>
>
> I am Yashwant Chandrakar, currently working at Brillio as a big data consultant.
> I am looking to use Apache NiFi for data flow purposes.
>
> Could you please help me understand the following points regarding NiFi?
>
>
> 1. How can I use NiFi to understand data flow?
>
> 2. Is there any documentation available to help me understand NiFi?
>
> 3. Is any integration of NiFi with the Hadoop ecosystem available?
>
> 4. How can I explore all the possible options for using NiFi?
>
>
>
>
> Thanks & Regards,
> Yashwant Chandrakar
> This e-mail contains Privileged and Confidential Information intended solely 
> for the use of the addressee(s). It shall not attach any liability on the 
> sender or Brillio or its affiliates. Any views or opinions presented in this 
> email are solely those of the sender and may not necessarily reflect the 
> opinions of Brillio or its affiliates. If you are not the intended recipient, 
> you should not disseminate, distribute or copy this e-mail. Please notify the 
> sender immediately and destroy all copies of this message and any 
> attachments. WARNING: Computer viruses can be transmitted via email. While 
> Brillio has taken reasonable precautions to minimize this risk, Brillio 
> accepts no liability for any damage that may be caused to you in the event 
> that there is any virus in this e-mail or any attachments attached hereto. It 
> is the addresses(s) duty to check and scan this email and any attachments 
> attached hereto for the presence of viruses prior to opening the email. ** 
> Thank You **


Apache NiFi Information

2016-01-21 Thread Yashwant Chandrakar
Hi Team,


I am Yashwant Chandrakar, currently working at Brillio as a big data consultant.
I am looking to use Apache NiFi for data flow purposes.

Could you please help me understand the following points regarding NiFi?


1. How can I use NiFi to understand data flow?

2. Is there any documentation available to help me understand NiFi?

3. Is any integration of NiFi with the Hadoop ecosystem available?

4. How can I explore all the possible options for using NiFi?




Thanks & Regards,
Yashwant Chandrakar


[GitHub] nifi pull request: NIFI-1378 fixed JMS URI validation

2016-01-21 Thread olegz
Github user olegz commented on the pull request:

https://github.com/apache/nifi/pull/167#issuecomment-173604806
  
@trkurc I do see your point, so let me throw something else at you. The 
_URI_VALIDATOR_ simply validates based on the success of ```new URI(input);```, 
and we can certainly add the test back, but consider that that test was added 
by Joe to validate the previous fix, which we all seem to agree was inefficient 
given what we know now. So instead I've added three more tests for specific 
invalid conditions.

That leaves us with one thing: testing that 'any validator' works in the 
processor, and as I said before, I think those tests already exist at the 
higher level.
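For reference, the validation approach described here — accepting any input that `new URI(input)` parses — can be sketched as follows. The `UriValidatorSketch` class name and `isValidUri` helper are illustrative, not NiFi's actual validator:

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper mirroring the approach discussed above: an input is
// considered valid exactly when java.net.URI's parser accepts it.
public class UriValidatorSketch {

    // Returns true when `new URI(input)` succeeds, false when it throws.
    public static boolean isValidUri(final String input) {
        try {
            new URI(input);
            return true;
        } catch (URISyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A well-formed URI parses; an embedded space does not.
        System.out.println(isValidUri("tcp://localhost:61616"));   // true
        System.out.println(isValidUri("tcp://local host:61616"));  // false
    }
}
```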


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Give extra info about input dir for Windows use...

2016-01-21 Thread thadguidry
GitHub user thadguidry opened a pull request:

https://github.com/apache/nifi/pull/181

Give extra info about input dir for Windows users.

Windows users typically don't have to worry about supplying a trailing 
slash until they work with some Java programs like NiFi.
An input path such as 
"E:\git\nifi\nifi-nar-bundles\nifi-standard-bundle\nifi-standard-processors\src\test\resources\CharacterSetConversionSamples\TestInput"
 that does not have a trailing slash will cause NiFi to parse everything in 
CharacterSetConversionSamples, not the TestInput folder.  To remedy the 
situation, Windows users just need to add the trailing slash \, such as 
"E:\git\nifi\nifi-nar-bundles\nifi-standard-bundle\nifi-standard-processors\src\test\resources\CharacterSetConversionSamples\TestInput\"
 and then NiFi is able to use the correct path and not pick up extra files from 
the unexpected parent folder.

Giving this extra hint for Windows users is an easier workaround for now 
than doing Windows OS detection with 
http://commons.apache.org/proper/commons-lang/javadocs/api-release/org/apache/commons/lang3/SystemUtils.html#IS_OS_WINDOWS
 and providing workarounds.

I think most Windows users and developers are aware of this kind of issue, 
which we deal with every day, and the extra hint should be enough.
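The workaround described above amounts to normalizing the configured directory so it ends with a separator. A minimal sketch, with a hypothetical class and helper name (not part of NiFi):

```java
// Hypothetical helper illustrating the workaround described above: append
// the given separator when a directory path does not already end with it.
public class TrailingSlashSketch {

    public static String ensureTrailingSeparator(final String dir, final String separator) {
        return dir.endsWith(separator) ? dir : dir + separator;
    }

    public static void main(String[] args) {
        // Without the trailing backslash, the parent folder's contents would be listed.
        String fixed = ensureTrailingSeparator("E:\\samples\\TestInput", "\\");
        System.out.println(fixed); // E:\samples\TestInput\
    }
}
```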

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/thadguidry/nifi patch-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #181


commit d775514d163f10233a599f3adf1f8b59a2672b5a
Author: Thad Guidry 
Date:   2016-01-21T15:56:33Z

Give extra info about input dir for Windows users.







Re: Unable to pick different input ports in the RemoteProcessorGroup

2016-01-21 Thread Paresh Shah
To clarify, I do see the ports in the RPG but am not able to turn them
“On”. That option is only available for the first one in the list.

Paresh

On 1/21/16, 9:55 AM, "Paresh Shah"  wrote:

>I'm not sure where I would see the ports being called remote group ports.
>Using the UI I picked the Input Port on to the canvas at the root level
>and am able to see one of them successfully in the RemoteProcessorGroup.
>Also I do not see any place where I can set the authorization level.
>
>
>
>On 1/21/16, 9:50 AM, "Joe Witt"  wrote:
>
>>Paresh,
>>
>>Screenshots through the apache mailing lists require pretty specific
>>delivery mechanism which honestly I don't know.  So...best to find a
>>way to link to screenshots or to put them in a JIRA.
>>
>>When you create InputPorts at the root group level they become
>>available as ports that another authorized system can see.  If they
>>are not at the root group level or if they are not authorized to be
>>seen by the other system they will not show up in the list.
>>
>>Can you confirm that these are remote group ports?
>>
>>Thanks
>>Joe
>>
>>On Thu, Jan 21, 2016 at 12:48 PM, Paresh Shah 
>>wrote:
>>> I have created a different set of InputPorts for the different pipelines.
>>> But when I see the list of InputPorts in the RPG I do not see the option
>>> to pick any ports other than the first one in the list.
>>>
>>> I have attached the screenshots
>>>
>>>
>>> 
>>> The information contained in this transmission may contain privileged
>>>and
>>> confidential information. It is intended only for the use of the
>>>person(s)
>>> named above. If you are not the intended recipient, you are hereby
>>>notified
>>> that any review, dissemination, distribution or duplication of this
>>> communication is strictly prohibited. If you are not the intended
>>>recipient,
>>> please contact the sender by reply email and destroy all copies of the
>>> original message.
>>> 
>





Re: Unable to pick different input ports in the RemoteProcessorGroup

2016-01-21 Thread Paresh Shah
I'm not sure where I would see the ports being called remote group ports.
Using the UI I placed the Input Port onto the canvas at the root level
and am able to see one of them successfully in the RemoteProcessorGroup.
Also, I do not see any place where I can set the authorization level.



On 1/21/16, 9:50 AM, "Joe Witt"  wrote:

>Paresh,
>
>Screenshots through the apache mailing lists require pretty specific
>delivery mechanism which honestly I don't know.  So...best to find a
>way to link to screenshots or to put them in a JIRA.
>
>When you create InputPorts at the root group level they become
>available as ports that another authorized system can see.  If they
>are not at the root group level or if they are not authorized to be
>seen by the other system they will not show up in the list.
>
>Can you confirm that these are remote group ports?
>
>Thanks
>Joe
>
>On Thu, Jan 21, 2016 at 12:48 PM, Paresh Shah 
>wrote:
>> I have created a different set of InputPorts for the different pipelines.
>> But when I see the list of InputPorts in the RPG I do not see the option
>> to pick any ports other than the first one in the list.
>>
>> I have attached the screenshots
>>
>>





Re: Unable to pick different input ports in the RemoteProcessorGroup

2016-01-21 Thread Paresh Shah
I did not see the option to pick the port when dragging the RPG. This is
no longer a problem.


On 1/21/16, 10:10 AM, "Paresh Shah"  wrote:

>To clarify I do see the ports in the RPG but am not able to turn them
>“On”. That option is only available for the first in the list.
>
>Paresh
>
>On 1/21/16, 9:55 AM, "Paresh Shah"  wrote:
>
>>I'm not sure where I would see the ports being called remote group ports.
>>Using the UI I picked the Input Port on to the canvas at the root level
>>and am able to see one of them successfully in the RemoteProcessorGroup.
>>Also I do not see any place where I can set the authorization level.
>>
>>
>>
>>On 1/21/16, 9:50 AM, "Joe Witt"  wrote:
>>
>>>Paresh,
>>>
>>>Screenshots through the apache mailing lists require pretty specific
>>>delivery mechanism which honestly I don't know.  So...best to find a
>>>way to link to screenshots or to put them in a JIRA.
>>>
>>>When you create InputPorts at the root group level they become
>>>available as ports that another authorized system can see.  If they
>>>are not at the root group level or if they are not authorized to be
>>>seen by the other system they will not show up in the list.
>>>
>>>Can you confirm that these are remote group ports?
>>>
>>>Thanks
>>>Joe
>>>
>>>On Thu, Jan 21, 2016 at 12:48 PM, Paresh Shah 
>>>wrote:
 I have created a different set of InputPorts for the different pipelines.
 But when I see the list of InputPorts in the RPG I do not see the option
 to pick any ports other than the first one in the list.

 I have attached the screenshots


 
>>
>
>





[GitHub] nifi pull request: NIFI-1273 Adding ListenRELP Processor

2016-01-21 Thread mcgilman
Github user mcgilman commented on a diff in the pull request:

https://github.com/apache/nifi/pull/179#discussion_r50451076
  
--- Diff: 
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for
+ * processing, otherwise the buffer is returned to the buffer pool.
+ */
+public class DatagramChannelDispatcher<E extends Event> implements ChannelDispatcher {
+
+    private final EventFactory<E> eventFactory;
+    private final BlockingQueue<ByteBuffer> bufferPool;
+    private final BlockingQueue<E> events;
+    private final ProcessorLog logger;
+
+    private Selector selector;
+    private DatagramChannel datagramChannel;
+    private volatile boolean stopped = false;
+
+    public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
+                                     final BlockingQueue<ByteBuffer> bufferPool,
+                                     final BlockingQueue<E> events,
+                                     final ProcessorLog logger) {
+        this.eventFactory = eventFactory;
+        this.bufferPool = bufferPool;
+        this.events = events;
+        this.logger = logger;
+    }
+
+    @Override
+    public void open(final int port, int maxBufferSize) throws IOException {
+        datagramChannel = DatagramChannel.open();
+        datagramChannel.configureBlocking(false);
+        if (maxBufferSize > 0) {
+            datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
+            final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+            if (actualReceiveBufSize < maxBufferSize) {
+                logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
+                        + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's "
+                        + "maximum receive buffer");
+            }
+        }
+        datagramChannel.socket().bind(new InetSocketAddress(port));
+        selector = Selector.open();
+        datagramChannel.register(selector, SelectionKey.OP_READ);
+    }
+
+    @Override
+    public void run() {
+        final ByteBuffer buffer = bufferPool.poll();
--- End diff --

Protect when buffer is null or ensure bufferPool is not empty initially?
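The concern raised here is that `BlockingQueue.poll()` returns null when the pool is empty. A hedged sketch of one possible guard (the class, method name, and logging behavior are illustrative; the real dispatcher may handle this differently):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the suggested guard: poll() returns null when the
// pool is empty, so the run loop should bail out (or wait) rather than
// dereference a null buffer.
public class BufferPoolGuardSketch {

    // Returns the polled buffer, or null (after logging) when the pool is exhausted.
    public static ByteBuffer pollGuarded(final BlockingQueue<ByteBuffer> bufferPool) {
        final ByteBuffer buffer = bufferPool.poll();
        if (buffer == null) {
            // In the real dispatcher this could be a logger.warn(...) plus an
            // early return from run(), or a blocking take() with a timeout.
            System.err.println("Buffer pool exhausted; skipping this read cycle");
        }
        return buffer;
    }

    public static void main(String[] args) {
        BlockingQueue<ByteBuffer> pool = new LinkedBlockingQueue<>();
        System.out.println(pollGuarded(pool));            // empty pool: null, no exception
        pool.offer(ByteBuffer.allocate(64));
        System.out.println(pollGuarded(pool) != null);    // buffer available: true
    }
}
```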




Re: How to configure/start multiple Input ports.

2016-01-21 Thread Matthew Clarke
Paresh,

  I am really not sure why you are not getting my attachments. Others
are receiving them fine. Is it possible the LifeLock email servers are
dropping them?

  I think we need to take one step back and talk a little more about
what input and output ports are used for.

  Let's start by talking about the NiFi canvas where you are creating
your dataflows. When you started NiFi for the very first time you were
given a blank canvas. You can think of this canvas as the root process
group. From there you were able to add additional process groups to that
top-level canvas. These added process groups allow you to drill down into
them, giving additional blank canvases you can build dataflows on. When
you enter an added process group you will see the hierarchy represented
just above the canvas in the UI ( flowname >> processgroupname ). NiFi
does not restrict the number of process groups you create or the depth you
can go with them. You could compare the process group hierarchy to that of
a Windows directory structure. So if I added another process group inside
one that I already created, I have essentially now gone two layers deeper
( flowname >> processgroupname >> processgroupdeeper ). The hierarchy
represented above your canvas allows you to quickly jump up one or more
layers, all the way to the root level.

  Now that we understand process groups, let's talk about how we move
data in and out of these process groups. This is where input and output
ports come into play. Input and output ports exist to move FlowFiles
between a process group and *ONE LEVEL UP* from that process group. Input
ports will accept FlowFiles coming from one level up, and output ports allow
FlowFiles to be sent one level up. If I have a process group added to my
canvas, I cannot drag a connection to it until at least one input port
exists inside that process group. I also cannot drag a connection off of
that process group until at least one output port exists inside the process
group. You can only move FlowFiles up or down one level at a time. Given
the example of a process group within another process group, FlowFiles
would need to be moved from the deepest level up to the middle layer before
finally being able to be moved to the root canvas.

  Site-to-Site behaves, and is configured, in very much the same way.
Instead of moving FlowFiles between different process groups (layers)
within the same NiFi, we are moving FlowFiles between different NiFi
instances or clusters. As I explained earlier, input and output ports are
used to move data to and from one level up. At the top level of your
canvas (the root process group level), adding input or output ports provides
the ability for that NiFi to receive FlowFiles from another NiFi instance
(input port) or have another NiFi pull files from it (output port).
We refer to this ability to send FlowFiles between different NiFi instances
as Site-to-Site. We refer to input and output ports added at the top level
as remote input or output ports. These remote ports are not configured
with unique system port numbers, but instead all utilize the same
Site-to-Site port number configured in your nifi.properties file. They
are added the same way as you would add any other input port; they just look
a little different when added at the top level, and their configuration has
two tabs instead of one. Now that you have your receiving NiFi configured
with input ports and/or output ports, you need to configure another NiFi to
send FlowFiles to or pull FlowFiles from these ports.

  In a single instance you can send data to an input port inside a
process group by dragging a connection to the process group and selecting
the name of the input port from the selection menu provided. However, on
your source NiFi you need to add a Remote Process Group (RPG) that uses the
URL of the target NiFi (the NCM URL if the target is a NiFi cluster). Once
the initial communication has occurred, you can drag a connection to the RPG
much in the same way you previously moved FlowFiles between process groups
(layers) within a single instance of NiFi. You can also hover over the
RPG and drag a connection off of the RPG, which will allow you to pull data
from an available output port on the target NiFi.

  Remember that the source NiFi (standalone or cluster) has one or more
RPGs, while the target NiFi contains the input and output ports (input ports
added at the top level). In your setup it sounds like all your dataflows on
the target NiFis exist inside process groups, so you would need to take
data received on your top-level input ports and connect it into your
process groups.

Thanks,
Matt



On Wed, Jan 20, 2016 at 9:11 PM, Paresh Shah 
wrote:

> Another question.
>
> To use the Remote Processor Group, on the sender side should we do the
> following.
>
>   1.  Create Output Port.
>   2.  Create Remote Processor Group using the OutputPort created.
>
> Or
>
> 1. Create 

Re: How to configure/start multiple Input ports.

2016-01-21 Thread Matthew Clarke
For Site-to-Site to work between NiFi instances, the following need to be
in place:

1. Firewalls have to be open to allow all source nodes to talk to all
destination nodes through the Site-to-Site port configured in the target
nodes' nifi.properties file.
2. Firewalls have to be open to allow all source NiFi nodes and the NCM to
talk to the HTTP(S) port of the target NCM.
3. In most cases it is necessary to have nifi.remote.input.socket.host=
configured on every target node/NCM with a hostname that is resolvable by
the source NiFi nodes/NCM. If left blank, the target nodes will use Java to
try to determine the hostname. Sometimes this can result in localhost
being sent, which will not work. It may also return a local hostname which
the source NiFi nodes cannot resolve.
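Point 3 can be seen directly: when the property is blank, the hostname advertised is whatever Java determines locally. A small sketch of that lookup (the class name and the fallback behavior shown are illustrative, not NiFi's exact code):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Sketch of the fallback described above: with nifi.remote.input.socket.host
// left blank, Java's view of the local hostname is used, which may be
// "localhost" or a name the remote instance cannot resolve.
public class LocalHostnameSketch {

    public static String javaDeterminedHostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // No resolvable local name at all; Site-to-Site would need the
            // property set explicitly in this case.
            return "localhost";
        }
    }

    public static void main(String[] args) {
        // Prints the name that would be advertised to remote peers.
        System.out.println(javaDeterminedHostname());
    }
}
```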

When setting up your NiFi instances (nodes or NCMs), it is highly
recommended that the following optional properties are populated:

# Site to Site properties
nifi.remote.input.socket.host=

# web properties #
nifi.web.http.host=   (if running non-secure)
nifi.web.https.host=  (if running secure)

Thanks,
Matt

On Thu, Jan 21, 2016 at 12:06 PM, Paresh Shah 
wrote:

> When the RemoteProcessorGroup has been EnabledForTransmission, the
> exception seen in the log file is
>
>
> 2016-01-21 09:01:05,477 WARN [NiFi Site-to-Site Connection Pool
> Maintenance] o.a.n.r.c.socket.EndpointConnectionPool
> EndpointConnectionPool[Cluster URL=http://localhost:9080/nifi] Unable to
> refresh Remote Group's peers due to {}
>
> java.net.SocketTimeoutException: Timed out reading from socket
>
> at
> org.apache.nifi.remote.io.socket.SocketChannelInputStream.read(SocketChannelInputStream.java:117)
> ~[nifi-utils-0.3.0.jar:0.3.0]
>
> at
> org.apache.nifi.stream.io.ByteCountingInputStream.read(ByteCountingInputStream.java:51)
> ~[nifi-utils-0.3.0.jar:0.3.0]
>
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
> ~[na:1.7.0_79]
>
> at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
> ~[na:1.7.0_79]
>
> at
> org.apache.nifi.remote.io.InterruptableInputStream.read(InterruptableInputStream.java:39)
> ~[nifi-utils-0.3.0.jar:0.3.0]
>
> at java.io.DataInputStream.readInt(DataInputStream.java:387) ~[na:1.7.0_79]
>
> at
> org.apache.nifi.remote.protocol.socket.SocketClientProtocol.getPeerStatuses(SocketClientProtocol.java:237)
> ~[nifi-site-to-site-client-0.3.0.jar:0.3.0]
>
> at
> org.apache.nifi.remote.client.socket.EndpointConnectionPool.fetchRemotePeerStatuses(EndpointConnectionPool.java:607)
> [nifi-site-to-site-client-0.3.0.jar:0.3.0]
>
> at
> org.apache.nifi.remote.client.socket.EndpointConnectionPool.refreshPeers(EndpointConnectionPool.java:839)
> [nifi-site-to-site-client-0.3.0.jar:0.3.0]
>
> at
> org.apache.nifi.remote.client.socket.EndpointConnectionPool.access$000(EndpointConnectionPool.java:93)
> [nifi-site-to-site-client-0.3.0.jar:0.3.0]
>
> at
> org.apache.nifi.remote.client.socket.EndpointConnectionPool$2.run(EndpointConnectionPool.java:192)
> [nifi-site-to-site-client-0.3.0.jar:0.3.0]
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> [na:1.7.0_79]
>
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> [na:1.7.0_79]
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> [na:1.7.0_79]
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [na:1.7.0_79]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [na:1.7.0_79]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [na:1.7.0_79]
>
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
>
>
> ---
>
>
> After about 5 min I see that it was able to send the data over to the
> InputPort. I see the yield duration is only 10s and also the input Port is
> already started.
>
>
> Paresh
>
>
>
> From: Paresh Shah >
> Date: Wednesday, January 20, 2016 at 6:12 PM
> To: "dev@nifi.apache.org"  >
> Subject: Re: How to configure/start multiple Input ports.
>
> Another question.
>
> To use the Remote Processor Group, on the sender side should we do the
> following.
>
>   1.  Create Output Port.
>   2.  Create Remote Processor Group using the OutputPort created.
>
> Or
>
> 1. Create the target pipeline with the input port
> 2. Create Remote Processor Group with the input port -> This gives
> the error that it has timed out reading from the port. But we need to be
> sending the data to this port rather than reading from it.
>
> Thanks
> Paresh
>
> From: Paresh Shah >
> Date: Wednesday, January 20, 2016 at 4:01 PM
> To: "dev@nifi.apache.org"  >
> 

[GitHub] nifi pull request: NIFI-1412: Fix error during Avro conversion whe...

2016-01-21 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/182

NIFI-1412: Fix error during Avro conversion where no fields are present

Changes the logic so that no exception is thrown when meta.getColumnCount()
returns 0; instead, an empty Avro schema is produced.
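The described control flow can be sketched in plain Java. The JSON emitted below is a legal Avro record schema with zero fields, but the class, helper, and `recordName` are hypothetical, not the PR's actual implementation:

```java
// Hypothetical sketch of the fix described above: when the result set
// metadata reports zero columns, emit a record schema with an empty field
// list (legal in Avro) instead of throwing.
public class EmptySchemaSketch {

    // `recordName` stands in for whatever record name the real converter uses.
    public static String schemaJsonFor(final int columnCount, final String recordName) {
        if (columnCount == 0) {
            // An Avro record may legally declare zero fields.
            return "{\"type\":\"record\",\"name\":\"" + recordName + "\",\"fields\":[]}";
        }
        // Real column-by-column conversion elided in this sketch.
        throw new UnsupportedOperationException("column mapping not sketched here");
    }

    public static void main(String[] args) {
        // Prints the empty-record schema produced for a zero-column result set.
        System.out.println(schemaJsonFor(0, "NiFiRecord"));
    }
}
```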

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-1412

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #182


commit 65aacf7d1b75a3c5483ce0635f778ab1727ed5b3
Author: Matt Burgess 
Date:   2016-01-21T20:20:14Z

NIFI-1412: Fix error during Avro conversion where no fields are present






[GitHub] nifi pull request: NIFI-1275: Add processor(s) support for Elastic...

2016-01-21 Thread mattyb149
Github user mattyb149 commented on the pull request:

https://github.com/apache/nifi/pull/180#issuecomment-173740429
  
Yes, this controller is using the Transport protocol. The explicit docs you 
mention are Logstash docs, not Elasticsearch docs. The Elasticsearch Java 
Client docs 
(https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html)
 make no mention that the HTTP APIs are more performant and recommended.

Having said that, the updating of versions is indeed a concern, so perhaps 
the NiFi processors should be able to use the HTTP APIs (in addition to, or 
instead of, the Transport protocol). At present the *-Http processors in NiFi 
should already enable this type of interaction. I presumed (apparently 
incorrectly) that a native transport would be faster, or else why have it?

I appreciate your feedback, and will think on these points.




[GitHub] nifi pull request: NIFI-1273 Adding ListenRELP Processor

2016-01-21 Thread mcgilman
Github user mcgilman commented on the pull request:

https://github.com/apache/nifi/pull/179#issuecomment-173707708
  
Spent some time reviewing the patch. Overall, I think the code looks good. 
One additional nit-picky item beyond my previous comment:

- I could see possibly wanting to extract the RELP utility classes into a 
separate artifact. Maybe we wait to shuffle this around until we have a case 
for it. Maybe introduce a RELP NAR if we ever get to that point.

When running ListenRELP, there were times (especially right after stopping 
and starting the Processor) that new messages were not picked up for some time. 
At some point later, a backlog of messages flooded in. It's not clear what 
triggered it and the backlog appeared to produce some duplicate messages.

@trkurc did you want to check this out before we proceed with merging?




[GitHub] nifi pull request: NIFI-1423 Allow to penalize FlowFiles to No Ret...

2016-01-21 Thread ledor473
GitHub user ledor473 opened a pull request:

https://github.com/apache/nifi/pull/183

NIFI-1423 Allow to penalize FlowFiles to No Retry



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ledor473/nifi penalize-http-no-retry

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #183


commit 8f0d5419477706433df6200ea97c7e588fd422a7
Author: Louis-Etienne Dorval 
Date:   2016-01-21T21:08:07Z

NIFI-1423 Allow to penalize FlowFiles that are routed to No Retry 
relationship






[GitHub] nifi pull request: NIFI-1273 Adding ListenRELP Processor

2016-01-21 Thread trkurc
Github user trkurc commented on the pull request:

https://github.com/apache/nifi/pull/179#issuecomment-173712640
  
@mcgilman - I have been reviewing. I should be done tonight. Haven't 
found any show-stoppers yet.




[GitHub] nifi pull request: NIFI-210: Add ExecuteScript and InvokeScriptPro...

2016-01-21 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/185

NIFI-210: Add ExecuteScript and InvokeScriptProcessor



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi script-processors

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #185


commit 1b9bfb6e4a675227ccbe9d6094c9097637662ab5
Author: Matt Burgess 
Date:   2016-01-21T22:52:35Z

NIFI-210: Add ExecuteScript and InvokeScriptProcessor






[GitHub] nifi pull request: NIFI-1273 Adding ListenRELP Processor

2016-01-21 Thread trkurc
Github user trkurc commented on a diff in the pull request:

https://github.com/apache/nifi/pull/179#discussion_r50489401
  
--- Diff: 
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
 ---
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An abstract processor to extend from when listening for events over a 
channel. This processor
+ * will start a ChannelDispatcher, and optionally a 
ChannelResponseDispatcher, in a background
+ * thread which will end up placing events on a queue to be polled by the 
onTrigger method. Sub-classes
+ * are responsible for providing the dispatcher implementations.
+ *
+ * @param <E> the type of events being produced
+ */
+public abstract class AbstractListenEventProcessor<E extends Event> extends AbstractProcessor {
+
+public static final PropertyDescriptor PORT = new PropertyDescriptor
+.Builder().name("Port")
+.description("The port to listen on for communication.")
+.required(true)
+.addValidator(StandardValidators.PORT_VALIDATOR)
+.build();
+public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+.name("Character Set")
+.description("Specifies the character set of the received 
data.")
+.required(true)
+.defaultValue("UTF-8")
+.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+.build();
+public static final PropertyDescriptor RECV_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+.name("Receive Buffer Size")
+.description("The size of each buffer used to receive 
messages. Adjust this value appropriately based on the expected size of the " +
+"incoming messages.")
+.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+.defaultValue("65507 B")
+.required(true)
+.build();
+public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+.name("Max Size of Socket Buffer")
+.description("The maximum size of the socket buffer that 
should be used. This is a suggestion to the Operating System " +
+"to indicate how big the socket buffer should be. If 
this value is set too low, the buffer may fill up before " +
+"the data can be read, and incoming data will be 
dropped.")
+.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+.defaultValue("1 MB")
+.required(true)
+

[GitHub] nifi pull request: NIFI-1273 Adding ListenRELP Processor

2016-01-21 Thread trkurc
Github user trkurc commented on a diff in the pull request:

https://github.com/apache/nifi/pull/179#discussion_r50490153
  
--- Diff: 
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Reads from the Datagram channel into an available buffer. If data is 
read then the buffer is queued for
+ * processing, otherwise the buffer is returned to the buffer pool.
+ */
+public class DatagramChannelDispatcher> 
implements ChannelDispatcher {
+
+private final EventFactory eventFactory;
+private final BlockingQueue bufferPool;
+private final BlockingQueue events;
+private final ProcessorLog logger;
+
+private Selector selector;
+private DatagramChannel datagramChannel;
+private volatile boolean stopped = false;
+
+public DatagramChannelDispatcher(final EventFactory eventFactory,
+ final BlockingQueue 
bufferPool,
+ final BlockingQueue events,
+ final ProcessorLog logger) {
+this.eventFactory = eventFactory;
+this.bufferPool = bufferPool;
+this.events = events;
+this.logger = logger;
+
+if (bufferPool == null || bufferPool.size() == 0) {
+throw new IllegalArgumentException("A pool of available 
ByteBuffers is required");
+}
+}
+
+@Override
+public void open(final int port, int maxBufferSize) throws IOException 
{
+datagramChannel = DatagramChannel.open();
+datagramChannel.configureBlocking(false);
+if (maxBufferSize > 0) {
+datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
maxBufferSize);
+final int actualReceiveBufSize = 
datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+if (actualReceiveBufSize < maxBufferSize) {
+logger.warn("Attempted to set Socket Buffer Size to " + 
maxBufferSize + " bytes but could only set to "
++ actualReceiveBufSize + "bytes. You may want to 
consider changing the Operating System's "
++ "maximum receive buffer");
+}
+}
+datagramChannel.socket().bind(new InetSocketAddress(port));
+selector = Selector.open();
+datagramChannel.register(selector, SelectionKey.OP_READ);
+}
+
+@Override
+public void run() {
+final ByteBuffer buffer = bufferPool.poll();
+while (!stopped) {
+try {
+int selected = selector.select();
+if (selected > 0){
+Iterator selectorKeys = 
selector.selectedKeys().iterator();
+while (selectorKeys.hasNext()) {
+SelectionKey key = selectorKeys.next();
+selectorKeys.remove();
+if (!key.isValid()) {
+continue;
+}
+  

[GitHub] nifi pull request: NIFI-1273 Adding ListenRELP Processor

2016-01-21 Thread trkurc
Github user trkurc commented on a diff in the pull request:

https://github.com/apache/nifi/pull/179#discussion_r50490135
  
--- Diff: 
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Reads from the Datagram channel into an available buffer. If data is 
read then the buffer is queued for
+ * processing, otherwise the buffer is returned to the buffer pool.
+ */
+public class DatagramChannelDispatcher> 
implements ChannelDispatcher {
+
+private final EventFactory eventFactory;
+private final BlockingQueue bufferPool;
+private final BlockingQueue events;
+private final ProcessorLog logger;
+
+private Selector selector;
+private DatagramChannel datagramChannel;
+private volatile boolean stopped = false;
+
+public DatagramChannelDispatcher(final EventFactory eventFactory,
+ final BlockingQueue 
bufferPool,
+ final BlockingQueue events,
+ final ProcessorLog logger) {
+this.eventFactory = eventFactory;
+this.bufferPool = bufferPool;
+this.events = events;
+this.logger = logger;
+
+if (bufferPool == null || bufferPool.size() == 0) {
+throw new IllegalArgumentException("A pool of available 
ByteBuffers is required");
+}
+}
+
+@Override
+public void open(final int port, int maxBufferSize) throws IOException 
{
+datagramChannel = DatagramChannel.open();
+datagramChannel.configureBlocking(false);
+if (maxBufferSize > 0) {
+datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
maxBufferSize);
+final int actualReceiveBufSize = 
datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+if (actualReceiveBufSize < maxBufferSize) {
+logger.warn("Attempted to set Socket Buffer Size to " + 
maxBufferSize + " bytes but could only set to "
++ actualReceiveBufSize + "bytes. You may want to 
consider changing the Operating System's "
++ "maximum receive buffer");
+}
+}
+datagramChannel.socket().bind(new InetSocketAddress(port));
+selector = Selector.open();
+datagramChannel.register(selector, SelectionKey.OP_READ);
+}
+
+@Override
+public void run() {
+final ByteBuffer buffer = bufferPool.poll();
+while (!stopped) {
+try {
+int selected = selector.select();
+if (selected > 0){
+Iterator selectorKeys = 
selector.selectedKeys().iterator();
+while (selectorKeys.hasNext()) {
+SelectionKey key = selectorKeys.next();
+selectorKeys.remove();
+if (!key.isValid()) {
+continue;
+}
+  

[GitHub] nifi pull request: NIFI-1273 Adding ListenRELP Processor

2016-01-21 Thread trkurc
Github user trkurc commented on a diff in the pull request:

https://github.com/apache/nifi/pull/179#discussion_r50490685
  
--- Diff: 
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Accepts Socket connections on the given port and creates a handler for 
each connection to
+ * be executed by a thread pool.
+ */
+public class SocketChannelDispatcher> 
implements ChannelDispatcher {
+
+private final EventFactory eventFactory;
+private final ChannelHandlerFactory handlerFactory;
+private final BlockingQueue bufferPool;
+private final BlockingQueue events;
+private final ProcessorLog logger;
+private final int maxConnections;
+private final SSLContext sslContext;
+private final Charset charset;
+
+private ExecutorService executor;
+private volatile boolean stopped = false;
+private Selector selector;
+private final BlockingQueue keyQueue;
+private final AtomicInteger currentConnections = new AtomicInteger(0);
+
+
+public SocketChannelDispatcher(final EventFactory eventFactory,
+   final ChannelHandlerFactory 
handlerFactory,
+   final BlockingQueue 
bufferPool,
+   final BlockingQueue events,
+   final ProcessorLog logger,
+   final int maxConnections,
+   final SSLContext sslContext,
+   final Charset charset) {
+this.eventFactory = eventFactory;
+this.handlerFactory = handlerFactory;
+this.bufferPool = bufferPool;
+this.events = events;
+this.logger = logger;
+this.maxConnections = maxConnections;
+this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
+this.sslContext = sslContext;
+this.charset = charset;
+
+if (bufferPool == null || bufferPool.size() == 0 || 
bufferPool.size() != maxConnections) {
+throw new IllegalArgumentException(
+"A pool of available ByteBuffers equal to the maximum 
number of connections is required");
+}
+}
+
+@Override
+public void open(final int port, int maxBufferSize) throws IOException 
{
+this.executor = Executors.newFixedThreadPool(maxConnections);
+
+final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
+serverSocketChannel.configureBlocking(false);
+if (maxBufferSize > 0) {
 

Re: Why the Input Ports are not allowed to be inside the ProcessorGroups

2016-01-21 Thread Matt Gilman
Paresh,

Input/Output ports are allowed to be inside of Process Groups. These
facilitate data flow between the Process Group and its parent Process
Group. In the case of the root Process Group, it facilitates remote data
flows with other NiFi instances (or anything that can speak the
site-to-site protocol). This restriction is to keep a consistent model that
is most importantly clear to the end user.

There have been other discussions and feature proposals for creating
Function Groups (re-usable Process Group) [1] and Wormhole Connections
(like a symbolic link) [2]. These added features will help alleviate some
of the restrictions enforced by the Process Group model.

Matt

[1]
https://cwiki.apache.org/confluence/display/NIFI/Reference-able+Process+Groups
[2] https://cwiki.apache.org/confluence/display/NIFI/Wormhole+Connections

On Thu, Jan 21, 2016 at 7:33 PM, Paresh Shah 
wrote:

> In our use case to keep the UI manageable we create different
> ProcessorGroups for the different pipelines. But when using the
> RemoteProcessorGroups and InputPorts we are forced to move the pipelines
> out of the processor groups and into the root canvas. This helps us tear
> down and do the deployment cleanly.
>
> If someone can give reasons for the above and also can it be changed so
> that we can work with RPG/InputPorts/ProcessorGroups.
>
> Thanks
> Paresh
> 
> The information contained in this transmission may contain privileged and
> confidential information. It is intended only for the use of the person(s)
> named above. If you are not the intended recipient, you are hereby notified
> that any review, dissemination, distribution or duplication of this
> communication is strictly prohibited. If you are not the intended
> recipient, please contact the sender by reply email and destroy all copies
> of the original message.
> 
>


[GitHub] nifi pull request: NIFI-1273 Adding ListenRELP Processor

2016-01-21 Thread trkurc
Github user trkurc commented on a diff in the pull request:

https://github.com/apache/nifi/pull/179#discussion_r50490353
  
--- Diff: 
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Reads from the Datagram channel into an available buffer. If data is 
read then the buffer is queued for
+ * processing, otherwise the buffer is returned to the buffer pool.
+ */
+public class DatagramChannelDispatcher> 
implements ChannelDispatcher {
+
+private final EventFactory eventFactory;
+private final BlockingQueue bufferPool;
+private final BlockingQueue events;
+private final ProcessorLog logger;
+
+private Selector selector;
+private DatagramChannel datagramChannel;
+private volatile boolean stopped = false;
+
+public DatagramChannelDispatcher(final EventFactory eventFactory,
+ final BlockingQueue 
bufferPool,
+ final BlockingQueue events,
+ final ProcessorLog logger) {
+this.eventFactory = eventFactory;
+this.bufferPool = bufferPool;
+this.events = events;
+this.logger = logger;
+
+if (bufferPool == null || bufferPool.size() == 0) {
+throw new IllegalArgumentException("A pool of available 
ByteBuffers is required");
+}
+}
+
+@Override
+public void open(final int port, int maxBufferSize) throws IOException 
{
+datagramChannel = DatagramChannel.open();
+datagramChannel.configureBlocking(false);
+if (maxBufferSize > 0) {
+datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
maxBufferSize);
+final int actualReceiveBufSize = 
datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+if (actualReceiveBufSize < maxBufferSize) {
+logger.warn("Attempted to set Socket Buffer Size to " + 
maxBufferSize + " bytes but could only set to "
++ actualReceiveBufSize + "bytes. You may want to 
consider changing the Operating System's "
++ "maximum receive buffer");
+}
+}
+datagramChannel.socket().bind(new InetSocketAddress(port));
+selector = Selector.open();
+datagramChannel.register(selector, SelectionKey.OP_READ);
+}
+
+@Override
+public void run() {
+final ByteBuffer buffer = bufferPool.poll();
+while (!stopped) {
+try {
+int selected = selector.select();
+if (selected > 0){
+Iterator selectorKeys = 
selector.selectedKeys().iterator();
+while (selectorKeys.hasNext()) {
+SelectionKey key = selectorKeys.next();
+selectorKeys.remove();
+if (!key.isValid()) {
+continue;
+}
+  

[GitHub] nifi pull request: NIFI-1275: Add processor(s) support for Elastic...

2016-01-21 Thread trixpan
Github user trixpan commented on the pull request:

https://github.com/apache/nifi/pull/180#issuecomment-173770694
  
sounds like a great idea




Why the Input Ports are not allowed to be inside the ProcessorGroups

2016-01-21 Thread Paresh Shah
In our use case to keep the UI manageable we create different ProcessorGroups 
for the different pipelines. But when using the RemoteProcessorGroups and 
InputPorts we are forced to move the pipelines out of the processor groups and 
into the root canvas. This helps us tear down and do the deployment cleanly.

Could someone explain the reasons for the above, and could it be changed so that 
we can work with RPG/InputPorts/ProcessorGroups?

Thanks
Paresh




Re: Why the Input Ports are not allowed to be inside the ProcessorGroups

2016-01-21 Thread Matt Gilman
Paresh,

Also wanted to let you know another upcoming feature is our multi-tenant
data flow [1]. One of the concepts being considered is creating separate
workspaces. This would essentially offer multiple root level Process Groups
that could provide different levels of authorization. This could
potentially allow you to create each of your pipelines within a separate
workspace.

Matt

[1] https://cwiki.apache.org/confluence/display/NIFI/Multi-Tentant+Dataflow

On Thu, Jan 21, 2016 at 7:55 PM, Matt Gilman 
wrote:

> Paresh,
>
> Input/Output ports are allowed to be inside of Process Groups. These
> facilitate data flow between the Process Group and its parent Process
> Group. In the case of the root Process Group, it facilitates remote data
> flows with other NiFi instances (or anything that can speak the
> site-to-site protocol). This restriction is to keep a consistent model that
> is most importantly clear to the end user.
>
> There have been other discussions and feature proposals for creating
> Function Groups (re-usable Process Group) [1] and Wormhole Connections
> (like a symbolic link) [2]. These added features will help alleviate some
> of the restrictions enforced by the Process Group model.
>
> Matt
>
> [1]
> https://cwiki.apache.org/confluence/display/NIFI/Reference-able+Process+Groups
> [2] https://cwiki.apache.org/confluence/display/NIFI/Wormhole+Connections
>
> On Thu, Jan 21, 2016 at 7:33 PM, Paresh Shah 
> wrote:
>
>> In our use case to keep the UI manageable we create different
>> ProcessorGroups for the different pipelines. But when using the
>> RemoteProcessorGroups and InputPorts we are forced to move the pipelines
>> out of the processor groups and into the root canvas. This helps us tear
>> down and do the deployment cleanly.
>>
>> If someone can give reasons for the above and also can it be changed so
>> that we can work with RPG/InputPorts/ProcessorGroups.
>>
>> Thanks
>> Paresh
>>
>
>


[GitHub] nifi pull request: NIFI-1275: Add processor(s) support for Elastic...

2016-01-21 Thread mattyb149
Github user mattyb149 commented on the pull request:

https://github.com/apache/nifi/pull/180#issuecomment-173769666
  
You are right that more processing would be needed to coax the flow file 
into the right format for the *Http processors. If the ES processors were to 
use the HTTP API, they would take care of that for you.

Perhaps the prudent way forward is to get these processors out into the 
world (using the Transport client since it's done) and see what the community 
thinks. We will eventually have a registry where new versions (and perhaps one 
that uses the HTTP API) could be added and the user could select which one(s) 
they want.  What are your thoughts on this?
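As a hedged illustration of the "more processing" mentioned above: the Elasticsearch HTTP _bulk endpoint expects newline-delimited action/source pairs with a trailing newline, which is the shape a flow file would need before a generic HTTP processor could post it. A minimal Java sketch (the index/type/id values are illustrative, not from this PR):

```java
public class BulkPayloadDemo {
    // Build a minimal Elasticsearch bulk-API body: one action line followed by
    // one source line, each newline-terminated (_bulk requires the trailing newline).
    static String bulkIndex(String index, String type, String id, String sourceJson) {
        return "{\"index\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}\n" + sourceJson + "\n";
    }

    public static void main(String[] args) {
        // Prints the two-line bulk body for a single document
        System.out.print(bulkIndex("logs", "event", "1", "{\"msg\":\"hello\"}"));
    }
}
```

A Transport-client processor hides this framing behind the Java API, which is the trade-off discussed in this thread.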


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1378 fixed JMS URI validation

2016-01-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/167




[GitHub] nifi pull request: NIFI-1275: Add processor(s) support for Elastic...

2016-01-21 Thread sejasp
Github user sejasp commented on the pull request:

https://github.com/apache/nifi/pull/180#issuecomment-173806065
  
PostHTTP has been working like a charm.  However, GetElasticsearch would be 
a great addition to the "Get" processors, preferably with support for scrolling.
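For reference, a hypothetical GetElasticsearch built on the HTTP API would page through results with the scroll API: the initial search carries a scroll keep-alive, and each follow-up request posts the returned scroll id back. A minimal sketch of the request strings involved (the URL and keep-alive values are illustrative, not from this PR):

```java
public class ScrollRequestDemo {
    // Initial request: a search URL carrying the scroll keep-alive window
    static String initialUrl(String baseUrl, String index, String keepAlive) {
        return baseUrl + "/" + index + "/_search?scroll=" + keepAlive;
    }

    // Follow-up requests: POST the returned scroll id to the _search/scroll endpoint
    static String continuationBody(String keepAlive, String scrollId) {
        return "{\"scroll\":\"" + keepAlive + "\",\"scroll_id\":\"" + scrollId + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(initialUrl("http://localhost:9200", "logs", "1m"));
        System.out.println(continuationBody("1m", "abc123"));
    }
}
```

The loop would terminate when a scroll response returns an empty hits array.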




[GitHub] nifi pull request: NIFI-1118 Update SplitText Processor - add supp...

2016-01-21 Thread markobean
Github user markobean commented on a diff in the pull request:

https://github.com/apache/nifi/pull/135#discussion_r50449323
  
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
@@ -198,23 +208,53 @@ private long countBytesToSplitPoint(final InputStream in, final OutputStream out
                 return includeLineDelimiter ? bytesRead : bytesRead - 1;
             }
 
-            // keep track of what the last byte was that we read so that we can detect \r followed by some other
+            // keep track of what the last byte was that we read so that we can
+            // detect \r followed by some other
             // character.
             lastByte = nextByte;
         }
     }
 
-    private SplitInfo countBytesToSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines) throws IOException {
+    private SplitInfo readHeader(final int numHeaderLines,
+                                 final String headerMarker, final InputStream in,
+                                 final OutputStream out, final boolean keepAllNewLines)
+            throws IOException {
         SplitInfo info = new SplitInfo();
-
-        while (info.lengthLines < numLines) {
-            final long bytesTillNext = countBytesToSplitPoint(in, null, keepAllNewLines || (info.lengthLines != numLines - 1));
-            if (bytesTillNext <= 0L) {
-                break;
+        boolean isHeaderLine = true;
+
+        // Read numHeaderLines from file, if specified; a non-zero value takes precedence
+        // over headerMarker character string
+        if (numHeaderLines > 0) {
+            for (int i = 0; i < numHeaderLines; i++) {
+                int bytesRead = readLine(in, out, keepAllNewLines);
+                if (bytesRead == 0) {
+                    break;
+                }
+                info.lengthBytes += bytesRead;
+                info.lengthLines++;
+            }
+        // Else, keep reading all lines that begin with headerMarker character string
+        } else if (headerMarker != null) {
+            while (true) {
+                in.mark(0);
--- End diff --

This section of code reads header lines a character at a time to determine 
whether the line begins with the headerMarker character(s). After the line has 
been determined to be a header (or non-header), the stream is reset so that a 
subsequent call to readLine() captures all characters of the line. The maximum 
number of characters readHeader() will read before calling readLine() is the 
number of characters in the headerMarker String. Therefore, in.mark(0) should 
be changed to in.mark(headerMarker.length()).
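The mark/reset interaction described above can be demonstrated in isolation. A minimal sketch (not the PR's code): peek at most marker-length bytes under a mark whose read limit is sized to the marker, then reset so a subsequent full-line read still sees every character of the line:

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

public class MarkResetDemo {
    // Peek whether the stream starts with marker, then rewind.
    // The mark's read limit only needs to cover marker.length() bytes,
    // which is the point of preferring in.mark(marker.length()) over in.mark(0).
    static boolean startsWithMarker(BufferedInputStream in, String marker) throws IOException {
        in.mark(marker.length());
        boolean isHeader = true;
        for (int i = 0; i < marker.length(); i++) {
            if (in.read() != marker.charAt(i)) {
                isHeader = false;
                break;
            }
        }
        in.reset(); // rewind: the next read starts again at the beginning of the line
        return isHeader;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "## header\nbody\n".getBytes("UTF-8");
        BufferedInputStream in = new BufferedInputStream(new ByteArrayInputStream(data));
        System.out.println(startsWithMarker(in, "##")); // prints "true"
        System.out.println((char) in.read());           // prints "#": the stream was rewound
    }
}
```

Per the InputStream contract, data read beyond the mark's read limit may invalidate the mark, so sizing the limit to the marker documents exactly how far the peek can go.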

