Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

The one thing that looks strange in the screenshot is that
ExecuteSqlCommand has FlowFiles queued in its incoming connection.
Those should have been transferred to the 'failure' relationship.

Looking at the following executeSql() method, shouldn't it re-throw the
caught exception?


try (Connection con = dbcpService.getConnection()) {
    logger.debug("Setting autoCommit to false");
    con.setAutoCommit(false);

    try (Statement stmt = con.createStatement()) {
        logger.info("Executing SQL statement: {}", new Object[]{sql});
        stmt.execute(sql);

        // All SQL statements run inside a single transaction
        logger.debug("Committing transaction");
        con.commit();
    } catch (Exception ex) {
        logger.error("Failed to execute SQL statement: {}", new Object[]{sql, ex});
        con.rollback();
        // Propagate the exception to the outer handler
        throw ex;
    } finally {
        logger.debug("Resetting autoCommit to true");
        con.setAutoCommit(true);
    }
} catch (Exception ex) {
    // HERE, the exception is swallowed; that's why the FlowFiles stay in
    // the incoming connection.
    logger.error("Retrying SQL statement: {}", new Object[]{sql, ex});
    retryOnFail = true;
}
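
To illustrate the difference that re-throwing makes, here is a minimal,
self-contained sketch that does not use the NiFi API at all. The names
RethrowSketch, SqlAction, SqlCommandException and route() are hypothetical
stand-ins: SqlCommandException plays the role that an unchecked exception
such as NiFi's ProcessException might play, and route() mimics the decision
an onTrigger() would make. It is only meant to show the control flow, not
how the real processor is written.

```java
import java.sql.SQLException;

public class RethrowSketch {

    /** Hypothetical stand-in for the JDBC work done inside executeSql(). */
    interface SqlAction {
        void run() throws SQLException;
    }

    /** Hypothetical unchecked wrapper for a failed SQL execution. */
    static class SqlCommandException extends RuntimeException {
        SqlCommandException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Runs the action and re-throws any failure instead of swallowing it. */
    static void executeSql(SqlAction action) {
        try {
            action.run();
        } catch (SQLException ex) {
            // Re-throw so the caller can decide where the FlowFile goes.
            throw new SqlCommandException("SQL execution failed", ex);
        }
    }

    /** Mimics the routing decision a caller such as onTrigger() would make. */
    static String route(SqlAction action) {
        try {
            executeSql(action);
            return "success";
        } catch (SqlCommandException ex) {
            return "failure";
        }
    }

    public static void main(String[] args) {
        System.out.println(route(() -> { }));
        System.out.println(route(() -> {
            throw new SQLException("object already exists");
        }));
    }
}
```

In a real processor the "failure" branch would presumably transfer the
FlowFile to the failure relationship rather than return a string; the point
here is only that the caller can see the error once it is re-thrown.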

Thanks,
Koji


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread 尹文才
Hi Koji, no problem. You can check the code of the WaitBatch processor at
this link:
https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ

I also uploaded a screenshot of the part of the NiFi flow that includes
ExecuteSqlCommand and WaitBatch; you can see the picture at this link:
https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view

You mentioned above that a FlowFile repository checkpointing failure can
cause other processors to process the same FlowFile again. But as you can
see from my screenshot, ExecuteSqlCommand is the second processor and comes
before WaitBatch. Even if the checkpointing failure is caused by WaitBatch,
could it lead the processors before it to process a FlowFile multiple
times? Thanks.

Regards,
Ben


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

I was referring to these two log messages in your previous email.
Both messages are written by ExecuteSqlCommand; that does not
mean 'it was executed again'.

```
2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
```

As you wrote, when the FlowFile repository fails to checkpoint, other
processors can end up processing the same FlowFiles again. However,
there is no simple solution that lets every processor roll back its
work, as different processors do different things. Creating the temp
table only if it does not exist seems like the right approach to me.
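
As a rough sketch of that idea, the SQL text could be wrapped in an
existence check before it is executed. The helper below is hypothetical
(TempTableGuard and createIfAbsent are made-up names, not part of
ExecuteSqlCommand). It assumes SQL Server and relies on OBJECT_ID(name,
'U') returning NULL when no user table with that name exists. It only
builds the statement text and has not been run against a database.

```java
public class TempTableGuard {

    /**
     * Wraps a table-creating T-SQL batch so that it only runs when the
     * table is missing. Hypothetical helper, shown for illustration only.
     */
    static String createIfAbsent(String qualifiedTable, String createSql) {
        // Double any single quotes so the name is safe inside N'...'.
        String literal = qualifiedTable.replace("'", "''");
        return "IF OBJECT_ID(N'" + literal + "', N'U') IS NULL\n"
             + "BEGIN\n"
             + createSql + "\n"
             + "END";
    }

    public static void main(String[] args) {
        String table = "tmp.ods_extractDataDebug_20171226031801926_9195";
        String create = "SELECT TOP 0 * INTO " + table
                      + " FROM dbo.ods_extractDataDebug;\n"
                      + "ALTER TABLE " + table + " DROP COLUMN _id;";
        // Prints the guarded batch that would be passed to stmt.execute().
        System.out.println(createIfAbsent(table, create));
    }
}
```

With a guard like this, a FlowFile that is replayed after a restart would
skip the creation step instead of failing on an already existing table.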

At the same time, the root cause of the FlowFile repository failure
should be investigated. Is it possible to share the WaitBatch code?
The reason I ask is that every 'FlowFile Repository failed to update'
error in the log you shared earlier is related to the WaitBatch processor.

Thanks,
Koji


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread 尹文才
Hi Koji, I will print the SQL before actually executing it. I checked
the error log line you mentioned in your reply; this error was thrown by
NiFi from within another processor called WaitBatch.
I didn't find errors similar to the one from the ExecuteSqlCommand
processor elsewhere; I think that's because only ExecuteSqlCommand is used
to create temp database tables.
You can check my ExecuteSqlCommand code via this link:
https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P

If the error is really caused by a FlowFile repository checkpoint failure
and the FlowFile was processed twice, I may have to create the temp table
only if it doesn't exist. The reason I didn't fix the bug this way right
away is that I was afraid the fix could mask other problems.

Thanks.

Regards,
Ben


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

The following two log messages are very close in timestamp, but have
different log levels.
2017-12-26 07:00:01,312 INFO
2017-12-26 07:00:01,315 ERROR

I guess those are logged within a single onTrigger of your
ExecuteSqlCommand custom processor: one before executing, the other
when it caught an exception. Just guessing, as I don't have access
to the code.

Does the same issue happen with other processors bundled with Apache
NiFi without your custom processor running?

If NiFi fails to update/checkpoint the FlowFile repository, then the same
FlowFile can be processed again after restarting NiFi.

Thanks,
Koji




Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread 尹文才
Thanks Koji, I will look into this article about the record model.

By the way, the error I previously mentioned occurred again. I could see
in the log that the SQL query was executed twice; this time I had turned
on verbose NiFi logging. The SQL query is as below:

2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
_id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
Repository failed to update"; I'm not sure if this is the reason the
FlowFile got processed twice. Could you take a look at my log file? Thanks.
You can get the log file via this link:
https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view

Best Regards,
Ben


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

This blog post written by Mark would be a good starting point to get
familiar with the NiFi Record model.
https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi

HA for the DistributedMapCacheClientService and DistributedMapCacheServer
pair is not supported at the moment. If you need high availability,
RedisDistributedMapCacheClientService with Redis replication should
provide that, though I haven't tried it myself.
https://redis.io/topics/replication

Thanks,
Koji

On Tue, Dec 26, 2017 at 7:58 PM, 尹文才  wrote:
> Thanks for your quick response, Koji. I hadn't heard or seen anything
> about the NiFi Record data model when I was reading the NiFi
> documentation; could you tell me where this model is documented? Thanks.
>
> By the way, to my knowledge, to use the DistributedMapCacheServer from
> DistributedMapCacheClientService you need to specify the host URL of the
> server. This means that inside a NiFi cluster, if the node I specified as
> the cache server suddenly goes down, I can't use it until the node comes
> back up, right? Is there currently a cache server in NiFi that supports
> HA? Thanks.
>
> Regards,
> Ben
>
> 2017-12-26 18:34 GMT+08:00 Koji Kawamura :
>
>> Hi Ben,
>>
>> As you found from the existing code, DistributedMapCache is used to share
>> state among different processors, and it can be used by your custom
>> processors, too.
>> However, I'd recommend avoiding such tight dependencies between
>> FlowFiles if possible, or at least minimizing the part of the flow that
>> requires that constraint, for better performance and simplicity.
>> For example, since a FlowFile can hold a fairly large amount of data,
>> you could merge all FlowFiles into a single FlowFile instead of batches
>> of FlowFiles. If you need logical boundaries, you can use the NiFi Record
>> data model to embed multiple records within a FlowFile; Record should
>> perform better.
>>
>> Hope this helps.
>>
>> Thanks,
>> Koji
>>
>>
>> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才  wrote:
>> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
>> > status between my custom processors.
>> > Our requirement is this: we're doing some ETL work using NiFi, and I'm
>> > extracting the data from a DB into batches of FlowFiles (each batch of
>> > FlowFiles has a flag FlowFile indicating the end of the batch).
>> > There are some groups of custom processors downstream that need to process
>> > these FlowFiles to do some business logic work, and we expect these
>> > processors to process one batch of FlowFiles at a time.
>> > Therefore we need to implement a custom Wait processor (let's just call it
>> > WaitBatch here) to hold all the other batches of FlowFiles while the
>> > business processors are handling the batch of FlowFiles whose creation
>> > time is earlier.
>> >
>> > In order to implement this, all the WaitBatch processors placed in the flow
>> > need to read/update records in a shared map so that each set of
>> > business-logic processors processes one batch at a time.
>> > The entries are keyed by the batch number of the FlowFiles, and the value
>> > of each entry is a batch release counter which counts the number of
>> > times the batch of FlowFiles has passed through
>> > a WaitBatch processor.
>> > When a batch is released by WaitBatch, it will try to increment the batch
>> > number entry's value by 1, and the released batch number and counter
>> > value will also be saved locally at the WaitBatch with StateManager.
>> > When the next batch reaches the WaitBatch, it will check whether the counter
>> > value of the previously released batch number in the shared map is greater
>> > than the one saved locally. If the entry for the batch number doesn't
>> > exist (already removed) or the value in the shared map is greater, the next
>> > batch will be released, and the local state and the entry in the shared map
>> > will be updated similarly.
>> > At the end of the flow, a custom processor will get the batch number from
>> > each batch and remove the entry from the shared map.
>> >
>> > So this implementation requires a shared map that can be read and updated
>> > frequently and atomically. I checked the Wait/Notify processors in NiFi and
>> > saw that they use the DistributedMapCacheClientService and
>> > DistributedMapCacheServer to sync status, so I'm wondering if I could use
>> > the DistributedMapCacheClientService to implement my logic. I also saw
>> > another implementation called RedisDistributedMapCacheClientService,
>> > which seems to require Redis (I haven't used Redis). Thanks in advance for
>> > any suggestions.
>> >
>> > Regards,
>> > Ben
>>
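
The WaitBatch release rule described in the quoted post can be sketched in
plain Java. This is only an illustration under stated assumptions: the class
name WaitBatchGate and the method tryRelease are hypothetical, a local
ConcurrentMap stands in for the shared map, and two plain fields stand in for
the values the post keeps in StateManager. None of it is NiFi API; a real
distributed cache would need an equivalent atomic update.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for one WaitBatch processor. None of these names are NiFi APIs.
final class WaitBatchGate {
    // Stand-in for the shared map: batch number -> release counter.
    private final ConcurrentMap<String, Long> sharedMap;
    // Stand-ins for the values the post saves locally with StateManager.
    private String lastReleasedBatch;
    private long lastReleasedCounter;

    WaitBatchGate(ConcurrentMap<String, Long> sharedMap) {
        this.sharedMap = sharedMap;
    }

    // Returns true if the batch may pass this gate, false if it must keep waiting.
    synchronized boolean tryRelease(String batchNumber) {
        if (lastReleasedBatch != null) {
            Long current = sharedMap.get(lastReleasedBatch);
            // The entry still exists and has not been incremented since this gate
            // released it, so the previous batch has neither passed another gate
            // nor reached the end of the flow.
            if (current != null && current <= lastReleasedCounter) {
                return false;
            }
        }
        // Atomic increment on the local map; creates the entry with 1 on first release.
        long counter = sharedMap.merge(batchNumber, 1L, Long::sum);
        lastReleasedBatch = batchNumber;
        lastReleasedCounter = counter;
        return true;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Long> shared = new ConcurrentHashMap<>();
        WaitBatchGate first = new WaitBatchGate(shared);
        WaitBatchGate second = new WaitBatchGate(shared);
        System.out.println(first.tryRelease("batch-1"));  // first gate releases batch-1
        System.out.println(first.tryRelease("batch-2"));  // batch-1 has not moved on yet
        System.out.println(second.tryRelease("batch-1")); // batch-1 passes the second gate
        System.out.println(first.tryRelease("batch-2"));  // counter grew, so batch-2 may go
        shared.remove("batch-1");                         // end of flow removes the entry
        System.out.println(second.tryRelease("batch-2")); // entry gone, so batch-2 may go
    }
}
```

The point of the sketch is that the counter comparison and the increment are
the only operations that touch shared state, which is why the post asks for a
map that supports atomic read/update.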


Re: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0

2017-12-26 Thread Joe Witt
Steven

Understood.  The fact that 1.4.0 now shows you this is a good thing,
because in the older versions only one of the two sets of
processors was being loaded.  You are probably seeing a warning in the
app log at startup.  So, depending on which NAR is loaded first
(standard or your custom one), the code will actually run out of there.
So, in short, the behavior in 1.4.0, even in your current setup, is
probably better than the behavior prior.

Hope that helps

Thanks
Joe

On Tue, Dec 26, 2017 at 2:00 PM, Byers, Steven K (Steve) CTR USARMY
MEDCOM JMLFDC (US)  wrote:
> Yes, I agree. Unfortunately, our schedule may not allow for that at this time 
> so we will probably have to put off migrating for now.
>
> Thanks to you and Bryan for your replies.
>
> Thank you,
>
> Steven K. Byers
> DXC Technology - Contractor
> Software Developer - Joint Medical Logistics Functional Development Center 
> (JMLFDC)
> Defense Health Agency (DHA)/ Health Information Technology (HIT) Directorate/ 
> Solution Delivery Division (SDD)/Clinical Support Branch/JMLFDC
> 1681 Nelson Street, Fort Detrick, MD  21702
> (443) 538-7575 | (410) 872-4923
> Email: steven.k.byers@mail.mil
>
>
>
> -Original Message-
> From: Joe Witt [mailto:joe.w...@gmail.com]
> Sent: Tuesday, December 26, 2017 1:56 PM
> To: dev@nifi.apache.org
> Subject: Re: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0
>
> Steven
>
> To Bryan's point, it will be best if you simply copy the code/base classes
> over to your NAR rather than extending from existing concrete
> processor implementations.
>
> Thanks
>
> On Tue, Dec 26, 2017 at 1:50 PM, Byers, Steven K (Steve) CTR USARMY MEDCOM 
> JMLFDC (US)  wrote:
>> Bryan,
>>
>> We have extended a few processors from standard processors.  For
>> example, one of the processors we are extending is:
>> org.apache.nifi.processors.standard.JmsConsumer
>>
>> In another processor we are importing 
>> org.apache.nifi.processors.standard.util.JdbcCommon.
>>
>> I tried making the dependency on standard processors optional 
>> (<optional>true</optional>) which excludes the standard processors but then
>> NiFi won't start because those custom processors that depend on it cannot be 
>> instantiated.
>>
>>
>>
>> Thank you,
>>
>> Steven K. Byers
>> DXC Technology - Contractor
>> Software Developer - Joint Medical Logistics Functional Development
>> Center (JMLFDC) Defense Health Agency (DHA)/ Health Information
>> Technology (HIT) Directorate/ Solution Delivery Division
>> (SDD)/Clinical Support Branch/JMLFDC
>> 1681 Nelson Street, Fort Detrick, MD  21702
>> (443) 538-7575 | (410) 872-4923
>> Email: steven.k.byers@mail.mil
>>
>>
>>
>> -Original Message-
>> From: Bryan Bende [mailto:bbe...@gmail.com]
>> Sent: Tuesday, December 26, 2017 1:34 PM
>> To: dev@nifi.apache.org
>> Subject: Re: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0
>>
>> Can you give some more details about what the dependency is for?
>>
>> If it is some utility code that exists in standard processors then we
>> should be looking to move that to other reusable modules so that you
>> can depend on it without depending on the processors.
>>
>> If it is because you extended a processor from standard processors,
>> you would probably want to just copy the processor code into your own
>> NAR and modify/extend it.
>>
>> On Tue, Dec 26, 2017 at 1:26 PM Byers, Steven K (Steve) CTR USARMY
>> MEDCOM JMLFDC (US)  wrote:
>>
>>> Thanks for the reply, Bryan,
>>>
>>> Yes, two of our custom processors have a dependency on the standard
>>> processors.  The dependency cannot be removed or those processors
>>> will not compile.
>>>
>>> Thank you,
>>>
>>> Steven K. Byers
>>> DXC Technology - Contractor
>>> Software Developer - Joint Medical Logistics Functional Development
>>> Center
>>> (JMLFDC)
>>> Defense Health Agency (DHA)/ Health Information Technology (HIT)
>>> Directorate/ Solution Delivery Division (SDD)/Clinical Support
>>> Branch/JMLFDC
>>> 1681 Nelson Street, Fort Detrick, MD  21702
>>> (443) 538-7575 | (410) 872-4923
>>> Email: steven.k.byers@mail.mil
>>>
>>>
>>>
>>> -Original Message-
>>> From: Bryan Bende [mailto:bbe...@gmail.com]
>>> Sent: Tuesday, December 26, 2017 11:25 AM
>>> To: dev@nifi.apache.org
>>> Subject: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0
>>>
>>> Hello,
>>>
>>> This means your custom NAR is bundling the standard processors jar
>>> and as a result they are getting discovered twice, once from your NAR
>>> and once from the standard NAR.
>>>
>>> You’ll have to look at your maven dependencies for your custom NARs
>>> and figure out why the dependency on standard processors exists and remove 
>>> it.
>>>
>>> Thanks,
>>>
>>> Bryan
>>>
>>>
>>> On Tue, Dec 26, 2017 at 11:09 AM Byers, Steven K (Steve) CTR USARMY
>>> MEDCOM JMLFDC (US)  wrote:
>>>
>>> > Hi devs,
>>> >
>>> > I'm 

RE: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0

2017-12-26 Thread Byers, Steven K (Steve) CTR USARMY MEDCOM JMLFDC (US)
Yes, I agree. Unfortunately, our schedule may not allow for that at this time 
so we will probably have to put off migrating for now.

Thanks to you and Bryan for your replies.

Thank you,

Steven K. Byers 
DXC Technology - Contractor
Software Developer - Joint Medical Logistics Functional Development Center 
(JMLFDC)
Defense Health Agency (DHA)/ Health Information Technology (HIT) Directorate/ 
Solution Delivery Division (SDD)/Clinical Support Branch/JMLFDC
1681 Nelson Street, Fort Detrick, MD  21702 
(443) 538-7575 | (410) 872-4923
Email: steven.k.byers@mail.mil



-Original Message-
From: Joe Witt [mailto:joe.w...@gmail.com] 
Sent: Tuesday, December 26, 2017 1:56 PM
To: dev@nifi.apache.org
Subject: Re: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0

Steven

To Bryan's point, it will be best if you simply copy the code/base classes
over to your NAR rather than extending from existing concrete
processor implementations.

Thanks

On Tue, Dec 26, 2017 at 1:50 PM, Byers, Steven K (Steve) CTR USARMY MEDCOM 
JMLFDC (US)  wrote:
> Bryan,
>
> We have extended a few processors from standard processors.  For 
> example, one of the processors we are extending is:  
> org.apache.nifi.processors.standard.JmsConsumer
>
> In another processor we are importing 
> org.apache.nifi.processors.standard.util.JdbcCommon.
>
> I tried making the dependency on standard processors optional 
> (<optional>true</optional>) which excludes the standard processors but then
> NiFi won't start because those custom processors that depend on it cannot be 
> instantiated.
>
>
>
> Thank you,
>
> Steven K. Byers
> DXC Technology - Contractor
> Software Developer - Joint Medical Logistics Functional Development 
> Center (JMLFDC) Defense Health Agency (DHA)/ Health Information 
> Technology (HIT) Directorate/ Solution Delivery Division 
> (SDD)/Clinical Support Branch/JMLFDC
> 1681 Nelson Street, Fort Detrick, MD  21702
> (443) 538-7575 | (410) 872-4923
> Email: steven.k.byers@mail.mil
>
>
>
> -Original Message-
> From: Bryan Bende [mailto:bbe...@gmail.com]
> Sent: Tuesday, December 26, 2017 1:34 PM
> To: dev@nifi.apache.org
> Subject: Re: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0
>
> Can you give some more details about what the dependency is for?
>
> If it is some utility code that exists in standard processors then we 
> should be looking to move that to other reusable modules so that you 
> can depend on it without depending on the processors.
>
> If it is because you extended a processor from standard processors, 
> you would probably want to just copy the processor code into your own 
> NAR and modify/extend it.
>
> On Tue, Dec 26, 2017 at 1:26 PM Byers, Steven K (Steve) CTR USARMY 
> MEDCOM JMLFDC (US)  wrote:
>
>> Thanks for the reply, Bryan,
>>
>> Yes, two of our custom processors have a dependency on the standard 
>> processors.  The dependency cannot be removed or those processors 
>> will not compile.
>>
>> Thank you,
>>
>> Steven K. Byers
>> DXC Technology - Contractor
>> Software Developer - Joint Medical Logistics Functional Development 
>> Center
>> (JMLFDC)
>> Defense Health Agency (DHA)/ Health Information Technology (HIT) 
>> Directorate/ Solution Delivery Division (SDD)/Clinical Support 
>> Branch/JMLFDC
>> 1681 Nelson Street, Fort Detrick, MD  21702
>> (443) 538-7575 | (410) 872-4923
>> Email: steven.k.byers@mail.mil
>>
>>
>>
>> -Original Message-
>> From: Bryan Bende [mailto:bbe...@gmail.com]
>> Sent: Tuesday, December 26, 2017 11:25 AM
>> To: dev@nifi.apache.org
>> Subject: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0
>>
>> Hello,
>>
>> This means your custom NAR is bundling the standard processors jar 
>> and as a result they are getting discovered twice, once from your NAR 
>> and once from the standard NAR.
>>
>> You’ll have to look at your maven dependencies for your custom NARs 
>> and figure out why the dependency on standard processors exists and remove 
>> it.
>>
>> Thanks,
>>
>> Bryan
>>
>>
>> On Tue, Dec 26, 2017 at 11:09 AM Byers, Steven K (Steve) CTR USARMY 
>> MEDCOM JMLFDC (US)  wrote:
>>
>> > Hi devs,
>> >
>> > I'm looking into moving from NiFi 1.1.2 to 1.4.0.  We have several 
>> > custom processors and services.  Everything is compiling without 
>> > any problems but when I put the services into the 1.4.0 instance, 
>> > NiFi shows in the list of processors a 1.1.2 and 1.4.0 version of 
>> > all processors including the stock NiFi processors. If I just load 
>> > our custom processors that do not require a service, NiFi is fine 
>> > and the processor list looks like it should and the custom 
>> > processors work fine.  It seems to be something related to the 
>> > custom services. Is there some documentation or any guidance 
>> > someone can provide to assist with what I am doing?
>> >
>> > Thank you,
>> >
>> > Steven K. Byers
>> > DXC Technology 

Re: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0

2017-12-26 Thread Joe Witt
Steven

To Bryan's point, it will be best if you simply copy the code/base
classes over to your NAR rather than extending from existing concrete
processor implementations.

Thanks

On Tue, Dec 26, 2017 at 1:50 PM, Byers, Steven K (Steve) CTR USARMY
MEDCOM JMLFDC (US)  wrote:
> Bryan,
>
> We have extended a few processors from standard processors.  For example, one 
> of the processors we are extending is:  
> org.apache.nifi.processors.standard.JmsConsumer
>
> In another processor we are importing 
> org.apache.nifi.processors.standard.util.JdbcCommon.
>
> I tried making the dependency on standard processors optional 
> (<optional>true</optional>) which excludes the standard processors but then
> NiFi won't start because those custom processors that depend on it cannot be 
> instantiated.
>
>
>
> Thank you,
>
> Steven K. Byers
> DXC Technology - Contractor
> Software Developer - Joint Medical Logistics Functional Development Center 
> (JMLFDC)
> Defense Health Agency (DHA)/ Health Information Technology (HIT) Directorate/ 
> Solution Delivery Division (SDD)/Clinical Support Branch/JMLFDC
> 1681 Nelson Street, Fort Detrick, MD  21702
> (443) 538-7575 | (410) 872-4923
> Email: steven.k.byers@mail.mil
>
>
>
> -Original Message-
> From: Bryan Bende [mailto:bbe...@gmail.com]
> Sent: Tuesday, December 26, 2017 1:34 PM
> To: dev@nifi.apache.org
> Subject: Re: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0
>
> Can you give some more details about what the dependency is for?
>
> If it is some utility code that exists in standard processors then we
> should be looking to move that to other reusable modules so that you can
> depend on it without depending on the processors.
>
> If it is because you extended a processor from standard processors, you
> would probably want to just copy the processor code into your own NAR and
> modify/extend it.
>
> On Tue, Dec 26, 2017 at 1:26 PM Byers, Steven K (Steve) CTR USARMY MEDCOM
> JMLFDC (US)  wrote:
>
>> Thanks for the reply, Bryan,
>>
>> Yes, two of our custom processors have a dependency on the standard
>> processors.  The dependency cannot be removed or those processors will not
>> compile.
>>
>> Thank you,
>>
>> Steven K. Byers
>> DXC Technology - Contractor
>> Software Developer - Joint Medical Logistics Functional Development Center
>> (JMLFDC)
>> Defense Health Agency (DHA)/ Health Information Technology (HIT)
>> Directorate/ Solution Delivery Division (SDD)/Clinical Support Branch/JMLFDC
>> 1681 Nelson Street, Fort Detrick, MD  21702
>> (443) 538-7575 | (410) 872-4923
>> Email: steven.k.byers@mail.mil
>>
>>
>>
>> -Original Message-
>> From: Bryan Bende [mailto:bbe...@gmail.com]
>> Sent: Tuesday, December 26, 2017 11:25 AM
>> To: dev@nifi.apache.org
>> Subject: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0
>>
>> Hello,
>>
>> This means your custom NAR is bundling the standard processors jar and as
>> a result they are getting discovered twice, once from your NAR and once
>> from the standard NAR.
>>
>> You’ll have to look at your maven dependencies for your custom NARs and
>> figure out why the dependency on standard processors exists and remove it.
>>
>> Thanks,
>>
>> Bryan
>>
>>
>> On Tue, Dec 26, 2017 at 11:09 AM Byers, Steven K (Steve) CTR USARMY MEDCOM
>> JMLFDC (US)  wrote:
>>
>> > Hi devs,
>> >
>> > I'm looking into moving from NiFi 1.1.2 to 1.4.0.  We have several
>> > custom processors and services.  Everything is compiling without any
>> > problems but when I put the services into the 1.4.0 instance, NiFi
>> > shows in the list of processors a 1.1.2 and 1.4.0 version of all
>> > processors including the stock NiFi processors. If I just load our
>> > custom processors that do not require a service, NiFi is fine and the
>> > processor list looks like it should and the custom processors work
>> > fine.  It seems to be something related to the custom services. Is
>> > there some documentation or any guidance someone can provide to assist
>> > with what I am doing?
>> >
>> > Thank you,
>> >
>> > Steven K. Byers
>> > DXC Technology - Contractor
>> > Software Developer - Joint Medical Logistics Functional Development
>> > Center
>> > (JMLFDC)
>> > Defense Health Agency (DHA)/ Health Information Technology (HIT)
>> > Directorate/ Solution Delivery Division (SDD)/Clinical Support
>> > Branch/JMLFDC
>> > 1681 Nelson Street, Fort Detrick, MD  21702
>> > (443) 538-7575 | (410) 872-4923
>> > Email: steven.k.byers@mail.mil
>> >
>> >
>> >
>> > --
>> Sent from Gmail Mobile
>>
> --
> Sent from Gmail Mobile


RE: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0

2017-12-26 Thread Byers, Steven K (Steve) CTR USARMY MEDCOM JMLFDC (US)
Bryan,

We have extended a few processors from standard processors.  For example, one 
of the processors we are extending is:  
org.apache.nifi.processors.standard.JmsConsumer 

In another processor we are importing 
org.apache.nifi.processors.standard.util.JdbcCommon.

I tried making the dependency on standard processors optional 
(<optional>true</optional>) which excludes the standard processors but then NiFi
won't start because those custom processors that depend on it cannot be 
instantiated.



Thank you,

Steven K. Byers 
DXC Technology - Contractor
Software Developer - Joint Medical Logistics Functional Development Center 
(JMLFDC)
Defense Health Agency (DHA)/ Health Information Technology (HIT) Directorate/ 
Solution Delivery Division (SDD)/Clinical Support Branch/JMLFDC
1681 Nelson Street, Fort Detrick, MD  21702 
(443) 538-7575 | (410) 872-4923
Email: steven.k.byers@mail.mil



-Original Message-
From: Bryan Bende [mailto:bbe...@gmail.com] 
Sent: Tuesday, December 26, 2017 1:34 PM
To: dev@nifi.apache.org
Subject: Re: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0

Can you give some more details about what the dependency is for?

If it is some utility code that exists in standard processors then we
should be looking to move that to other reusable modules so that you can
depend on it without depending on the processors.

If it is because you extended a processor from standard processors, you
would probably want to just copy the processor code into your own NAR and
modify/extend it.

On Tue, Dec 26, 2017 at 1:26 PM Byers, Steven K (Steve) CTR USARMY MEDCOM
JMLFDC (US)  wrote:

> Thanks for the reply, Bryan,
>
> Yes, two of our custom processors have a dependency on the standard
> processors.  The dependency cannot be removed or those processors will not
> compile.
>
> Thank you,
>
> Steven K. Byers
> DXC Technology - Contractor
> Software Developer - Joint Medical Logistics Functional Development Center
> (JMLFDC)
> Defense Health Agency (DHA)/ Health Information Technology (HIT)
> Directorate/ Solution Delivery Division (SDD)/Clinical Support Branch/JMLFDC
> 1681 Nelson Street, Fort Detrick, MD  21702
> (443) 538-7575 | (410) 872-4923
> Email: steven.k.byers@mail.mil
>
>
>
> -Original Message-
> From: Bryan Bende [mailto:bbe...@gmail.com]
> Sent: Tuesday, December 26, 2017 11:25 AM
> To: dev@nifi.apache.org
> Subject: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0
>
> Hello,
>
> This means your custom NAR is bundling the standard processors jar and as
> a result they are getting discovered twice, once from your NAR and once
> from the standard NAR.
>
> You’ll have to look at your maven dependencies for your custom NARs and
> figure out why the dependency on standard processors exists and remove it.
>
> Thanks,
>
> Bryan
>
>
> On Tue, Dec 26, 2017 at 11:09 AM Byers, Steven K (Steve) CTR USARMY MEDCOM
> JMLFDC (US)  wrote:
>
> > Hi devs,
> >
> > I'm looking into moving from NiFi 1.1.2 to 1.4.0.  We have several
> > custom processors and services.  Everything is compiling without any
> > problems but when I put the services into the 1.4.0 instance, NiFi
> > shows in the list of processors a 1.1.2 and 1.4.0 version of all
> > processors including the stock NiFi processors. If I just load our
> > custom processors that do not require a service, NiFi is fine and the
> > processor list looks like it should and the custom processors work
> > fine.  It seems to be something related to the custom services. Is
> > there some documentation or any guidance someone can provide to assist
> > with what I am doing?
> >
> > Thank you,
> >
> > Steven K. Byers
> > DXC Technology - Contractor
> > Software Developer - Joint Medical Logistics Functional Development
> > Center
> > (JMLFDC)
> > Defense Health Agency (DHA)/ Health Information Technology (HIT)
> > Directorate/ Solution Delivery Division (SDD)/Clinical Support
> > Branch/JMLFDC
> > 1681 Nelson Street, Fort Detrick, MD  21702
> > (443) 538-7575 | (410) 872-4923
> > Email: steven.k.byers@mail.mil
> >
> >
> >
> > --
> Sent from Gmail Mobile
>
-- 
Sent from Gmail Mobile




Re: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0

2017-12-26 Thread Bryan Bende
Can you give some more details about what the dependency is for?

If it is some utility code that exists in standard processors then we
should be looking to move that to other reusable modules so that you can
depend on it without depending on the processors.

If it is because you extended a processor from standard processors, you
would probably want to just copy the processor code into your own NAR and
modify/extend it.

On Tue, Dec 26, 2017 at 1:26 PM Byers, Steven K (Steve) CTR USARMY MEDCOM
JMLFDC (US)  wrote:

> Thanks for the reply, Bryan,
>
> Yes, two of our custom processors have a dependency on the standard
> processors.  The dependency cannot be removed or those processors will not
> compile.
>
> Thank you,
>
> Steven K. Byers
> DXC Technology - Contractor
> Software Developer - Joint Medical Logistics Functional Development Center
> (JMLFDC)
> Defense Health Agency (DHA)/ Health Information Technology (HIT)
> Directorate/ Solution Delivery Division (SDD)/Clinical Support Branch/JMLFDC
> 1681 Nelson Street, Fort Detrick, MD  21702
> (443) 538-7575 | (410) 872-4923
> Email: steven.k.byers@mail.mil
>
>
>
> -Original Message-
> From: Bryan Bende [mailto:bbe...@gmail.com]
> Sent: Tuesday, December 26, 2017 11:25 AM
> To: dev@nifi.apache.org
> Subject: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0
>
> Hello,
>
> This means your custom NAR is bundling the standard processors jar and as
> a result they are getting discovered twice, once from your NAR and once
> from the standard NAR.
>
> You’ll have to look at your maven dependencies for your custom NARs and
> figure out why the dependency on standard processors exists and remove it.
>
> Thanks,
>
> Bryan
>
>
> On Tue, Dec 26, 2017 at 11:09 AM Byers, Steven K (Steve) CTR USARMY MEDCOM
> JMLFDC (US)  wrote:
>
> > Hi devs,
> >
> > I'm looking into moving from NiFi 1.1.2 to 1.4.0.  We have several
> > custom processors and services.  Everything is compiling without any
> > problems but when I put the services into the 1.4.0 instance, NiFi
> > shows in the list of processors a 1.1.2 and 1.4.0 version of all
> > processors including the stock NiFi processors. If I just load our
> > custom processors that do not require a service, NiFi is fine and the
> > processor list looks like it should and the custom processors work
> > fine.  It seems to be something related to the custom services. Is
> > there some documentation or any guidance someone can provide to assist
> > with what I am doing?
> >
> > Thank you,
> >
> > Steven K. Byers
> > DXC Technology - Contractor
> > Software Developer - Joint Medical Logistics Functional Development
> > Center
> > (JMLFDC)
> > Defense Health Agency (DHA)/ Health Information Technology (HIT)
> > Directorate/ Solution Delivery Division (SDD)/Clinical Support
> > Branch/JMLFDC
> > 1681 Nelson Street, Fort Detrick, MD  21702
> > (443) 538-7575 | (410) 872-4923
> > Email: steven.k.byers@mail.mil
> >
> >
> >
> > --
> Sent from Gmail Mobile
>
-- 
Sent from Gmail Mobile


RE: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0

2017-12-26 Thread Byers, Steven K (Steve) CTR USARMY MEDCOM JMLFDC (US)
Thanks for the reply, Bryan,

Yes, two of our custom processors have a dependency on the standard processors. 
 The dependency cannot be removed or those processors will not compile.

Thank you,

Steven K. Byers 
DXC Technology - Contractor
Software Developer - Joint Medical Logistics Functional Development Center 
(JMLFDC)
Defense Health Agency (DHA)/ Health Information Technology (HIT) Directorate/ 
Solution Delivery Division (SDD)/Clinical Support Branch/JMLFDC
1681 Nelson Street, Fort Detrick, MD  21702 
(443) 538-7575 | (410) 872-4923
Email: steven.k.byers@mail.mil



-Original Message-
From: Bryan Bende [mailto:bbe...@gmail.com] 
Sent: Tuesday, December 26, 2017 11:25 AM
To: dev@nifi.apache.org
Subject: [Non-DoD Source] Re: Moving from version 1.1.2 to 1.4.0

Hello,

This means your custom NAR is bundling the standard processors jar and as a 
result they are getting discovered twice, once from your NAR and once from the 
standard NAR.

You’ll have to look at your maven dependencies for your custom NARs and figure 
out why the dependency on standard processors exists and remove it.

Thanks,

Bryan


On Tue, Dec 26, 2017 at 11:09 AM Byers, Steven K (Steve) CTR USARMY MEDCOM 
JMLFDC (US)  wrote:

> Hi devs,
>
> I'm looking into moving from NiFi 1.1.2 to 1.4.0.  We have several 
> custom processors and services.  Everything is compiling without any 
> problems but when I put the services into the 1.4.0 instance, NiFi 
> shows in the list of processors a 1.1.2 and 1.4.0 version of all 
> processors including the stock NiFi processors. If I just load our 
> custom processors that do not require a service, NiFi is fine and the 
> processor list looks like it should and the custom processors work 
> fine.  It seems to be something related to the custom services. Is 
> there some documentation or any guidance someone can provide to assist 
> with what I am doing?
>
> Thank you,
>
> Steven K. Byers
> DXC Technology - Contractor
> Software Developer - Joint Medical Logistics Functional Development 
> Center
> (JMLFDC)
> Defense Health Agency (DHA)/ Health Information Technology (HIT) 
> Directorate/ Solution Delivery Division (SDD)/Clinical Support 
> Branch/JMLFDC
> 1681 Nelson Street, Fort Detrick, MD  21702
> (443) 538-7575 | (410) 872-4923
> Email: steven.k.byers@mail.mil
>
>
>
> --
Sent from Gmail Mobile




Re: Moving from version 1.1.2 to 1.4.0

2017-12-26 Thread Bryan Bende
Hello,

This means your custom NAR is bundling the standard processors jar and as a
result they are getting discovered twice, once from your NAR and once from
the standard NAR.

You’ll have to look at your maven dependencies for your custom NARs and
figure out why the dependency on standard processors exists and remove it.

Thanks,

Bryan


On Tue, Dec 26, 2017 at 11:09 AM Byers, Steven K (Steve) CTR USARMY MEDCOM
JMLFDC (US)  wrote:

> Hi devs,
>
> I'm looking into moving from NiFi 1.1.2 to 1.4.0.  We have several custom
> processors and services.  Everything is compiling without any problems but
> when I put the services into the 1.4.0 instance, NiFi shows in the list of
> processors a 1.1.2 and 1.4.0 version of all processors including the stock
> NiFi processors. If I just load our custom processors that do not require a
> service, NiFi is fine and the processor list looks like it should and the
> custom processors work fine.  It seems to be something related to the custom
> services. Is there some documentation or any guidance someone can provide to
> assist with what I am doing?
>
> Thank you,
>
> Steven K. Byers
> DXC Technology - Contractor
> Software Developer - Joint Medical Logistics Functional Development Center
> (JMLFDC)
> Defense Health Agency (DHA)/ Health Information Technology (HIT)
> Directorate/ Solution Delivery Division (SDD)/Clinical Support
> Branch/JMLFDC
> 1681 Nelson Street, Fort Detrick, MD  21702
> (443) 538-7575 | (410) 872-4923
> Email: steven.k.byers@mail.mil
>
>
>
> --
Sent from Gmail Mobile
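
To make the "discovered twice" diagnosis concrete, here is a small sketch of
how one might check which classes are packaged in more than one bundle. It is
a hypothetical helper, not part of NiFi: the class DuplicateTypeFinder, its
method, and the bundle names used with it are assumptions for illustration,
and the caller is assumed to have already listed the class names in each NAR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical diagnostic helper; not part of NiFi.
final class DuplicateTypeFinder {
    // Input: bundle name -> class names packaged in that bundle.
    // Output: class name -> bundles containing it, only for classes found in two or more bundles.
    static Map<String, List<String>> findDuplicates(Map<String, Set<String>> typesByBundle) {
        Map<String, List<String>> bundlesByType = new TreeMap<>();
        for (Map.Entry<String, Set<String>> bundle : typesByBundle.entrySet()) {
            for (String type : bundle.getValue()) {
                bundlesByType.computeIfAbsent(type, k -> new ArrayList<>()).add(bundle.getKey());
            }
        }
        // Drop every class that appears in only one bundle.
        bundlesByType.values().removeIf(bundles -> bundles.size() < 2);
        return bundlesByType;
    }
}
```

Any class this reports for both a custom NAR and the standard NAR points at
the bundled standard processors jar described above.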


Moving from version 1.1.2 to 1.4.0

2017-12-26 Thread Byers, Steven K (Steve) CTR USARMY MEDCOM JMLFDC (US)
Hi devs,

I'm looking into moving from NiFi 1.1.2 to 1.4.0.  We have several custom
processors and services.  Everything is compiling without any problems but
when I put the services into the 1.4.0 instance, NiFi shows in the list of
processors a 1.1.2 and 1.4.0 version of all processors including the stock
NiFi processors. If I just load our custom processors that do not require a
service, NiFi is fine and the processor list looks like it should and the
custom processors work fine.  It seems to be something related to the custom
services. Is there some documentation or any guidance someone can provide to
assist with what I am doing?

Thank you,

Steven K. Byers 
DXC Technology - Contractor
Software Developer - Joint Medical Logistics Functional Development Center
(JMLFDC)
Defense Health Agency (DHA)/ Health Information Technology (HIT)
Directorate/ Solution Delivery Division (SDD)/Clinical Support Branch/JMLFDC
1681 Nelson Street, Fort Detrick, MD  21702 
(443) 538-7575 | (410) 872-4923
Email: steven.k.byers@mail.mil







Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread 尹文才
Thanks for your quick response, Koji. I haven't heard or seen anything
about the NiFi Record data model while reading the NiFi
documentation. Could you tell me where this model is documented? Thanks.

By the way, to my knowledge, when you use the DistributedMapCacheServer
from DistributedMapCacheClientService, you need to specify the host URL of
the server. This means that, inside a NiFi cluster,
if the node I specified as the cache server suddenly goes down, I can't
use the cache until that node comes back up, right? Is there currently
a cache server in NiFi that supports HA? Thanks.

Regards,
Ben

2017-12-26 18:34 GMT+08:00 Koji Kawamura :

> Hi Ben,
>
> As you found from the existing code, DistributedMapCache is used to share
> state among different processors, and it can be used by your custom
> processors, too.
> However, I'd recommend avoiding such tight dependencies between
> FlowFiles if possible, or at least minimizing the part of the flow that
> requires that constraint, for better performance and simplicity.
> For example, since a FlowFile can hold a fairly large amount of data,
> you could merge all FlowFiles into a single FlowFile instead of batches
> of FlowFiles. If you need logical boundaries, you can use the NiFi Record
> data model to embed multiple records within a FlowFile; Record should
> perform better.
>
> Hope this helps.
>
> Thanks,
> Koji
>
>
> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才  wrote:
> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
> > status between my custom processors.
> > Our requirement is this: we're doing some ETL work using NiFi, and I'm
> > extracting the data from a DB into batches of FlowFiles (each batch of
> > FlowFiles has a flag FlowFile indicating the end of the batch).
> > There are some groups of custom processors downstream that need to process
> > these FlowFiles to do some business logic work, and we expect these
> > processors to process one batch of FlowFiles at a time.
> > Therefore we need to implement a custom Wait processor (let's just call it
> > WaitBatch here) to hold all the other batches of FlowFiles while the
> > business processors are handling the batch of FlowFiles whose creation
> > time is earlier.
> >
> > In order to implement this, all the WaitBatch processors placed in the flow
> > need to read/update records in a shared map so that each set of
> > business-logic processors processes one batch at a time.
> > The entries are keyed by the batch number of the FlowFiles, and the value
> > of each entry is a batch release counter which counts the number of
> > times the batch of FlowFiles has passed through
> > a WaitBatch processor.
> > When a batch is released by WaitBatch, it will try to increment the batch
> > number entry's value by 1, and the released batch number and counter
> > value will also be saved locally at the WaitBatch with StateManager.
> > When the next batch reaches the WaitBatch, it will check whether the counter
> > value of the previously released batch number in the shared map is greater
> > than the one saved locally. If the entry for the batch number doesn't
> > exist (already removed) or the value in the shared map is greater, the next
> > batch will be released, and the local state and the entry in the shared map
> > will be updated similarly.
> > At the end of the flow, a custom processor will get the batch number from
> > each batch and remove the entry from the shared map.
> >
> > So this implementation requires a shared map that can be read and updated
> > frequently and atomically. I checked the Wait/Notify processors in NiFi and
> > saw that they use the DistributedMapCacheClientService and
> > DistributedMapCacheServer to sync status, so I'm wondering if I could use
> > the DistributedMapCacheClientService to implement my logic. I also saw
> > another implementation called RedisDistributedMapCacheClientService,
> > which seems to require Redis (I haven't used Redis). Thanks in advance for
> > any suggestions.
> >
> > Regards,
> > Ben
>


Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread Koji Kawamura
Hi Ben,

As you found in the existing code, DistributedMapCache is used to share
state among different processors, and your custom processors can use it,
too.
However, I'd recommend avoiding such tight dependencies between
FlowFiles if possible, or at least minimizing the part of the flow that
requires that constraint, for better performance and simplicity.
For example, since a FlowFile can hold a fairly large amount of data,
you could merge all FlowFiles into a single FlowFile instead of working
with batches of FlowFiles. If you need logical boundaries, you can use the
NiFi Record data model to embed multiple records within a FlowFile;
Records should perform better.

Hope this helps.

Thanks,
Koji




Re: NIFI-4715 : ListS3 list duplicate files when incoming file throughput to S3 is high

2017-12-26 Thread Koji Kawamura
Hi Milan,

Thanks for your contribution! I reviewed the PR and posted a comment there.
Would you check that?

Koji

On Sat, Dec 23, 2017 at 7:15 AM, Milan Das  wrote:

> I have logged a defect in NiFi: ListS3 generates duplicate FlowFiles when
> S3 throughput is high.
>
> Root cause:
> A file gets uploaded to S3 while a ListS3 listing is in progress.
> In onTrigger, maxTimestamp is initialized to 0L.
> This clears the keys, as per the code below.
>
> When the lastModifiedTime of an S3 object is the same as currentTimestamp
> and its key has already been listed, the object should be skipped. Because
> the keys were cleared, the same file is loaded again.
> I think the fix should be to initialize maxTimestamp with currentTimestamp
> instead of 0L.
>
> https://issues.apache.org/jira/browse/NIFI-4715
>
> The fix I made seems OK and is working for us:
>
> long maxTimestamp = currentTimestamp;
>
> I wanted to get thoughts from other experts, or hear whether there is any
> other known fix.
>
> Regards,
>
> *Milan Das*
> Sr. System Architect
>
> email: m...@interset.com
> mobile: +1 678 216 5660
>
> www.interset.com
>
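
The root cause described in the quoted message can be illustrated with a
small simulation. This is only a simplified model built from that
description, not the actual ListS3 source: the class ListingStateSketch,
the method listOnce, the applyFix flag, and the currentKeys set are
hypothetical names, and the bucket is just a map from key to lastModified.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ListingStateSketch {
    long currentTimestamp = 0L;                       // newest lastModified seen so far
    final Set<String> currentKeys = new HashSet<>();  // keys already listed at that timestamp
    final boolean applyFix;                           // true: start maxTimestamp at currentTimestamp

    ListingStateSketch(boolean applyFix) {
        this.applyFix = applyFix;
    }

    /** Runs one listing pass over key -> lastModified and returns the keys emitted as new. */
    List<String> listOnce(Map<String, Long> bucket) {
        long maxTimestamp = applyFix ? currentTimestamp : 0L;
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : bucket.entrySet()) {
            long lastModified = entry.getValue();
            boolean alreadyListed = lastModified < currentTimestamp
                    || (lastModified == currentTimestamp && currentKeys.contains(entry.getKey()));
            if (alreadyListed) {
                continue;
            }
            emitted.add(entry.getKey());
            if (lastModified > maxTimestamp) {
                // With maxTimestamp starting at 0L this also fires when
                // lastModified == currentTimestamp and forgets the listed keys.
                maxTimestamp = lastModified;
                currentKeys.clear();
            }
            if (lastModified == maxTimestamp) {
                currentKeys.add(entry.getKey());
            }
        }
        if (maxTimestamp > currentTimestamp) {
            currentTimestamp = maxTimestamp;
        }
        return emitted;
    }

    /** Lists "a", then "b" with the same timestamp, then lists again with no new uploads. */
    static List<String> thirdPass(boolean applyFix) {
        ListingStateSketch state = new ListingStateSketch(applyFix);
        Map<String, Long> bucket = new LinkedHashMap<>();
        bucket.put("a", 100L);
        state.listOnce(bucket);
        bucket.put("b", 100L);
        state.listOnce(bucket);
        return state.listOnce(bucket);
    }

    public static void main(String[] args) {
        System.out.println(thirdPass(false)); // [a, b]  (duplicates)
        System.out.println(thirdPass(true));  // []
    }
}
```

In this model, starting maxTimestamp at 0L lets the second pass clear the
remembered keys, so the third pass emits both objects again; starting it at
currentTimestamp keeps the keys and the third pass emits nothing.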


proper way in nifi to sync status between custom processors

2017-12-26 Thread 尹文才
Hi guys, I'm trying to find a proper way in NiFi to sync status between my
custom processors.
Our requirement is this: we're doing ETL work with NiFi, and I'm extracting
data from a DB into batches of FlowFiles (each batch ends with a flag
FlowFile that marks the end of the batch).
Several groups of custom processors downstream process these FlowFiles to
apply business logic, and we expect each group to process one batch of
FlowFiles at a time.
Therefore we need to implement a custom Wait processor (let's call it
WaitBatch here) that holds all later batches of FlowFiles while the
business processors are handling the batch that was created earlier.

To implement this, all the WaitBatch processors placed in the flow need to
read and update entries in a shared map so that each set of business-logic
processors processes one batch at a time.
The entries are keyed by the batch number of the FlowFiles, and the value
of each entry is a batch release counter, which counts the number of times
the batch has passed through a WaitBatch processor.
When WaitBatch releases a batch, it increments the value of that batch
number's entry by 1 and also saves the released batch number and counter
value locally using StateManager.
When the next batch reaches the WaitBatch, it checks whether the counter
value of the previously released batch number in the shared map is greater
than the one saved locally. If the entry for that batch number doesn't
exist (already removed) or the value in the shared map is greater, the next
batch is released, and the local state and the entry in the shared map are
updated in the same way.
At the end of the flow, a custom processor gets the batch number from each
batch and removes its entry from the shared map.
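
The release check described above could be sketched in plain Java roughly
as follows. This is only a minimal sketch under assumptions: a
ConcurrentHashMap stands in for the shared map, two plain fields stand in
for the state saved with StateManager, and the names WaitBatchSketch and
tryRelease are hypothetical, not part of NiFi or of the real processor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WaitBatchSketch {
    // Stand-in for the shared map: batch number -> release counter.
    private final Map<String, Long> sharedMap;
    // Stand-ins for the local state that would be kept with StateManager.
    private String lastReleasedBatch;   // null until the first release
    private long lastReleasedCounter;

    WaitBatchSketch(Map<String, Long> sharedMap) {
        this.sharedMap = sharedMap;
    }

    /** Returns true if the batch may pass; updates shared and local state when it does. */
    boolean tryRelease(String batch) {
        if (lastReleasedBatch != null) {
            Long shared = sharedMap.get(lastReleasedBatch);
            // Hold the batch unless the previous entry was removed or its
            // counter has moved past the value saved locally.
            if (shared != null && shared <= lastReleasedCounter) {
                return false;
            }
        }
        // Atomically increment (or create) the counter for the released batch.
        long counter = sharedMap.merge(batch, 1L, Long::sum);
        lastReleasedBatch = batch;
        lastReleasedCounter = counter;
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> shared = new ConcurrentHashMap<>();
        WaitBatchSketch first = new WaitBatchSketch(shared);
        WaitBatchSketch second = new WaitBatchSketch(shared);
        System.out.println(first.tryRelease("batch-1"));  // true
        System.out.println(first.tryRelease("batch-2"));  // false, batch-1 is still in between
        System.out.println(second.tryRelease("batch-1")); // true, counter for batch-1 becomes 2
        System.out.println(first.tryRelease("batch-2"));  // true
    }
}
```

Here each WaitBatchSketch instance plays the role of one WaitBatch
processor, and removing an entry from the map corresponds to the final
processor at the end of the flow.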

So this implementation requires a shared map that can be read and updated
frequently and atomically. I checked the Wait/Notify processors in NiFi and
saw that they use DistributedMapCacheClientService and
DistributedMapCacheServer to sync status, so I'm wondering whether I could
use DistributedMapCacheClientService to implement my logic. I also saw
another implementation called RedisDistributedMapCacheClientService, which
seems to require Redis (I haven't used Redis). Thanks in advance for any
suggestions.
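
To make the "atomically" requirement concrete, here is a minimal sketch of
an optimistic compare-and-swap increment. It assumes a ConcurrentMap as a
stand-in for the cache; the class AtomicCounterSketch and its increment
method are hypothetical names, and a real cache client would need to offer
an equivalent atomic replace operation for this pattern to carry over.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class AtomicCounterSketch {
    /** Increments the counter stored under key and returns the new value. */
    static long increment(ConcurrentMap<String, Long> cache, String key) {
        while (true) {
            Long current = cache.get(key);
            if (current == null) {
                // No entry yet: succeeds only if nobody created it in the meantime.
                if (cache.putIfAbsent(key, 1L) == null) {
                    return 1L;
                }
            } else {
                long next = current + 1;
                // Succeeds only if the stored value is still the one we read.
                if (cache.replace(key, current, next)) {
                    return next;
                }
            }
            // Another writer won the race: re-read and try again.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentMap<String, Long> cache = new ConcurrentHashMap<>();
        Runnable worker = () -> {
            for (int i = 0; i < 1000; i++) {
                increment(cache, "batch-1");
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(cache.get("batch-1")); // 2000
    }
}
```

A plain get followed by put would lose updates when two processors
increment the same entry at once; the retry loop avoids that without
holding a lock across the read and the write.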

Regards,
Ben