Hi Koji, no problem. You can check the code of the WaitBatch processor at
this link:
https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ

I also uploaded a snapshot of the part of the NiFi flow that includes
ExecuteSqlCommand and WaitBatch. You can check the picture at this link:
https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view

You mentioned above that a FlowFile repository checkpointing failure can
cause other processors to process the same FlowFile again. But as you can
see from my snapshot image, ExecuteSqlCommand is the second processor in
the flow and sits before the WaitBatch processor. Even if the FlowFile
repository checkpointing failure is caused by WaitBatch, could it lead the
processors before it to process a FlowFile multiple times? Thanks.

Regards,
Ben

2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> I was referring to these two log messages in your previous email.
> Both messages are written by ExecuteSqlCommand; that does not
> mean 'it was executed again'.
>
> ```
> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> c.z.nifi.processors.ExecuteSqlCommand
> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> dbo.ods_extractDataDebug;
> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
> _id;
>
> and it was executed again later:
>
> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> c.z.nifi.processors.ExecuteSqlCommand
> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> Failed to execute SQL statement: SELECT
> ```
>
> As you wrote, a FlowFile repository checkpointing failure can cause
> other processors to process the same FlowFiles again. However,
> there won't be a simple way for every processor to roll back its
> job, as different processors do different things. Creating the temp table
> only if it does not exist seems the right approach to me.
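A minimal sketch of the "create the temp table only if it does not exist" idea, assuming SQL Server (as the stack trace in this thread suggests). The `OBJECT_ID(..., N'U')` guard is standard T-SQL, but the class `TempTableSql` and its methods are hypothetical names for illustration, are not taken from ExecuteSqlCommand, and have not been run against a real server.

```java
final class TempTableSql {
    private TempTableSql() {}

    /** Quotes one identifier part T-SQL style, doubling any closing bracket. */
    static String quote(String part) {
        return "[" + part.replace("]", "]]") + "]";
    }

    /**
     * Builds a batch that creates the empty copy only when no user table
     * with the target name exists, so running it twice does not fail.
     */
    static String createIfAbsent(String srcSchema, String srcTable,
                                 String tmpSchema, String tmpTable) {
        String source = quote(srcSchema) + "." + quote(srcTable);
        String target = quote(tmpSchema) + "." + quote(tmpTable);
        // The name also appears inside a string literal, so escape single quotes.
        String literal = target.replace("'", "''");
        return "IF OBJECT_ID(N'" + literal + "', N'U') IS NULL\n"
             + "BEGIN\n"
             + "    SELECT TOP 0 * INTO " + target + " FROM " + source + ";\n"
             + "    ALTER TABLE " + target + " DROP COLUMN _id;\n"
             + "END";
    }

    public static void main(String[] args) {
        System.out.println(createIfAbsent(
                "dbo", "ods_extractDataDebug",
                "tmp", "ods_extractDataDebug_20171226031801926_9195"));
    }
}
```

The guard makes a replayed FlowFile harmless for this one statement, but it also hides the replay itself, so logging when the table already exists may still be worthwhile.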
>
> At the same time, the root cause of the FlowFile repository
> failure should be investigated. Is it possible to share the WaitBatch code?
> The reason I ask is that every 'FlowFile Repository failed to update'
> error in the log that you shared earlier is related to the WaitBatch processor.
>
> Thanks,
> Koji
>
> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi Koji, I will print the SQL before actually executing it. However, I
> > checked the error log line you mentioned in your reply, and that error was
> > thrown by NiFi from within another processor called WaitBatch.
> > I didn't find errors similar to the one from the ExecuteSqlCommand
> > processor; I think that's because only ExecuteSqlCommand is used to
> > create temp database tables.
> > You could check my ExecuteSqlCommand code via the link:
> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
> >
> > If the error is really caused by a FlowFile repository checkpoint failure
> > and the FlowFile was executed twice, I may have to create the temp table
> > only if it doesn't exist. The reason I didn't fix the bug this way right
> > away is that I was afraid this fix could mask some other problems.
> >
> > Thanks.
> >
> > Regards,
> > Ben
> >
> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >
> >> Hi Ben,
> >>
> >> The following two log messages are very close in terms of written
> >> timestamp, but have different log levels.
> >> 2017-12-26 07:00:01,312 INFO
> >> 2017-12-26 07:00:01,315 ERROR
> >>
> >> I guess those are logged within a single onTrigger of your
> >> ExecuteSqlCommand custom processor, one is before executing, the other
> >> is when it caught an exception. Just guessing as I don't have access
> >> to the code.
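A minimal sketch of how a single invocation can emit both an INFO and an ERROR line for the same statement, which is the pattern guessed at above. The class `SingleRunTwoLogLines`, the `SqlRunner` interface, and the message wording are hypothetical stand-ins; this is not the actual ExecuteSqlCommand code.

```java
import java.util.ArrayList;
import java.util.List;

final class SingleRunTwoLogLines {
    /** Hypothetical stand-in for the JDBC call; may throw on failure. */
    interface SqlRunner {
        void run(String sql) throws Exception;
    }

    /** Simulates one trigger: logs before executing, logs again if it fails. */
    static List<String> trigger(String sql, SqlRunner runner) {
        List<String> log = new ArrayList<>();
        log.add("INFO executing sql: " + sql);   // written before the attempt
        try {
            runner.run(sql);                      // the statement runs exactly once
        } catch (Exception e) {
            // Same SQL text appears again, but only because the failure is reported.
            log.add("ERROR failed to execute sql: " + sql + ": " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = trigger("SELECT TOP 0 * INTO tmp.t FROM dbo.t", sql -> {
            throw new Exception("object already exists");
        });
        log.forEach(System.out::println);
    }
}
```

Seeing the SQL text twice a few milliseconds apart on the same thread is therefore consistent with one execution that failed, not two executions.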
> >>
> >> Does the same issue happen with other processors bundled with Apache
> >> NiFi without your custom processor running?
> >>
> >> If NiFi fails to update/checkpoint FlowFile repository, then the same
> >> FlowFile can be processed again after restarting NiFi.
> >>
> >> Thanks,
> >> Koji
> >>
> >>
> >>
> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
> >> > Thanks Koji, I will look into this article about the record model.
> >> >
> >> > By the way, the error I previously mentioned to you occurred again. I
> >> > could see in the log that the SQL query was executed twice. This time I
> >> > had turned on verbose NiFi logging; the SQL query is as below:
> >> >
> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> >> > c.z.nifi.processors.ExecuteSqlCommand
> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> >> > dbo.ods_extractDataDebug;
> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
> >> >
> >> > and it was executed again later:
> >> >
> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> >> > c.z.nifi.processors.ExecuteSqlCommand
> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> >> > dbo.ods_extractDataDebug;
> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;:
> >> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named
> >> > 'ods_extractDataDebug_20171226031801926_9195' in the database.
> >> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named
> >> > 'ods_extractDataDebug_20171226031801926_9195' in the database.
> >> > at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
> >> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
> >> > at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
> >> > at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >> > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >> > at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> >> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> > at java.lang.Thread.run(Thread.java:745)
> >> >
> >> > I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
> >> > Repository failed to update"; I'm not sure if this is the reason the
> >> > FlowFile got processed twice. Could you help take a look at my log file?
> >> > Thanks.
> >> > You can get the log file via the link:
> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
> >> >
> >> > Best Regards,
> >> > Ben
> >> >
> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >> >
> >> >> Hi Ben,
> >> >>
> >> >> This blog post written by Mark would be a good starting point to get
> >> >> familiar with the NiFi Record model.
> >> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
> >> >>
> >> >> HA for the DistributedMapCacheClientService and DistributedMapCacheServer
> >> >> pair is not supported at the moment. If you need high availability,
> >> >> RedisDistributedMapCacheClientService with Redis replication will
> >> >> provide that, though I haven't tried it myself.
> >> >> https://redis.io/topics/replication
> >> >>
> >> >> Thanks,
> >> >> Koji
> >> >>
> >> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >> > Thanks for your quick response, Koji. I haven't heard or seen anything
> >> >> > about the NiFi record data model when reading the NiFi
> >> >> > documentation. Could you tell me where this model is documented?
> >> >> > Thanks.
> >> >> >
> >> >> > By the way, to my knowledge, when you use the DistributedMapCacheServer
> >> >> > from DistributedMapCacheClientService, you need to specify the host
> >> >> > URL of the server. Does this mean that inside a NiFi cluster, if the
> >> >> > node hosting the cache server suddenly goes down, I can't use the
> >> >> > cache until the node comes up again? Is there currently a cache
> >> >> > server in NiFi that supports HA? Thanks.
> >> >> >
> >> >> > Regards,
> >> >> > Ben
> >> >> >
> >> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >> >> >
> >> >> >> Hi Ben,
> >> >> >>
> >> >> >> As you found from the existing code, DistributedMapCache is used to
> >> >> >> share state among different processors, and it can be used by your
> >> >> >> custom processors, too.
> >> >> >> However, I'd recommend avoiding such tight dependencies between
> >> >> >> FlowFiles if possible, or at least minimizing the part of the flow
> >> >> >> that requires that constraint, for better performance and simplicity.
> >> >> >> For example, since a FlowFile can hold a fairly large amount of data,
> >> >> >> you could merge all FlowFiles into a single FlowFile instead of
> >> >> >> batches of FlowFiles. If you need logical boundaries, you can use the
> >> >> >> NiFi Record data model to embed multiple records within a FlowFile;
> >> >> >> Record should perform better.
> >> >> >>
> >> >> >> Hope this helps.
> >> >> >>
> >> >> >> Thanks,
> >> >> >> Koji
> >> >> >>
> >> >> >>
> >> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >> >> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
> >> >> >> > status between my custom processors.
> >> >> >> > Our requirement is like this: we're doing some ETL work using NiFi,
> >> >> >> > and I'm extracting the data from the DB into batches of FlowFiles
> >> >> >> > (each batch of FlowFiles has a flag FlowFile indicating the end of
> >> >> >> > the batch).
> >> >> >> > There are some groups of custom processors downstream that need to
> >> >> >> > process these FlowFiles to do some business logic work, and we
> >> >> >> > expect these processors to process one batch of FlowFiles at a time.
> >> >> >> > Therefore we need to implement a custom Wait processor (let's just
> >> >> >> > call it WaitBatch here) to hold all the other batches of FlowFiles
> >> >> >> > while the business processors are handling the batch of FlowFiles
> >> >> >> > whose creation time is earlier.
> >> >> >> >
> >> >> >> > In order to implement this, all the WaitBatch processors placed in
> >> >> >> > the flow need to read/update records in a shared map so that each
> >> >> >> > set of business-logic processors processes one batch at a time.
> >> >> >> > The entries are keyed by the batch number of the FlowFiles, and the
> >> >> >> > value of each entry is a batch release counter that counts the
> >> >> >> > number of times the batch of FlowFiles has passed through a
> >> >> >> > WaitBatch processor.
> >> >> >> > When a batch is released by WaitBatch, WaitBatch will try to
> >> >> >> > increment the batch number entry's value by 1, and the released
> >> >> >> > batch number and counter value will also be saved locally at the
> >> >> >> > WaitBatch with StateManager.
> >> >> >> > When the next batch reaches the WaitBatch, it will check whether the
> >> >> >> > counter value of the previously released batch number in the shared
> >> >> >> > map is greater than the one saved locally. If the entry for that
> >> >> >> > batch number doesn't exist (already removed) or the value in the
> >> >> >> > shared map is greater, the next batch will be released, and the
> >> >> >> > local state and the entry in the shared map will be updated
> >> >> >> > similarly.
> >> >> >> > At the end of the flow, a custom processor will get the batch number
> >> >> >> > from each batch and remove the entry from the shared map.
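The release rule described above can be sketched roughly as follows. This is only an illustration under stated assumptions: an in-memory `ConcurrentHashMap` stands in for the shared map, two plain fields stand in for the state kept with StateManager, and the class `BatchGate` and method `tryRelease` are hypothetical names. It is not the WaitBatch code, and it does not show how the atomic increment would map onto a NiFi cache client.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class BatchGate {
    private final ConcurrentMap<String, Long> shared; // stand-in for the shared map
    private String lastBatch;   // local state: last batch this gate released
    private long lastCounter;   // local state: counter value right after that release

    BatchGate(ConcurrentMap<String, Long> shared) {
        this.shared = shared;
    }

    /** Returns true if the given batch may pass this gate now. */
    synchronized boolean tryRelease(String batch) {
        if (lastBatch != null) {
            Long current = shared.get(lastBatch);
            // Hold while the previous batch's entry still exists and no
            // downstream gate has incremented it since we released it.
            if (current != null && current <= lastCounter) {
                return false;
            }
        }
        // Atomically increment this batch's release counter (starts at 1).
        lastCounter = shared.merge(batch, 1L, Long::sum);
        lastBatch = batch;
        return true;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Long> shared = new ConcurrentHashMap<>();
        BatchGate first = new BatchGate(shared);
        BatchGate second = new BatchGate(shared);
        System.out.println(first.tryRelease("A"));  // true: nothing released before
        System.out.println(first.tryRelease("B"));  // false: A has not passed the next gate
        second.tryRelease("A");                     // A moves on, its counter becomes 2
        System.out.println(first.tryRelease("B"));  // true: shared counter 2 > local 1
    }
}
```

The sketch works per batch, not per FlowFile, and it relies on the increment being atomic; whichever shared store is used would need an equivalent atomic update for the scheme to hold.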
> >> >> >> >
> >> >> >> > So this implementation requires a shared map that can be read and
> >> >> >> > updated frequently and atomically. I checked the Wait/Notify
> >> >> >> > processors in NiFi and saw that they use the
> >> >> >> > DistributedMapCacheClientService and DistributedMapCacheServer to
> >> >> >> > sync status, so I'm wondering if I could use the
> >> >> >> > DistributedMapCacheClientService to implement my logic. I also saw
> >> >> >> > another implementation called RedisDistributedMapCacheClientService,
> >> >> >> > which seems to require Redis (I haven't used Redis). Thanks in
> >> >> >> > advance for any suggestions.
> >> >> >> >
> >> >> >> > Regards,
> >> >> >> > Ben
> >> >> >>
> >> >>
> >>
>
