Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

As you found in the existing code, DistributedMapCache is used to share
state among different processors, and it can be used by your custom
processors, too.
However, I'd recommend avoiding such tight dependencies between
FlowFiles if possible, or at least minimizing the part of the flow that
requires that constraint, for better performance and simplicity.
For example, since a FlowFile can hold a fairly large amount of data,
you could merge all the FlowFiles into a single FlowFile instead of
batches of FlowFiles. If you need logical boundaries, you can use the
NiFi Record data model to embed multiple records within a FlowFile;
Records should perform better.
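
Just for reference, if you do go with the shared cache, here is a rough
sketch of the kind of atomic read-and-update a custom processor could do
against it, using the AtomicDistributedMapCacheClient interface that
Wait/Notify relies on. This is an illustration only, not code from an
actual processor; the key naming and the serializers are my assumptions.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;

public class BatchCounterExample {

    private static final Serializer<String> STRING_SERIALIZER =
            (value, out) -> out.write(value.getBytes(StandardCharsets.UTF_8));
    private static final Deserializer<String> STRING_DESERIALIZER =
            bytes -> bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);

    /** Atomically increments the release counter for a batch and returns the new value. */
    public static long incrementReleaseCounter(
            final AtomicDistributedMapCacheClient<Long> cache,
            final String batchKey) throws IOException {
        while (true) {
            AtomicCacheEntry<String, String, Long> entry =
                    cache.fetch(batchKey, STRING_SERIALIZER, STRING_DESERIALIZER);
            final long next = (entry == null || entry.getValue() == null)
                    ? 1L : Long.parseLong(entry.getValue()) + 1L;
            if (entry == null) {
                // no counter yet for this batch; create an entry with no revision
                entry = new AtomicCacheEntry<>(batchKey, String.valueOf(next), null);
            } else {
                entry.setValue(String.valueOf(next));
            }
            // replace() succeeds only if the entry's revision is unchanged since
            // fetch(), so concurrent updates from other nodes retry instead of
            // overwriting each other.
            if (cache.replace(entry, STRING_SERIALIZER, STRING_SERIALIZER)) {
                return next;
            }
        }
    }
}
```

The optimistic fetch/replace loop is the same pattern Wait/Notify uses
internally, and it is what makes frequent, atomic read/update possible
over a plain DistributedMapCacheServer.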

Hope this helps.

Thanks,
Koji


On Tue, Dec 26, 2017 at 5:55 PM, 尹文才  wrote:
> Hi guys, I'm currently trying to find a proper way in nifi to sync
> status between my custom processors.
> Our requirement is like this: we're doing some ETL work using nifi, and
> I'm extracting the data from a DB into batches of FlowFiles (each batch
> has a flag FlowFile indicating the end of the batch).
> There are some groups of custom processors downstream that need to
> process these FlowFiles to do some business-logic work, and we expect
> these processors to process one batch of FlowFiles at a time.
> Therefore we need to implement a custom Wait processor (let's call it
> WaitBatch here) to hold all the other batches of FlowFiles while the
> business processors are handling the batch of FlowFiles that was
> created earlier.
>
> In order to implement this, all the WaitBatch processors placed in the
> flow need to read/update records in a shared map so that each set of
> business-logic processors processes one batch at a time.
> The entries are keyed by the batch number of the FlowFiles, and the
> value of each entry is a release counter that counts the number of
> times the batch of FlowFiles has passed through a WaitBatch processor.
> When a batch is released by a WaitBatch, it tries to increment the
> batch number entry's value by 1, and the released batch number and
> counter value are also saved locally on the WaitBatch with the
> StateManager; when the next batch reaches the WaitBatch, it checks
> whether the counter value of the previously released batch number in
> the shared map is greater than the one saved locally. If the entry for
> that batch number doesn't exist (already removed), or the value in the
> shared map is greater, the next batch is released, and the local state
> and the entry in the shared map are updated similarly.
> At the end of the flow, a custom processor gets the batch number from
> each batch and removes the entry from the shared map.
>
> So this implementation requires a shared map that can be read and
> updated frequently and atomically. I checked the Wait/Notify processors
> in NiFi and saw that they use the DistributedMapCacheClientService and
> DistributedMapCacheServer to sync status, so I'm wondering if I could
> use the DistributedMapCacheClientService to implement my logic. I also
> saw another implementation called RedisDistributedMapCacheClientService,
> which seems to require Redis (I haven't used Redis). Thanks in advance
> for any suggestions.
>
> Regards,
> Ben


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread 尹文才
Thanks for your quick response, Koji. I hadn't heard or seen anything
about the NiFi Record data model when I was reading the NiFi
documentation; could you tell me where this model is documented? Thanks.

By the way, to my knowledge, when you use the DistributedMapCacheServer
from the DistributedMapCacheClientService, you need to specify the host
URL for the server. This means that inside a NiFi cluster, if I specify
the cache server and that node suddenly goes down, I couldn't use it
until the node comes up again, right? Is there currently a cache server
in NiFi that supports HA? Thanks.

Regards,
Ben



Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

This blog post, written by Mark, is a good starting point for getting
familiar with the NiFi Record model:
https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi

HA for the DistributedMapCacheClientService and DistributedMapCacheServer
pair is not supported at the moment. If you need high availability,
RedisDistributedMapCacheClientService with Redis replication should
provide that, though I haven't tried it myself:
https://redis.io/topics/replication

Thanks,
Koji



Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread 尹文才
Thanks Koji, I will look into this article about the record model.

By the way, that error I previously mentioned to you occurred again; I
could see the sql query was executed twice in the log, and this time I
had turned on verbose NiFi logging. The sql query is as below (执行sql语句
is the processor's "executing SQL statement" log prefix):

2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句失败:SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
_id;: com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
'ods_extractDataDebug_20171226031801926_9195' 的对象。
com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
'ods_extractDataDebug_20171226031801926_9195' 的对象。

(执行sql语句失败 means "failed to execute SQL statement"; the
SQLServerException message reads "There is already an object named
'ods_extractDataDebug_20171226031801926_9195' in the database.")

    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
    at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
    at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
    at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
    at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
Repository failed to update"; I'm not sure if this is the reason the
FlowFile got processed twice. Could you help take a look at my log file?
Thanks. You can get the log file via this link:
https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view

Best Regards,
Ben


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

The following two log messages are very close in terms of their
timestamps, but have different log levels:
2017-12-26 07:00:01,312 INFO
2017-12-26 07:00:01,315 ERROR

I guess those are logged within a single onTrigger of your
ExecuteSqlCommand custom processor: one before executing, the other when
it caught an exception. I'm just guessing, as I don't have access to the
code.

Does the same issue happen with other processors bundled with Apache
NiFi, without your custom processor running?

If NiFi fails to update/checkpoint the FlowFile repository, then the
same FlowFile can be processed again after restarting NiFi.

Thanks,
Koji




Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread 尹文才
Hi Koji, I do print the sql before actually executing it, but I checked
the error log line you mentioned in your reply, and that error was
thrown by NiFi from within another processor, called WaitBatch.
I didn't find errors similar to the one from the ExecuteSqlCommand
processor; I think that's because only ExecuteSqlCommand is used to
create temp database tables.
You could check my ExecuteSqlCommand code via the link:
https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P

If the error is really caused by a FlowFile repository checkpoint
failure and the FlowFile was processed twice, I may have to create the
temp table only if it doesn't exist. The reason I didn't fix this bug
that way right away is that I was afraid the fix could mask some other
problem.
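
If I do go that way, the guard would look roughly like the sketch below
(an illustration only, using the table name from the log above; this is
not the actual processor code):

```java
import java.sql.Connection;
import java.sql.Statement;

// dbcpService is assumed to be the processor's configured DBCPService.
// Wrapping the SELECT INTO in an OBJECT_ID check makes the DDL safe to
// replay: a second run of the same FlowFile becomes a no-op instead of
// failing with "there is already an object named ... in the database".
final String ddl =
        "IF OBJECT_ID('tmp.ods_extractDataDebug_20171226031801926_9195', 'U') IS NULL "
      + "BEGIN "
      + "  SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 "
      + "  FROM dbo.ods_extractDataDebug; "
      + "  ALTER TABLE tmp.ods_extractDataDebug_20171226031801926_9195 DROP COLUMN _id; "
      + "END";
try (Connection con = dbcpService.getConnection();
     Statement stmt = con.createStatement()) {
    stmt.execute(ddl); // safe to execute more than once
}
```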

Thanks.

Regards,
Ben


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

I was referring to these two log messages in your previous email.
These two messages are both written by ExecuteSqlCommand; it does not
mean 'it was executed again'.

```
2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句失败:SELECT
```

As you wrote, a case where the FlowFile repository fails checkpointing
will cause other processors to process the same FlowFiles again. However,
there won't be a simple solution that lets every processor roll back its
job, as different processors do different things. Creating the temp
table only if it does not exist seems the right approach to me.

At the same time, the root cause of the FlowFile repository failure
should be investigated. Is it possible to share the WaitBatch code?
The reason I ask is that every 'FlowFile Repository failed to update'
in the log you shared earlier is related to the WaitBatch processor.

Thanks,
Koji


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread 尹文才
Hi Koji, no problem. You could check the code of the WaitBatch processor
at the link:
https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ

I also uploaded a snapshot of the part of the NiFi flow that includes
ExecuteSqlCommand and WaitBatch; you could check the picture at the link:
https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view

You mentioned above that a FlowFile repository checkpointing failure
will cause other processors to process the same FlowFile again, but as
you can see from my snapshot image, ExecuteSqlCommand is the second
processor and sits before the WaitBatch processor. Even if the FlowFile
repository checkpointing failure is caused by WaitBatch, could it lead
the processors before it to process a FlowFile multiple times? Thanks.

Regards,
Ben


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

The one thing that looks strange in the screenshot is that
ExecuteSqlCommand has FlowFiles queued in its incoming connection;
those should have been transferred to the 'failure' relationship.

Looking at the following executeSql() method, shouldn't it re-throw the
caught exception?


try (Connection con = dbcpService.getConnection()) {
    logger.debug("设置autoCommit为false");  // set autoCommit to false
    con.setAutoCommit(false);

    try (Statement stmt = con.createStatement()) {
        logger.info("执行sql语句: {}", new Object[]{sql});  // executing SQL statement
        stmt.execute(sql);

        // all SQL statements execute within one transaction
        logger.debug("提交transaction");  // committing transaction
        con.commit();
    } catch (Exception ex) {
        logger.error("执行sql语句失败:{}", new Object[]{sql, ex});  // SQL execution failed
        con.rollback();
        // re-throw the exception to the outer handler
        throw ex;
    } finally {
        logger.debug("重新设置autoCommit为true");  // reset autoCommit to true
        con.setAutoCommit(true);
    }
} catch (Exception ex) {
    // HERE, the exception is swallowed; that's why the FlowFiles stay in
    // the incoming connection.
    logger.error("重试执行sql语句:{}", new Object[]{sql, ex});  // retrying SQL statement
    retryOnFail = true;
}
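
For illustration, the calling side in onTrigger() could then route the
failure explicitly, something like the sketch below (REL_FAILURE and the
variable names here are my assumptions, not from your code):

```java
try {
    // executeSql() now re-throws on failure instead of only setting retryOnFail
    executeSql(sql);
    session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception ex) {
    getLogger().error("Failed to execute SQL: {}", new Object[]{sql, ex});
    // penalize and route to 'failure' so the FlowFile does not sit in the
    // incoming connection forever
    session.transfer(session.penalize(flowFile), REL_FAILURE);
}
```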

Thanks,
Koji


Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread Koji Kawamura
Hi Ben,

Excuse me, I'm trying, but I probably don't fully understand what you
want to achieve with the flow.

It looks weird that WaitBatch fails with such a FlowFile repository
error while other processors such as ReplaceText succeed.
I recommend testing WaitBatch alone first, without the database-related
processors, by feeding it a test FlowFile that has the expected FlowFile
attributes. Such input FlowFiles can be created by the GenerateFlowFile
processor. If the same error happens with only the WaitBatch processor,
it should be easier to debug.

Thanks,
Koji


Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread 尹文才
Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute
the sql query if the connection is lost (the connection could be
unstable); my idea is to transfer the FlowFile to the success
relationship only after successfully executing the sql query. As you can
see from the do-while loop in the code, the transaction is rolled back
if the execution fails, and if the connection is lost it retries the
sql. Will this logic cause my sql to be executed twice?
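
The shape of the loop is roughly the following (a simplified sketch of
the intent, not the actual code; executeInTransaction() and
isConnectionLost() are stand-ins for what the processor really does):

```java
boolean done = false;
do {
    try (Connection con = dbcpService.getConnection()) {
        // hypothetical helper: execute the statements in one transaction,
        // committing on success and rolling back + re-throwing on failure
        executeInTransaction(con, sql);
        done = true; // only now is the FlowFile transferred to 'success'
    } catch (Exception ex) {
        if (!isConnectionLost(ex)) { // hypothetical check, e.g. SQLState 08xxx
            throw new ProcessException("Failed to execute SQL: " + sql, ex);
        }
        // the connection dropped: loop and retry the same statement, which
        // is only safe if the statement itself can be re-run
    }
} while (!done);
```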

For the WaitBatch processor, I will take your approach and test it
individually to see whether the WaitBatch processor could cause the
FlowFile repository checkpointing failure.

Regards,
Ben


Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread Koji Kawamura
Hi Ben,

The ExecuteSqlCommand retry logic does not execute the same query
multiple times if it succeeds.
So input FlowFiles containing the same query must have been passed in
more than once.
It could be the same FlowFile, or different FlowFiles generated by the
first processor for some reason.
To investigate that kind of FlowFile-level information, NiFi provenance
data and the FlowFile lineage view are very useful:
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage

I didn't mention it earlier because you were having a Provenance
repository performance issue, but I hope you can use it now with the
WriteAheadProvenanceRepository.

Thanks,
Koji


Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread 尹文才
Hi Koji, thanks. The names of the temp tables are created with the format
"MMddHHmmssSSS-", where the first part indicates the time and the second
part is a random number of length 4.
So I think it's not possible to have 2 duplicate table names; the only
possibility I can think of is that the FlowFile is passed into the processor twice.
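
For reference, a minimal sketch of that naming scheme in Java (hypothetical,
not the actual processor code):

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.ThreadLocalRandom;

    // Millisecond timestamp plus a 4-digit random suffix, as described above.
    // Note: two FlowFiles handled in the same millisecond could still collide
    // with probability 1/10000, so duplicates are unlikely but not impossible.
    String tableName =
            LocalDateTime.now().format(DateTimeFormatter.ofPattern("MMddHHmmssSSS"))
            + "-" + String.format("%04d", ThreadLocalRandom.current().nextInt(10000));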

About the provenance, I had updated to use the
WriteAheadProvenanceRepository implementation, but when I tried to check
the data provenance, it showed me the following exception message:
HTTP ERROR 500

Problem accessing /nifi/provenance. Reason:

Server Error

Caused by:

javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
    at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
    at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
    at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
    at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
    at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecu

Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread 尹文才
Hi Koji, sorry about the provenance exception; it was because there was no
space left on the machine (it was filled up with logs).

Regards,
Ben

2017-12-27 17:11 GMT+08:00 尹文才 :

> Hi Koji, thanks. The names of the temp tables are created with the format
> "MMddHHmmssSSS-", where the first part indicates the time and the second
> part is a random number of length 4.
> So I think it's not possible to have 2 duplicate table names; the only
> possibility I can think of is that the FlowFile is passed into the processor twice.
>
> About the provenance, I had updated to use the
> WriteAheadProvenanceRepository implementation, but when I tried to check
> the data provenance, it showed me the following exception message:
> HTTP ERROR 500

Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread Koji Kawamura
I see, thanks. The easiest way to look at provenance events would be
by right clicking a processor instance you are interested in, then
select 'View data provenance' context menu. This way, NiFi displays
provenance events for the selected processor.

Koji

On Wed, Dec 27, 2017 at 6:17 PM, 尹文才  wrote:
> Hi Koji, sorry about the provenance exception; it was because there was no
> space left on the machine (it was filled up with logs).
>
> Regards,
> Ben
>
> 2017-12-27 17:11 GMT+08:00 尹文才 :
>
>> Hi Koji, thanks. The names of the temp tables are created with the format
>> "MMddHHmmssSSS-", where the first part indicates the time and the second
>> part is a random number of length 4.
>> So I think it's not possible to have 2 duplicate table names; the only
>> possibility I can think of is that the FlowFile is passed into the processor twice.
>>
>> About the provenance, I had updated to use the
>> WriteAheadProvenanceRepository implementation, but when I tried to check
>> the data provenance, it showed me the following exception message:
>> HTTP ERROR 500

Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread 尹文才
Hi Koji, I saw it was only showing 1000 events, so I couldn't see the
event when the FlowFile was created.

Regards,
Ben

2017-12-27 17:21 GMT+08:00 Koji Kawamura :

> I see, thanks. The easiest way to look at provenance events would be
> by right clicking a processor instance you are interested in, then
> select 'View data provenance' context menu. This way, NiFi displays
> provenance events for the selected processor.
>
> Koji
>
> On Wed, Dec 27, 2017 at 6:17 PM, 尹文才  wrote:
> > Hi Koji, sorry about the provenance exception; it was because there was no
> > space left on the machine (it was filled up with logs).
> >
> > Regards,
> > Ben
> >
> > 2017-12-27 17:11 GMT+08:00 尹文才 :
> >
> >> Hi Koji, thanks. The names of the temp tables are created with the
> >> format "MMddHHmmssSSS-", where the first part indicates the time and
> >> the second part is a random number of length 4.
> >> So I think it's not possible to have 2 duplicate table names; the only
> >> possibility I can think of is that the FlowFile is passed into the
> >> processor twice.
> >>
> >> About the provenance, I had updated to use the
> >> WriteAheadProvenanceRepository implementation, but when I tried to check
> >> the data provenance, it showed me the following exception message:
> >> HTTP ERROR 500
Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread Koji Kawamura
Hi Ben, you can filter events by timestamp as well.
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#searching-for-events

On Wed, Dec 27, 2017 at 6:28 PM, 尹文才  wrote:
> Hi Koji, I saw it was only showing 1000 events, so I couldn't see the
> event when the FlowFile was created.
>
> Regards,
> Ben
>
> 2017-12-27 17:21 GMT+08:00 Koji Kawamura :
>
>> I see, thanks. The easiest way to look at provenance events would be
>> by right clicking a processor instance you are interested in, then
>> select 'View data provenance' context menu. This way, NiFi displays
>> provenance events for the selected processor.
>>
>> Koji
>>
>> On Wed, Dec 27, 2017 at 6:17 PM, 尹文才  wrote:
>> > Hi Koji, sorry about the provenance exception; it was because there was no
>> > space left on the machine (it was filled up with logs).
>> >
>> > Regards,
>> > Ben
>> >
>> > 2017-12-27 17:11 GMT+08:00 尹文才 :
>> >
>> >> Hi Koji, thanks. The names of the temp tables are created with the
>> >> format "MMddHHmmssSSS-", where the first part indicates the time and
>> >> the second part is a random number of length 4.
>> >> So I think it's not possible to have 2 duplicate table names; the only
>> >> possibility I can think of is that the FlowFile is passed into the
>> >> processor twice.
>> >>
>> >> About the provenance, I had updated to use the
>> >> WriteAheadProvenanceRepository implementation, but when I tried to check
>> >> the data provenance, it showed me the following exception message:
>> >> HTTP ERROR 500