Hi Ben,

I was referring to these two log messages in your previous email.
Both messages were written by ExecuteSqlCommand; they do not
mean 'it was executed again'.

```
2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
```
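
To illustrate why an INFO line and an ERROR line a few milliseconds apart
need not mean two executions, here is a minimal sketch. It assumes the
processor logs once before executing and once more in its catch block.
The class, interface and method names are hypothetical and are not taken
from the actual ExecuteSqlCommand code.

```java
import java.util.ArrayList;
import java.util.List;

public class SingleTriggerLogging {
    /** Hypothetical stand-in for the JDBC call made inside the processor. */
    public interface SqlRunner {
        void run(String sql) throws Exception;
    }

    /**
     * Simulates one trigger of the processor and returns the log lines it
     * produced: an INFO line before executing, plus an ERROR line only if
     * the single execution throws.
     */
    public static List<String> onTrigger(String sql, SqlRunner runner) {
        List<String> log = new ArrayList<>();
        log.add("INFO executing sql: " + sql);
        try {
            runner.run(sql); // the statement is executed exactly once
        } catch (Exception e) {
            log.add("ERROR failed to execute sql: " + sql + ": " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        // Simulate the target table already existing, so the one execution fails.
        List<String> log = onTrigger(
                "SELECT TOP 0 * INTO tmp.t FROM dbo.t",
                sql -> { throw new Exception("object already exists"); });
        log.forEach(System.out::println);
    }
}
```

With this structure, a failing statement produces both lines from one
invocation, so the table must have existed before this trigger started.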

As you wrote, when the FlowFile repository fails to checkpoint,
processors may process the same FlowFiles again. However,
there is no simple way for every processor to roll back its
work, because different processors do different things. Creating the
temp table only if it does not exist seems like the right approach to me.
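
As a rough sketch of that approach, the statement could be wrapped in an
existence check so that re-running it for the same FlowFile is harmless.
The helper below only builds the T-SQL text. The class and method names
are hypothetical, the `OBJECT_ID(..., N'U') IS NULL` guard is one common
SQL Server idiom rather than the only option, and I have not run this
against your database.

```java
public class GuardedTempTable {
    // Accepts plain or schema-qualified identifiers such as "tmp.my_table".
    private static final String IDENT =
            "[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?";

    /**
     * Builds a T-SQL batch that copies the structure of {@code source} into
     * {@code target} and drops {@code dropColumn}, but only when
     * {@code target} does not exist yet.
     */
    public static String guardedCreate(String target, String source, String dropColumn) {
        for (String name : new String[] {target, source, dropColumn}) {
            // Reject anything that is not a simple identifier, since the
            // names are concatenated into the SQL text.
            if (!name.matches(IDENT)) {
                throw new IllegalArgumentException("unexpected identifier: " + name);
            }
        }
        return "IF OBJECT_ID(N'" + target + "', N'U') IS NULL\n"
             + "BEGIN\n"
             + "    SELECT TOP 0 * INTO " + target + " FROM " + source + ";\n"
             + "    ALTER TABLE " + target + " DROP COLUMN " + dropColumn + ";\n"
             + "END";
    }

    public static void main(String[] args) {
        System.out.println(guardedCreate(
                "tmp.ods_extractDataDebug_20171226031801926_9195",
                "dbo.ods_extractDataDebug",
                "_id"));
    }
}
```

Note that the guard skips both statements when the table exists, so a
table left half-prepared by an earlier failure would not be repaired.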

At the same time, the root cause of the FlowFile repository
failure should be investigated. Is it possible to share the WaitBatch code?
I ask because every 'FlowFile Repository failed to update' error
in the log that you shared earlier is related to the WaitBatch processor.

Thanks,
Koji

On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:
> Hi Koji, I will print the sql before actually executing it, but I checked
> the error log line you mentioned in your reply, this error was thrown by
> NiFi from within another processor called WaitBatch.
> I didn't find similar errors as the one from the ExecuteSqlCommand
> processor, I think it's because only the ExecuteSqlCommand is used to
> create temp database tables.
> You could check my ExecuteSqlCommand code via the link:
> https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
>
> If the error is really caused by a FlowFile repository checkpoint failure and
> the FlowFile was processed twice, I may have to create the temp table only
> if it doesn't exist. I didn't fix the bug this way right away because
> I was afraid the fix could mask other problems.
>
> Thanks.
>
> Regards,
> Ben
>
> 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> Hi Ben,
>>
>> The following two log messages have very close timestamps
>> but different log levels.
>> 2017-12-26 07:00:01,312 INFO
>> 2017-12-26 07:00:01,315 ERROR
>>
>> I guess those are logged within a single onTrigger of your
>> ExecuteSqlCommand custom processor, one is before executing, the other
>> is when it caught an exception. Just guessing as I don't have access
>> to the code.
>>
>> Does the same issue happen with other processors bundled with Apache
>> NiFi without your custom processor running?
>>
>> If NiFi fails to update/checkpoint FlowFile repository, then the same
>> FlowFile can be processed again after restarting NiFi.
>>
>> Thanks,
>> Koji
>>
>>
>>
>> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
>> > Thanks Koji, I will look into this article about the record model.
>> >
>> > By the way, the error I previously mentioned to you occurred again. I
>> > could see in the log that the SQL query was executed twice. This time I had
>> > turned on verbose NiFi logging. The SQL query is as below:
>> >
>> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>> > c.z.nifi.processors.ExecuteSqlCommand
>> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
>> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> > dbo.ods_extractDataDebug;
>> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>> >
>> > and it was executed again later:
>> >
>> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>> > c.z.nifi.processors.ExecuteSqlCommand
>> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
>> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> > dbo.ods_extractDataDebug;
>> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
>> > _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
>> > object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
>> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
>> > object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
>> > at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
>> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
>> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
>> > at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
>> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
>> > at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
>> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
>> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
>> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
>> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
>> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
>> > at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>> > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
>> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>> > at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> > at java.lang.Thread.run(Thread.java:745)
>> >
>> > I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
>> > Repository failed to update". I'm not sure whether this is the reason the
>> > FlowFile got processed twice. Could you help take a look at my log file?
>> > Thanks.
>> > You can get the log file via the link:
>> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
>> >
>> > Best Regards,
>> > Ben
>> >
>> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >
>> >> Hi Ben,
>> >>
>> >> This blog post, written by Mark, would be a good starting point for
>> >> getting familiar with the NiFi Record model.
>> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>> >>
>> >> HA for the DistributedMapCacheClientService and DistributedMapCacheServer
>> >> pair is not supported at the moment. If you need high availability,
>> >> RedisDistributedMapCacheClientService with Redis replication should
>> >> provide that, although I haven't tried it myself.
>> >> https://redis.io/topics/replication
>> >>
>> >> Thanks,
>> >> Koji
>> >>
>> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> > Thanks for your quick response, Koji. I haven't heard or seen anything
>> >> > about the NiFi Record data model while reading the NiFi
>> >> > documentation. Could you tell me where this model is documented? Thanks.
>> >> >
>> >> > By the way, to my knowledge, when you use the DistributedMapCacheServer
>> >> > from DistributedMapCacheClientService, you need to specify the host URL
>> >> > of the server. This means that inside a NiFi cluster,
>> >> > if the node I specified as the cache server suddenly goes down, I
>> >> > cannot use the cache until the node comes up again, right? Is there
>> >> > currently a cache server in NiFi that supports HA? Thanks.
>> >> >
>> >> > Regards,
>> >> > Ben
>> >> >
>> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >> >
>> >> >> Hi Ben,
>> >> >>
>> >> >> As you found from the existing code, DistributedMapCache is used to share
>> >> >> state among different processors, and it can be used by your custom
>> >> >> processors, too.
>> >> >> However, I'd recommend avoiding such tight dependencies between
>> >> >> FlowFiles if possible, or at least minimizing the part of the flow that
>> >> >> requires that constraint, for better performance and simplicity.
>> >> >> For example, since a FlowFile can hold a fairly large amount of data,
>> >> >> you could merge all FlowFiles into a single FlowFile instead of batches
>> >> >> of FlowFiles. If you need logical boundaries, you can use the NiFi Record
>> >> >> data model to embed multiple records within a FlowFile. Records should
>> >> >> perform better.
>> >> >>
>> >> >> Hope this helps.
>> >> >>
>> >> >> Thanks,
>> >> >> Koji
>> >> >>
>> >> >>
>> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> >> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
>> >> >> > status between my custom processors.
>> >> >> > Our requirement is as follows: we're doing some ETL work using NiFi, and
>> >> >> > I'm extracting the data from the DB into batches of FlowFiles (each batch
>> >> >> > of FlowFiles has a flag FlowFile indicating the end of the batch).
>> >> >> > There are some groups of custom processors downstream that need to
>> >> >> > process these FlowFiles to do some business logic work, and we expect
>> >> >> > these processors to process one batch of FlowFiles at a time.
>> >> >> > Therefore we need to implement a custom Wait processor (let's just call
>> >> >> > it WaitBatch here) to hold all the other batches of FlowFiles while the
>> >> >> > business processors are handling the batch of FlowFiles whose creation
>> >> >> > time is earlier.
>> >> >> >
>> >> >> > In order to implement this, all the WaitBatch processors placed in the
>> >> >> > flow need to read/update records in a shared map so that each set of
>> >> >> > business-logic processors processes one batch at a time.
>> >> >> > The entries are keyed by the batch number of the FlowFiles, and the
>> >> >> > value of each entry is a batch release counter, which counts the number
>> >> >> > of times the batch of FlowFiles has passed through
>> >> >> > a WaitBatch processor.
>> >> >> > When a batch is released by WaitBatch, it will try to increment the
>> >> >> > batch number entry's value by 1, and the released batch number and
>> >> >> > counter value will also be saved locally at the WaitBatch with
>> >> >> > StateManager.
>> >> >> > When the next batch reaches the WaitBatch, it will check whether the
>> >> >> > counter value of the previously released batch number in the shared map
>> >> >> > is greater than the one saved locally. If the entry for the batch number
>> >> >> > doesn't exist (already removed) or the value in the shared map is
>> >> >> > greater, the next batch will be released, and the local state and the
>> >> >> > entry in the shared map will be updated similarly.
>> >> >> > At the end of the flow, a custom processor will get the batch number
>> >> >> > from each batch and remove the entry from the shared map.
>> >> >> >
>> >> >> > So this implementation requires a shared map that can be read and
>> >> >> > updated frequently and atomically. I checked the Wait/Notify processors
>> >> >> > in NiFi and saw that they use the DistributedMapCacheClientService and
>> >> >> > DistributedMapCacheServer to sync status, so I'm wondering if I could
>> >> >> > use the DistributedMapCacheClientService to implement my logic. I also
>> >> >> > saw another implementation called RedisDistributedMapCacheClientService,
>> >> >> > which seems to require Redis (I haven't used Redis). Thanks in advance
>> >> >> > for any suggestions.
>> >> >> >
>> >> >> > Regards,
>> >> >> > Ben
>> >> >>
>> >>
>>
