Re: Unable to access flowfile content

2019-12-21 Thread Purushotham Pushpavanthar
Hi Lei,
Could you please help me understand what caused this problem and how to
avoid it?

Regards,
Purushotham Pushpavanth



On Thu, 19 Dec 2019 at 16:55, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Purushotham,
>
> Since you are using cluster mode, just delete the flow.xml.gz file and
> restart the node; the flow will be synced from the other two nodes.
>
> Regards,
> Lei
>
>
>
> wangl...@geekplus.com.cn
>
> From: Purushotham Pushpavanthar
> Date: 2019-12-19 17:05
> To: dev
> Subject: Unable to access flowfile content
> Hi,
>
> We've had a 3-node production cluster running seamlessly for almost 8
> months with manageable ups and downs. However, yesterday we ran into an
> issue in one of the processors due to which CPU shot up and a node went
> down. On restart, the contents of a few enqueued flowfiles went missing
> all of a sudden (I was unable to view the content from the content viewer
> in the UI). This also resulted in the exception below, which was blocking
> a downstream processor from processing any flowfiles.
> We are using version 1.9.2. It would be very helpful if you can help me
> debug this issue.
> [stack trace snipped; see the original message below]
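
Lei's suggestion amounts to moving the node's local flow definition aside so
that, on restart, the node inherits the flow from the rest of the cluster. A
minimal sketch of that backup step in Java (the conf path is an assumption;
adjust it to your install, and stop NiFi first):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class FlowBackup {
    public static void main(String[] args) throws IOException {
        // Assumed location; by default NiFi keeps it at <nifi-home>/conf/flow.xml.gz.
        Path flow = Paths.get("/opt/nifi/conf/flow.xml.gz");
        if (Files.exists(flow)) {
            // Move aside rather than delete, so the old flow stays recoverable.
            Path backup = flow.resolveSibling("flow.xml.gz." + System.currentTimeMillis() + ".bak");
            Files.move(flow, backup, StandardCopyOption.ATOMIC_MOVE);
            System.out.println("Moved " + flow + " to " + backup);
        }
        // On the next start, the node syncs the flow from the other cluster nodes.
    }
}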

Unable to access flowfile content

2019-12-19 Thread Purushotham Pushpavanthar
Hi,

We've had a 3-node production cluster running seamlessly for almost 8
months with manageable ups and downs. However, yesterday we ran into an
issue in one of the processors due to which CPU shot up and a node went
down. On restart, the contents of a few enqueued flowfiles went missing
all of a sudden (I was unable to view the content from the content viewer
in the UI). This also resulted in the exception below, which was blocking
a downstream processor from processing any flowfiles.
We are using version 1.9.2. It would be very helpful if you can help me
debug this issue.
2019-12-19 07:05:03,653 ERROR [Timer-Driven Process Thread-4]
o.apache.nifi.processors.hive.PutHiveQL
PutHiveQL[id=c820350d-d6fd-183d-a3d5-006a2b14d10a]
PutHiveQL[id=c820350d-d6fd-183d-a3d5-006a2b14d10a] failed to process
session due to java.lang.RuntimeException: Failed to execute due to
org.apache.nifi.processor.exception.FlowFileAccessException: Could not read
from
StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1576648697457-40,
container=default, section=40], offset=10977,
length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83];
Processor Administratively Yielded for 1 sec: java.lang.RuntimeException:
Failed to execute due to
org.apache.nifi.processor.exception.FlowFileAccessException: Could not read
from
StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1576648697457-40,
container=default, section=40], offset=10977,
length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]
java.lang.RuntimeException: Failed to execute due to
org.apache.nifi.processor.exception.FlowFileAccessException: Could not read
from
StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1576648697457-40,
container=default, section=40], offset=10977,
length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]
  at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:145)
  at
org.apache.nifi.processors.hive.PutHiveQL.lambda$onTrigger$6(PutHiveQL.java:295)
  at
org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
  at
org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
  at org.apache.nifi.processors.hive.PutHiveQL.onTrigger(PutHiveQL.java:295)
  at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
  at
org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
  at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
  at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.nifi.processor.exception.FlowFileAccessException:
Could not read from
StandardFlowFileRecord[uuid=253e1652-6e3f-49c3-b190-3788fcbc1480,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1576648697457-40,
container=default, section=40], offset=10977,
length=83],offset=0,name=hid_1004.ejuserstruct2.2019121100.sql,size=83]
  at
org.apache.nifi.controller.repository.io.FlowFileAccessInputStream.read(FlowFileAccessInputStream.java:93)
  at
org.apache.nifi.controller.repository.io.TaskTerminationInputStream.read(TaskTerminationInputStream.java:68)
  at org.apache.nifi.stream.io.StreamUtils.fillBuffer(StreamUtils.java:89)
  at org.apache.nifi.stream.io.StreamUtils.fillBuffer(StreamUtils.java:72)
  at
org.apache.nifi.processors.hive.AbstractHiveQLProcessor$1.process(AbstractHiveQLProcessor.java:92)
  at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2212)
  at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2180)
  at
org.apache.nifi.processors.hive.AbstractHiveQLProcessor.getHiveQL(AbstractHiveQLProcessor.java:89)
  at
org.apache.nifi.processors.hive.PutHiveQL.lambda$new$4(PutHiveQL.java:220)
  at org.apache.nifi.processor.util.pattern.Put.putFlowFiles(Put.java:59)
  at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:102)
  ... 15 common frames omitted
Caused by: java.io.EOFException: null
  at org.apache.nifi.stream.io.StreamUtils.skip(StreamUtils.java:270)
  at
org.apache.nifi.controller.repo
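
The root cause is the EOFException at the bottom of the trace: the flowfile
repository still references a content claim (offset 10977, length 83), but
the content repository no longer holds those bytes, most likely because the
claim file was lost or truncated during the unclean shutdown. A minimal
sketch of the failing read pattern, assuming a truncated claim file
(illustrative only, not NiFi's actual internals):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ClaimReadSketch {

    // Mirrors the contract of NiFi's StreamUtils.fillBuffer: either the buffer
    // is filled completely, or the short read surfaces as an EOFException.
    static void fillBuffer(InputStream in, byte[] buffer) throws IOException {
        int filled = 0;
        while (filled < buffer.length) {
            int read = in.read(buffer, filled, buffer.length - filled);
            if (read < 0) {
                throw new EOFException("expected " + buffer.length
                        + " bytes but the stream ended after " + filled);
            }
            filled += read;
        }
    }

    // Reads one claim: skip to the recorded offset, then read the recorded
    // length. A claim file truncated below offset + length fails either step.
    static byte[] readClaim(InputStream claimStream, long offset, int length) throws IOException {
        long skipped = claimStream.skip(offset);
        if (skipped < offset) {
            throw new EOFException("claim file shorter than recorded offset " + offset);
        }
        byte[] content = new byte[length];
        fillBuffer(claimStream, content);
        return content;
    }
}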

Implementation of FilesystemComponentStatusRepository

2019-08-04 Thread Purushotham Pushpavanthar
Hi,

Right now we only have a volatile implementation of
ComponentStatusRepository, which serves the status report of each
component. Though we have reporting tasks like AmbariReportingTask and
PrometheusReportingTask, which push metrics to external persistent
systems, NiFi's Component Status History serves as the primary source of
truth. We might have to fall back to it if there is some issue with the
external metrics collection system.
Since the only available implementation is
VolatileComponentStatusRepository, this data is lost if a node is
restarted. I feel that a filesystem-persistent implementation should be
available for users to choose from. Is there anything lined up in this
regard?


Regards,
Purushotham Pushpavanth
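
For discussion, a filesystem-backed variant could append each captured
snapshot to a file and replay it on startup. A minimal sketch under assumed
types (the StatusSnapshot record below is hypothetical, not NiFi's actual
ComponentStatusRepository interface):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class FileStatusStore {

    // One component status sample; a stand-in for NiFi's snapshot types.
    public record StatusSnapshot(String componentId, long timestampMillis,
                                 long bytesIn, long bytesOut) {}

    private final Path storageFile;

    public FileStatusStore(Path storageFile) {
        this.storageFile = storageFile;
    }

    // Appends one snapshot so the history survives a node restart.
    public void capture(StatusSnapshot s) throws IOException {
        String line = s.componentId() + "," + s.timestampMillis() + ","
                + s.bytesIn() + "," + s.bytesOut() + "\n";
        Files.writeString(storageFile, line, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Replays persisted snapshots, e.g. to rebuild the history after a restart.
    public List<StatusSnapshot> replay() throws IOException {
        List<StatusSnapshot> snapshots = new ArrayList<>();
        if (!Files.exists(storageFile)) {
            return snapshots;
        }
        for (String line : Files.readAllLines(storageFile, StandardCharsets.UTF_8)) {
            String[] parts = line.split(",");
            snapshots.add(new StatusSnapshot(parts[0], Long.parseLong(parts[1]),
                    Long.parseLong(parts[2]), Long.parseLong(parts[3])));
        }
        return snapshots;
    }
}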


NiFi - Cluster crashes while running CaptureChangeMySQL for CDC

2019-07-13 Thread Purushotham Pushpavanthar
Hi,

I've been trying to run CDC in a 3-node NiFi (ver: 1.9.2) cluster similar
to what is illustrated here.
However, I've been facing the issue below because of scale. When I start
the processor, the JVM heap (12 GB) utilization reaches 100% on the
primary node, which then crashes. The same repeats on the other nodes when
a new Cluster Coordinator/Primary node is elected.

I tried debugging this issue and drafted the details below for discussion.

The processor initializes an unbounded *LinkedBlockingQueue* and registers
a listener on the binlog client that feeds it.
The BinaryLogClient reads the binlog files and adds events to the queue.
When triggered, the processor drains the queue, writes the events to
flowfiles, and transfers them to the SUCCESS relationship in a single
thread. However, when throughput on the database is huge, the queue gets
flooded with events, the single-threaded processor fails to catch up, and
the JVM on the primary node bloats until it fails.
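
To make the imbalance concrete: an unbounded queue lets a fast producer grow
the heap without limit, whereas a bounded queue would block the producer and
throttle it to the consumer's pace. A minimal sketch of that difference
(illustrative only, not the processor's actual code; the capacity is an
assumed tuning knob):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueBackpressureSketch {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue: put() blocks when full, so a fast producer is held
        // to the consumer's pace instead of growing the heap without limit.
        // An unbounded new LinkedBlockingQueue<>() never blocks the producer.
        BlockingQueue<String> events = new LinkedBlockingQueue<>(10_000);

        Thread producer = new Thread(() -> {
            try {
                for (long i = 0; ; i++) {
                    events.put("binlog-event-" + i); // blocks at capacity
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String event = events.take();
                    Thread.sleep(1); // stand-in for writing 'event' to a flowfile
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        Thread.sleep(5_000);
        producer.interrupt();
        consumer.interrupt();
    }
}
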
Below are the reasons I suspect most (feel free to correct me if I'm wrong;
let's debate this for better understanding).

   1. Due to the decoupled nature of this queue with respect to NiFi
   connections, the back pressure configurations don't have any control over
   the throughput of BinaryLogClient.
   2. I tried increasing the JVM memory settings from Xms3G and Xmx3G to
   *Xms32G and Xmx32G.* If not for CDC, our cluster used to run at 20%-70%
   heap utilization with Xmx3G (over the past 4 months). I'm unable to
   budget the JVM usage. There should be a limit on how much share a
   processor can take of the cluster.
   3. Having CaptureChangeMySQL as a single-threaded processor running on
   the primary node adds to the above issue.
   4. The processor doesn't do batching. It ends up creating too many
   flowfiles whose content size is comparable to that of their flowfile
   attributes in memory (a batched drain is sketched below).

I'm posting this thread to initiate discussion on how to solve this issue.
Please share your experiences if you have faced a similar issue
in the CaptureChangeMySQL processor or any other NiFi processor.
What workaround did you follow? How did you fix it?
Is NiFi the right tool for the CDC use case?
If so, should we have a separate cluster per CDC pipeline, based on the
scale, since executing CaptureChangeMySQL on the primary node is a
bottleneck?

*Cluster specs:*
3 Node Cluster

   - Model : c5.2xlarge
   - vCPU : 8
   - Memory (GiB) : 16
   - Instance Storage (GiB) : 200 (EBS)
   - Network Bandwidth (Gbps) : Up to 10
   - EBS Bandwidth (Mbps) : Up to 3,500

*NiFi Configs:*
*bootstrap.conf*

   - java.arg.2=-Xms12G
   - java.arg.3=-Xmx12G
   - java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
   - java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
   - java.arg.7=-XX:ReservedCodeCacheSize=256m
   - java.arg.8=-XX:CodeCacheMinimumFreeSpace=10m
   - java.arg.9=-XX:+UseCodeCacheFlushing


*nifi.properties* (I can share more configs if needed)

   - nifi.queue.backpressure.count=10
   - nifi.queue.backpressure.size=1 GB


Regards,
Purushotham Pushpavanth


Re: Unable to modify flow when one of the nodes in a cluster is disconnected

2019-07-01 Thread Purushotham Pushpavanthar
Mark, thanks for the clarification.

On Mon, Jul 1, 2019, 9:05 PM Mark Payne wrote:

> My apologies, I wasn't very clear. If a node is in a disconnected state,
> you cannot make any changes
> to the cluster. You would first have to go to the Cluster menu and choose
> to remove the node from the cluster.
> Then you would be free to make changes to the flow. If the now-removed
> node is then restarted, it will attempt
> to re-join the cluster. At this point, if there are components that have
> been stopped/started/moved around, then
> the node will inherit these changes and join the cluster. But if you have
> changed a processor's properties, for
> instance, this will result in the node failing to join the cluster and
> indicating that the local flow differs from the cluster's flow.
>
>
> On Jun 29, 2019, at 2:53 PM, Purushotham Pushpavanthar <
> pushpavant...@gmail.com> wrote:
>
> Hi Mark,
>
> Thank you for your time and descriptive insights. However, the concern I
> raised was regarding allowable changes like changing the run status of
> the processors. I couldn't stop or start a processor in the cluster when
> one of the nodes was disconnected. The warning panel displayed is attached
> to the initial mail in this thread.
>
>
> *Now, there are some changes that we do allow, and the node will still
> re-join. For instance, if the positions of elements change, elements are
> started or stopped, etc. In these cases, the new node will just inherit
> the flow from the cluster and take on those changes.*
>
> Regarding the certain kinds of changes you mentioned in your previous
> mail, could you please throw some light on which release this is
> supported from?
>
>
> Regards,
> Purushotham Pushpavanth
>
>
>
> On Thu, 27 Jun 2019 at 19:34, Mark Payne <marka...@hotmail.com> wrote:
>
> Purushotham,
>
> If the node is disconnected and then attempts to reconnect, flow election
> does not occur. Rather, the node obtains a copy of the flow
> from the cluster, determines whether or not it matches, and if so rejoins.
> If the flow does not match, it disconnects and stops trying to
> reconnect.
>
> There are a few reasons that the node doesn't just inherit the cluster's
> flow blindly. Firstly, if a user were to delete a connection, and the
> re-joining node had data in that connection, it would lose the data. This
> is probably the most important reason - we never want to
> design for data loss.
>
> Secondly, when a node is disconnected from the cluster, the user is able
> to make changes. There are times when users will disconnect a
> particular node from the cluster and make some changes to the dataflow for
> diagnostic purposes. For example, they may want to temporarily
> send data to a new endpoint for sampling. When this happens, we don't want
> to just blindly lose those changes, because the user may not
> have wanted those changes lost. And if an admin is managing several
> systems, it's possible that they could accidentally configure the node
> to point to the wrong cluster, in which case it could potentially lose the
> entire dataflow. Perhaps not a problem if the dataflow exists on other
> nodes, but if this is a standalone node being converted into cluster, it
> could be devastating for the user.
>
> Now, there are some changes that we do allow, and the node will still
> re-join. For instance, if the positions of elements change, elements are
> started
> or stopped, etc. In these cases, the new node will just inherit the flow
> from the cluster and take on those changes.
>
> I think it would probably be advantageous to allow the node to back up its
> own flow before inheriting from the cluster, and then apply any changes
> from
> the cluster that do not result in data loss (i.e., if any connection is
> removed and the node has data in that connection, then fail, else inherit).
> The big downside there, honestly, is that it's just a huge amount of
> effort that would be required in order to make that work properly.
>
> So to make a long story short: there are reasons that we don't just
> inherit the flow, but we could work around those problems. There are
> definitely
> areas where we could improve, but it's just not been taken on yet by
> anyone in the community.
>
> Thanks
> -Mark
>
>
> On Jun 27, 2019, at 3:37 AM, Purushotham Pushpavanthar <
> pushpavant...@gmail.com> wrote:
>
> Hi,
>
> I'm running a 3-node (ver 1.9.2) cluster in production. As infra
> is unreliable due to various factors, our nodes go down often. We d

Re: Unable to modify flow when one of the nodes in a cluster is disconnected

2019-06-29 Thread Purushotham Pushpavanthar
Hi Mark,

Thank you for your time and descriptive insights. However, the concern I
raised was regarding allowable changes like changing the run status of
the processors. I couldn't stop or start a processor in the cluster when
one of the nodes was disconnected. The warning panel displayed is attached
to the initial mail in this thread.


*Now, there are some changes that we do allow, and the node will still
re-join. For instance, if the positions of elements change, elements are
started or stopped, etc. In these cases, the new node will just inherit the
flow from the cluster and take on those changes.*

Regarding the certain kinds of changes you mentioned in your previous
mail, could you please throw some light on which release this is supported
from?


Regards,
Purushotham Pushpavanth



On Thu, 27 Jun 2019 at 19:34, Mark Payne wrote:

> Purushotham,
>
> If the node is disconnected and then attempts to reconnect, flow election
> does not occur. Rather, the node obtains a copy of the flow
> from the cluster, determines whether or not it matches, and if so rejoins.
> If the flow does not match, it disconnects and stops trying to
> reconnect.
>
> There are a few reasons that the node doesn't just inherit the cluster's
> flow blindly. Firstly, if a user were to delete a connection, and the
> re-joining node had data in that connection, it would lose the data. This
> is probably the most important reason - we never want to
> design for data loss.
>
> Secondly, when a node is disconnected from the cluster, the user is able
> to make changes. There are times when users will disconnect a
> particular node from the cluster and make some changes to the dataflow for
> diagnostic purposes. For example, they may want to temporarily
> send data to a new endpoint for sampling. When this happens, we don't want
> to just blindly lose those changes, because the user may not
> have wanted those changes lost. And if an admin is managing several
> systems, it's possible that they could accidentally configure the node
> to point to the wrong cluster, in which case it could potentially lose the
> entire dataflow. Perhaps not a problem if the dataflow exists on other
> nodes, but if this is a standalone node being converted into cluster, it
> could be devastating for the user.
>
> Now, there are some changes that we do allow, and the node will still
> re-join. For instance, if the positions of elements change, elements are
> started
> or stopped, etc. In these cases, the new node will just inherit the flow
> from the cluster and take on those changes.
>
> I think it would probably be advantageous to allow the node to back up its
> own flow before inheriting from the cluster, and then apply any changes from
> the cluster that do not result in data loss (i.e., if any connection is
> removed and the node has data in that connection, then fail, else inherit).
> The big downside there, honestly, is that it's just a huge amount of
> effort that would be required in order to make that work properly.
>
> So to make a long story short: there are reasons that we don't just
> inherit the flow, but we could work around those problems. There are
> definitely
> areas where we could improve, but it's just not been taken on yet by
> anyone in the community.
>
> Thanks
> -Mark
>
>
> On Jun 27, 2019, at 3:37 AM, Purushotham Pushpavanthar <
> pushpavant...@gmail.com> wrote:
>
> Hi,
>
> I'm running a 3-node (ver 1.9.2) cluster in production. As infra
> is unreliable due to various factors, our nodes go down often. We don't
> have a distinction between dev and prod clusters. We modify, deploy, and
> test in the same cluster. However, when one of the nodes goes down, NiFi
> restricts us from modifying the state of the flow by throwing the warning
> window in the attachment.
>
> I read
> <https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#flow-election>
> that if a node in the cluster is disconnected and comes back again, flow
> election happens. I would like to understand the motivation for not
> allowing changes to the flow in the above scenario.
> I was thinking: why can't the latest node joining the cluster pull the
> most-elected flow.xml.gz from the cluster and apply it to itself?
>
> Regards,
> Purushotham Pushpavanth
>
>
>


Unable to modify flow when one of the nodes in a cluster is disconnected

2019-06-27 Thread Purushotham Pushpavanthar
Hi,

I'm running a 3-node (ver 1.9.2) cluster in production. As infra is
unreliable due to various factors, our nodes go down often. We don't have
a distinction between dev and prod clusters. We modify, deploy, and test in
the same cluster. However, when one of the nodes goes down, NiFi restricts
us from modifying the state of the flow by throwing the warning window in
the attachment.

I read in the admin guide
<https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#flow-election>
that if a node in the cluster is disconnected and comes back again, flow
election happens. I would like to understand the motivation for not
allowing changes to the flow in the above scenario.
I was thinking: why can't the latest node joining the cluster pull the
most-elected flow.xml.gz from the cluster and apply it to itself?

Regards,
Purushotham Pushpavanth