Thanks Koji, for the ExecuteSqlCommand issue, I was trying to re-execute
the sql query if the connection is lost(connection could be unstable), my
idea is to only transfer the FlowFile to the success relationship
after successfully executing the sql query. You could see the do while loop
in the code, the transaction will be rollbacked if the execution failed; if
the connection is lost, it will retry to execute the sql.
Will this logic cause my sql to be executed twice?

For the WaitBatch processor, I will take your approach to test individually
to see if the WaitBatch processor could cause the FlowFile repository
checkpointing failure.

Regards,
Ben

2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> Excuse me, I'm trying, but probably I don't fully understand what you
> want to achieve with the flow.
>
> It looks weird that WaitBatch is failing with such FlowFile repository
> error, while other processor such as ReplaceText succeeds.
> I recommend to test WaitBatch alone first without combining the
> database related processors, by feeding a test FlowFile having
> expected FlowFile attributes.
> Such input FlowFiles can be created by GenerateFlowFile processor.
> If the same error happens with only WaitBatch processor, then it
> should be easier to debug.
>
> Thanks,
> Koji
>
> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com>
> wrote:
> > Hi Ben,
> >
> > The one thing that looks strange in the screenshot is the
> > ExecuteSqlCommand having FlowFiles queued in its incoming connection.
> > Those should be transferred to 'failure' relationship.
> >
> > Following executeSql() method, shouldn't it re-throw the caught
> exception?
> >
> >
> >             try (Connection con = dbcpService.getConnection()) {
> >                 logger.debug("设置autoCommit为false");
> >                 con.setAutoCommit(false);
> >
> >                 try (Statement stmt = con.createStatement()) {
> >                     logger.info("执行sql语句: {}", new Object[]{sql});
> >                     stmt.execute(sql);
> >
> >                     // 所有sql语句执行在一个transaction内
> >                     logger.debug("提交transaction");
> >                     con.commit();
> >                 } catch (Exception ex) {
> >                     logger.error("执行sql语句失败:{}", new Object[]{sql, ex});
> >                     con.rollback();
> >                     //将exception抛到外层处理
> >                     throw ex;
> >                 } finally {
> >                     logger.debug("重新设置autoCommit为true");
> >                     con.setAutoCommit(true);
> >                 }
> >             } catch (Exception ex) {
> > // HERE, the exception is swallowed, that's why the FlowFiles stay in
> > the incoming connection.
> >                 logger.error("重试执行sql语句:{}", new Object[]{sql, ex});
> >                 retryOnFail = true;
> >             }
> >
> > Thanks,
> > Koji
> >
> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
> >> Hi Koji, no problem. You could check the code of processor WaitBatch at
> the
> >> link:
> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
> >>
> >> I also uploaded a snapshot of part of NiFi flow which includes the
> >> ExecuteSqlCommand and WaitBatch, you could check the picture at the
> link:
> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
> >>
> >> You mentioned above that FlowFile repository fails checkpointing will
> >> affect other processors to process same FlowFile again, but as you could
> >> see from my snapshot image, the ExecuteSqlCommand is the second
> processor
> >> and before the WaitBatch processor, even if the FlowFile repository
> >> checkpointing failure is caused by WaitBatch, could it lead to the
> >> processors before it to process a FlowFile multiple times? Thanks.
> >>
> >> Regards,
> >> Ben
> >>
> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >>
> >>> Hi Ben,
> >>>
> >>> I was referring these two log messages in your previous email.
> >>> These two messages are both written by ExecuteSqlCommand, it does not
> >>> mean 'it was executed again'.
> >>>
> >>> ```
> >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> >>> c.z.nifi.processors.ExecuteSqlCommand
> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句:
> SELECT
> >>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> >>> dbo.ods_extractDataDebug;
> >>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop
> column
> >>> _id;
> >>>
> >>> and it was executed again later:
> >>>
> >>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> >>> c.z.nifi.processors.ExecuteSqlCommand
> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> >>> 执行sql语句失败:SELECT
> >>> ```
> >>>
> >>> As you written, the case where FlowFile repository fails checkpointing
> >>> will affect other processors to process same FlowFiles again. However
> >>> there won't be a simple solution to every processor to rollback its
> >>> job as different processors do different things. Creating a temp table
> >>> if not exist seems right approach to me.
> >>>
> >>> At the same time, the route cause of getting FlowFile repository
> >>> failed should be investigated. Is it possible to share WaitBatch code?
> >>> The reason why ask this is all 'FlowFile Repository failed to update'
> >>> is related to WaitBatch processor in the log that you shared earlier.
> >>>
> >>> Thanks,
> >>> Koji
> >>>
> >>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:
> >>> > Hi Koji, I will print the sql before actually executing it, but I
> checked
> >>> > the error log line you mentioned in your reply, this error was
> thrown by
> >>> > NiFi from within another processor called WaitBatch.
> >>> > I didn't find similar errors as the one from the ExecuteSqlCommand
> >>> > processor, I think it's because only the ExecuteSqlCommand is used to
> >>> > create temp database tables.
> >>> > You could check my ExecuteSqlCommand code via the link:
> >>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
> >>> >
> >>> > If the error is really caused by FlowFile repository checkpoint
> failure
> >>> and
> >>> > the flowfile was executed twice, I may have to create the temp table
> only
> >>> > if doesn't exist, I didn't fix this bug in this way
> >>> > right away is because I was afraid this fix could cover some other
> >>> problems.
> >>> >
> >>> > Thanks.
> >>> >
> >>> > Regards,
> >>> > Ben
> >>> >
> >>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >>> >
> >>> >> Hi Ben,
> >>> >>
> >>> >> The following two log messages are very close in terms of written
> >>> >> timestamp, but have different log level.
> >>> >> 2017-12-26 07:00:01,312 INFO
> >>> >> 2017-12-26 07:00:01,315 ERROR
> >>> >>
> >>> >> I guess those are logged within a single onTrigger of your
> >>> >> ExecuteSqlCommand custom processor, one is before executing, the
> other
> >>> >> is when it caught an exception. Just guessing as I don't have access
> >>> >> to the code.
> >>> >>
> >>> >> Does the same issue happen with other processors bundled with Apache
> >>> >> NiFi without your custom processor running?
> >>> >>
> >>> >> If NiFi fails to update/checkpoint FlowFile repository, then the
> same
> >>> >> FlowFile can be processed again after restarting NiFi.
> >>> >>
> >>> >> Thanks,
> >>> >> Koji
> >>> >>
> >>> >>
> >>> >>
> >>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
> >>> >> > Thanks Koji, I will look into this article about the record model.
> >>> >> >
> >>> >> > By the way, that error I previously mentioned to you occurred
> again, I
> >>> >> > could see the sql query was executed twice in the log, this time
> I had
> >>> >> > turned on the verbose NiFi logging, the sql query is as below:
> >>> >> >
> >>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> 执行sql语句:
> >>> >> SELECT
> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> >>> >> > dbo.ods_extractDataDebug;
> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop
> >>> column
> >>> >> _id;
> >>> >> >
> >>> >> > and it was executed again later:
> >>> >> >
> >>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> >>> >> 执行sql语句失败:SELECT
> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> >>> >> > dbo.ods_extractDataDebug;
> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop
> >>> column
> >>> >> > _id;: com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。
> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。
> >>> >> > at
> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException.
> >>> makeFromDatabaseError(
> >>> >> SQLServerException.java:217)
> >>> >> > at
> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(
> >>> >> SQLServerStatement.java:1655)
> >>> >> > at
> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.
> doExecuteStatement(
> >>> >> SQLServerStatement.java:885)
> >>> >> > at
> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement$
> >>> StmtExecCmd.doExecute(
> >>> >> SQLServerStatement.java:778)
> >>> >> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.
> >>> java:7505)
> >>> >> > at
> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(
> >>> >> SQLServerConnection.java:2445)
> >>> >> > at
> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(
> >>> >> SQLServerStatement.java:191)
> >>> >> > at
> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(
> >>> >> SQLServerStatement.java:166)
> >>> >> > at
> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(
> >>> >> SQLServerStatement.java:751)
> >>> >> > at
> >>> >> > org.apache.commons.dbcp.DelegatingStatement.execute(
> >>> >> DelegatingStatement.java:264)
> >>> >> > at
> >>> >> > org.apache.commons.dbcp.DelegatingStatement.execute(
> >>> >> DelegatingStatement.java:264)
> >>> >> > at
> >>> >> > com.zjrealtech.nifi.processors.ExecuteSqlCommand.
> >>> >> executeSql(ExecuteSqlCommand.java:194)
> >>> >> > at
> >>> >> > com.zjrealtech.nifi.processors.ExecuteSqlCommand.
> >>> >> onTrigger(ExecuteSqlCommand.java:164)
> >>> >> > at
> >>> >> > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> >>> >> AbstractProcessor.java:27)
> >>> >> > at
> >>> >> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> >>> >> StandardProcessorNode.java:1119)
> >>> >> > at
> >>> >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> call(
> >>> >> ContinuallyRunProcessorTask.java:147)
> >>> >> > at
> >>> >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> call(
> >>> >> ContinuallyRunProcessorTask.java:47)
> >>> >> > at
> >>> >> > org.apache.nifi.controller.scheduling.
> TimerDrivenSchedulingAgent$1.
> >>> run(
> >>> >> TimerDrivenSchedulingAgent.java:128)
> >>> >> > at java.util.concurrent.Executors$RunnableAdapter.
> >>> >> call(Executors.java:511)
> >>> >> > at java.util.concurrent.FutureTask.runAndReset(
> FutureTask.java:308)
> >>> >> > at
> >>> >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> >>> >> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.
> java:180)
> >>> >> > at
> >>> >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> >>> >> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >>> >> > at
> >>> >> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> >>> >> ThreadPoolExecutor.java:1142)
> >>> >> > at
> >>> >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> >>> >> ThreadPoolExecutor.java:617)
> >>> >> > at java.lang.Thread.run(Thread.java:745)
> >>> >> >
> >>> >> > I also saw a lot of NiFi's exception like "ProcessException:
> FlowFile
> >>> >> > Repository failed to update", not sure if this is the reason the
> >>> FlowFile
> >>> >> > got processed twice.  Could you help to take a look at my log
> file?
> >>> >> Thanks.
> >>> >> > You could get the log file via the link:
> >>> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_
> >>> N9Xu6zMEi3/view
> >>> >> >
> >>> >> > Best Regards,
> >>> >> > Ben
> >>> >> >
> >>> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com
> >:
> >>> >> >
> >>> >> >> Hi Ben,
> >>> >> >>
> >>> >> >> This blog post written by Mark, would be a good starting point
> to get
> >>> >> >> familiar with NiFi Record model.
> >>> >> >> https://blogs.apache.org/nifi/entry/record-oriented-data-
> with-nifi
> >>> >> >>
> >>> >> >> HA for DistributedMapCacheClientService and
> >>> DistributedMapCacheServer
> >>> >> >> pair is not supported at the moment. If you need
> HighAvailability,
> >>> >> >> RedisDistributedMapCacheClientService with Redis replication
> will
> >>> >> >> provide that, I haven't tried that myself though.
> >>> >> >> https://redis.io/topics/replication
> >>> >> >>
> >>> >> >> Thanks,
> >>> >> >> Koji
> >>> >> >>
> >>> >> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com>
> wrote:
> >>> >> >> > Thanks for your quick response, Koji, I haven't heard and seen
> >>> >> anything
> >>> >> >> > about the NiFi record data model when I was reading the NiFi
> >>> >> >> > documentations,could you tell me where this model is
> documented?
> >>> >> Thanks.
> >>> >> >> >
> >>> >> >> > By the way, to my knowledge, when you need to use the
> >>> >> >> DistributedMapCacheServer
> >>> >> >> > from DistributedMapCacheClientService, you need to specify the
> >>> host
> >>> >> url
> >>> >> >> for
> >>> >> >> > the server, this means inside a NiFi cluster
> >>> >> >> > when I specify the cache server and the node suddenly went
> down, I
> >>> >> >> couldn't
> >>> >> >> > possibly use it until the node goes up again right? Is there
> >>> currently
> >>> >> >> such
> >>> >> >> > a cache server in NiFi that could support HA? Thanks.
> >>> >> >> >
> >>> >> >> > Regards,
> >>> >> >> > Ben
> >>> >> >> >
> >>> >> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <
> ijokaruma...@gmail.com>:
> >>> >> >> >
> >>> >> >> >> Hi Ben,
> >>> >> >> >>
> >>> >> >> >> As you found from existing code, DistributedMapCache is used
> to
> >>> share
> >>> >> >> >> state among different processors, and it can be used by your
> >>> custom
> >>> >> >> >> processors, too.
> >>> >> >> >> However, I'd recommend to avoid such tight dependencies
> between
> >>> >> >> >> FlowFiles if possible, or minimize the part in flow that
> requires
> >>> >> that
> >>> >> >> >> constraint at least for better performance and simplicity.
> >>> >> >> >> For example, since a FlowFile can hold fairly large amount of
> >>> data,
> >>> >> >> >> you could merge all FlowFiles in a single FlowFile, instead of
> >>> >> batches
> >>> >> >> >> of FlowFiles. If you need logical boundaries, you can use NiFi
> >>> Record
> >>> >> >> >> data model to embed multiple records within a FlowFile, Record
> >>> should
> >>> >> >> >> perform better.
> >>> >> >> >>
> >>> >> >> >> Hope this helps.
> >>> >> >> >>
> >>> >> >> >> Thanks,
> >>> >> >> >> Koji
> >>> >> >> >>
> >>> >> >> >>
> >>> >> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com>
> wrote:
> >>> >> >> >> > Hi guys, I'm currently trying to find a proper way in nifi
> which
> >>> >> could
> >>> >> >> >> sync
> >>> >> >> >> > status between my custom processors.
> >>> >> >> >> > our requirement is like this, we're doing some ETL work
> using
> >>> nifi
> >>> >> and
> >>> >> >> >> I'm
> >>> >> >> >> > extracting the data from DB into batches of FlowFiles(each
> >>> batch of
> >>> >> >> >> > FlowFile has a flag FlowFile indicating the end of the
> batch).
> >>> >> >> >> > There're some groups of custom processors downstream that
> need
> >>> to
> >>> >> >> process
> >>> >> >> >> > these FlowFiles to do some business logic work. And we
> expect
> >>> these
> >>> >> >> >> > processors to process one batch of FlowFiles at a time.
> >>> >> >> >> > Therefore we need to implement a custom Wait processor(let's
> >>> just
> >>> >> >> call it
> >>> >> >> >> > WaitBatch here) to hold all the other batches of FlowFiles
> while
> >>> >> the
> >>> >> >> >> > business processors were handling the batch of FlowFiles
> whose
> >>> >> >> creation
> >>> >> >> >> > time is earlier.
> >>> >> >> >> >
> >>> >> >> >> > In order to implement this, all the WaitBatch processors
> placed
> >>> in
> >>> >> the
> >>> >> >> >> flow
> >>> >> >> >> > need to read/update records in a shared map so that each
> set of
> >>> >> >> >> > business-logic processors process one batch at a time.
> >>> >> >> >> > The entries are keyed using the batch number of the
> FlowFiles
> >>> and
> >>> >> the
> >>> >> >> >> value
> >>> >> >> >> > of each entry is a batch release counter number which
> counts the
> >>> >> >> number
> >>> >> >> >> of
> >>> >> >> >> > times the batch of FlowFiles has passed through
> >>> >> >> >> > a WaitBatch processor.
> >>> >> >> >> > When a batch is released by WaitBatch, it will try to
> increment
> >>> the
> >>> >> >> batch
> >>> >> >> >> > number entry's value by 1 and then the released batch
> number and
> >>> >> >> counter
> >>> >> >> >> > number will also be saved locally at the WaitBatch with
> >>> >> StateManager;
> >>> >> >> >> > when the next batch reaches the WaitBatch, it will check if
> the
> >>> >> >> counter
> >>> >> >> >> > value of the previous released batch number in the shared
> map is
> >>> >> >> greater
> >>> >> >> >> > than the one saved locally, if the entry for the batch
> number
> >>> >> does't
> >>> >> >> >> > exist(already removed) or the value in the shared map is
> >>> greater,
> >>> >> the
> >>> >> >> >> next
> >>> >> >> >> > batch will be released and the local state and the entry on
> the
> >>> >> shared
> >>> >> >> >> map
> >>> >> >> >> > will be updated similarly.
> >>> >> >> >> > In the end of the flow, a custom processor will get the
> batch
> >>> >> number
> >>> >> >> from
> >>> >> >> >> > each batch and remove the entry from the shared map .
> >>> >> >> >> >
> >>> >> >> >> > So this implementation requires a shared map that could
> >>> read/update
> >>> >> >> >> > frequently and atomically. I checked the Wait/Notify
> processors
> >>> in
> >>> >> >> NIFI
> >>> >> >> >> and
> >>> >> >> >> > saw it is using the DistributedMapCacheClientService and
> >>> >> >> >> > DistributedMapCacheServer to sync status, so I'm wondering
> if I
> >>> >> could
> >>> >> >> use
> >>> >> >> >> > the DistributedMapCacheClientService to implement my
> logic. I
> >>> also
> >>> >> >> saw
> >>> >> >> >> > another implementation called RedisDistributedMapCacheClient
> >>> >> Service
> >>> >> >> >> > which seems to require Redis(I haven't used Redis).  Thanks
> in
> >>> >> advance
> >>> >> >> >> for
> >>> >> >> >> > any suggestions.
> >>> >> >> >> >
> >>> >> >> >> > Regards,
> >>> >> >> >> > Ben
> >>> >> >> >>
> >>> >> >>
> >>> >>
> >>>
>

Reply via email to