Hi Koji, thanks, the names of the temp tables are created with format
"yyyyMMddHHmmssSSS-nnnn", the first time indicates the time and the second
part is a random number with length of 4.
So I think it's not possible to have 2 duplicate table names, the only
possibly I could think is the flowfile is passed into the processor twice.

About the provenance, I had updated to use the
WriteAheadProvenanceRepository implementation, but when I tried to check
the data provenance, it showed me the following exception message:
HTTP ERROR 500

Problem accessing /nifi/provenance. Reason:

    Server Error

Caused by:

javax.servlet.ServletException:
org.eclipse.jetty.servlet.ServletHolder$1:
java.lang.NullPointerException
        at 
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
        at 
org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at org.eclipse.jetty.server.Server.handle(Server.java:564)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
        at 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
        at 
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
        at 
org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
        at 
org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.eclipse.jetty.servlet.ServletHolder$1:
java.lang.NullPointerException
        at 
org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
        at 
org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
        at 
org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
        at 
org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
        at 
org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
        at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
        at 
org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
        at 
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
        at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
        at 
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at 
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
        at 
org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at org.eclipse.jetty.server.Server.handle(Server.java:564)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
        at 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
        at 
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
        at 
org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
        at 
org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
        at 
org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
        at 
org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
        at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
        at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
        ... 3 more
Caused by: java.lang.NullPointerException
        at 
org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
        at 
org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
        at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
        at 
org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
        ... 36 more

Caused by:

org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
        at 
org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
        at 
org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
        at 
org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
        at 
org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
        at 
org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
        at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
        at 
org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
        at 
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
        at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
        at 
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at 
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
        at 
org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at org.eclipse.jetty.server.Server.handle(Server.java:564)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
        at 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
        at 
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
        at 
org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
        at 
org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
        at 
org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
        at 
org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
        at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
        at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at 
org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
        at 
org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
        at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
        at 
org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
        ... 36 more

Caused by:

java.lang.NullPointerException
        at 
org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
        at 
org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
        at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
        at 
org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
        at 
org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
        at 
org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
        at 
org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
        at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
        at 
org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
        at 
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
        at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
        at 
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at 
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
        at 
org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at org.eclipse.jetty.server.Server.handle(Server.java:564)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
        at 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
        at 
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
        at 
org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
        at 
org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
        at 
org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
        at 
org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
        at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
        at 
org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
        at java.lang.Thread.run(Thread.java:745)

My configuration inside nifi.properties is as below:
# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
nifi.provenance.repository.debug.frequency=1_000_000
nifi.provenance.repository.encryption.key.provider.implementation=
nifi.provenance.repository.encryption.key.provider.location=
nifi.provenance.repository.encryption.key.id=
nifi.provenance.repository.encryption.key=

# Persistent Provenance Repository Properties
nifi.provenance.repository.directory.default=../provenance_repository
nifi.provenance.repository.max.storage.time=24 hours
nifi.provenance.repository.max.storage.size=1 GB
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.size=100 MB
nifi.provenance.repository.query.threads=2
nifi.provenance.repository.index.threads=1
nifi.provenance.repository.compress.on.rollover=true
nifi.provenance.repository.always.sync=false
nifi.provenance.repository.index.shard.size=4 GB


By the way, does this Data Provenance list all FlowFiles ever created or
only part of it? Should I try to find the FlowFile with the exception time
in the log? Thanks.

Regards,
Ben

2017-12-27 16:57 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> The ExecuteSqlCommand retry logic does not execute the same query
> multiple times if it succeeds.
> So, there must be input FlowFiles containing the same query had been
> passed more than once.
> It could be the same FlowFile, or different FlowFiles generated by the
> first processor for some reason.
> To investigate those kind of FlowFile level information, NiFi
> provenance data and FlowFile lineage will be very useful.
> https://nifi.apache.org/docs/nifi-docs/html/user-guide.
> html#viewing-flowfile-lineage
>
> I didn't mention about it earlier because you were having Provenance
> repository performance issue, but I hope you can use it now with the
> WriteAheadProvenanceRepository.
>
> Thanks,
> Koji
>
> On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <batman...@gmail.com> wrote:
> > Thanks Koji, for the ExecuteSqlCommand issue, I was trying to re-execute
> > the sql query if the connection is lost(connection could be unstable), my
> > idea is to only transfer the FlowFile to the success relationship
> > after successfully executing the sql query. You could see the do while
> loop
> > in the code, the transaction will be rollbacked if the execution failed;
> if
> > the connection is lost, it will retry to execute the sql.
> > Will this logic cause my sql to be executed twice?
> >
> > For the WaitBatch processor, I will take your approach to test
> individually
> > to see if the WaitBatch processor could cause the FlowFile repository
> > checkpointing failure.
> >
> > Regards,
> > Ben
> >
> > 2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >
> >> Hi Ben,
> >>
> >> Excuse me, I'm trying, but probably I don't fully understand what you
> >> want to achieve with the flow.
> >>
> >> It looks weird that WaitBatch is failing with such FlowFile repository
> >> error, while other processor such as ReplaceText succeeds.
> >> I recommend to test WaitBatch alone first without combining the
> >> database related processors, by feeding a test FlowFile having
> >> expected FlowFile attributes.
> >> Such input FlowFiles can be created by GenerateFlowFile processor.
> >> If the same error happens with only WaitBatch processor, then it
> >> should be easier to debug.
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com>
> >> wrote:
> >> > Hi Ben,
> >> >
> >> > The one thing that looks strange in the screenshot is the
> >> > ExecuteSqlCommand having FlowFiles queued in its incoming connection.
> >> > Those should be transferred to 'failure' relationship.
> >> >
> >> > Following executeSql() method, shouldn't it re-throw the caught
> >> exception?
> >> >
> >> >
> >> >             try (Connection con = dbcpService.getConnection()) {
> >> >                 logger.debug("设置autoCommit为false");
> >> >                 con.setAutoCommit(false);
> >> >
> >> >                 try (Statement stmt = con.createStatement()) {
> >> >                     logger.info("执行sql语句: {}", new Object[]{sql});
> >> >                     stmt.execute(sql);
> >> >
> >> >                     // 所有sql语句执行在一个transaction内
> >> >                     logger.debug("提交transaction");
> >> >                     con.commit();
> >> >                 } catch (Exception ex) {
> >> >                     logger.error("执行sql语句失败:{}", new Object[]{sql,
> ex});
> >> >                     con.rollback();
> >> >                     //将exception抛到外层处理
> >> >                     throw ex;
> >> >                 } finally {
> >> >                     logger.debug("重新设置autoCommit为true");
> >> >                     con.setAutoCommit(true);
> >> >                 }
> >> >             } catch (Exception ex) {
> >> > // HERE, the exception is swallowed, that's why the FlowFiles stay in
> >> > the incoming connection.
> >> >                 logger.error("重试执行sql语句:{}", new Object[]{sql, ex});
> >> >                 retryOnFail = true;
> >> >             }
> >> >
> >> > Thanks,
> >> > Koji
> >> >
> >> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >> Hi Koji, no problem. You could check the code of processor WaitBatch
> at
> >> the
> >> >> link:
> >> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
> >> >>
> >> >> I also uploaded a snapshot of part of NiFi flow which includes the
> >> >> ExecuteSqlCommand and WaitBatch, you could check the picture at the
> >> link:
> >> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-
> 3IVi2h/view
> >> >>
> >> >> You mentioned above that FlowFile repository fails checkpointing will
> >> >> affect other processors to process same FlowFile again, but as you
> could
> >> >> see from my snapshot image, the ExecuteSqlCommand is the second
> >> processor
> >> >> and before the WaitBatch processor, even if the FlowFile repository
> >> >> checkpointing failure is caused by WaitBatch, could it lead to the
> >> >> processors before it to process a FlowFile multiple times? Thanks.
> >> >>
> >> >> Regards,
> >> >> Ben
> >> >>
> >> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >> >>
> >> >>> Hi Ben,
> >> >>>
> >> >>> I was referring these two log messages in your previous email.
> >> >>> These two messages are both written by ExecuteSqlCommand, it does
> not
> >> >>> mean 'it was executed again'.
> >> >>>
> >> >>> ```
> >> >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> >> >>> c.z.nifi.processors.ExecuteSqlCommand
> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句:
> >> SELECT
> >> >>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> >> >>> dbo.ods_extractDataDebug;
> >> >>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop
> >> column
> >> >>> _id;
> >> >>>
> >> >>> and it was executed again later:
> >> >>>
> >> >>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> >> >>> c.z.nifi.processors.ExecuteSqlCommand
> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> >> >>> 执行sql语句失败:SELECT
> >> >>> ```
> >> >>>
> >> >>> As you written, the case where FlowFile repository fails
> checkpointing
> >> >>> will affect other processors to process same FlowFiles again.
> However
> >> >>> there won't be a simple solution to every processor to rollback its
> >> >>> job as different processors do different things. Creating a temp
> table
> >> >>> if not exist seems right approach to me.
> >> >>>
> >> >>> At the same time, the route cause of getting FlowFile repository
> >> >>> failed should be investigated. Is it possible to share WaitBatch
> code?
> >> >>> The reason why ask this is all 'FlowFile Repository failed to
> update'
> >> >>> is related to WaitBatch processor in the log that you shared
> earlier.
> >> >>>
> >> >>> Thanks,
> >> >>> Koji
> >> >>>
> >> >>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >>> > Hi Koji, I will print the sql before actually executing it, but I
> >> checked
> >> >>> > the error log line you mentioned in your reply, this error was
> >> thrown by
> >> >>> > NiFi from within another processor called WaitBatch.
> >> >>> > I didn't find similar errors as the one from the ExecuteSqlCommand
> >> >>> > processor, I think it's because only the ExecuteSqlCommand is
> used to
> >> >>> > create temp database tables.
> >> >>> > You could check my ExecuteSqlCommand code via the link:
> >> >>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk
> _5P
> >> >>> >
> >> >>> > If the error is really caused by FlowFile repository checkpoint
> >> failure
> >> >>> and
> >> >>> > the flowfile was executed twice, I may have to create the temp
> table
> >> only
> >> >>> > if doesn't exist, I didn't fix this bug in this way
> >> >>> > right away is because I was afraid this fix could cover some other
> >> >>> problems.
> >> >>> >
> >> >>> > Thanks.
> >> >>> >
> >> >>> > Regards,
> >> >>> > Ben
> >> >>> >
> >> >>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com
> >:
> >> >>> >
> >> >>> >> Hi Ben,
> >> >>> >>
> >> >>> >> The following two log messages are very close in terms of written
> >> >>> >> timestamp, but have different log level.
> >> >>> >> 2017-12-26 07:00:01,312 INFO
> >> >>> >> 2017-12-26 07:00:01,315 ERROR
> >> >>> >>
> >> >>> >> I guess those are logged within a single onTrigger of your
> >> >>> >> ExecuteSqlCommand custom processor, one is before executing, the
> >> other
> >> >>> >> is when it caught an exception. Just guessing as I don't have
> access
> >> >>> >> to the code.
> >> >>> >>
> >> >>> >> Does the same issue happen with other processors bundled with
> Apache
> >> >>> >> NiFi without your custom processor running?
> >> >>> >>
> >> >>> >> If NiFi fails to update/checkpoint FlowFile repository, then the
> >> same
> >> >>> >> FlowFile can be processed again after restarting NiFi.
> >> >>> >>
> >> >>> >> Thanks,
> >> >>> >> Koji
> >> >>> >>
> >> >>> >>
> >> >>> >>
> >> >>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com>
> wrote:
> >> >>> >> > Thanks Koji, I will look into this article about the record
> model.
> >> >>> >> >
> >> >>> >> > By the way, that error I previously mentioned to you occurred
> >> again, I
> >> >>> >> > could see the sql query was executed twice in the log, this
> time
> >> I had
> >> >>> >> > turned on the verbose NiFi logging, the sql query is as below:
> >> >>> >> >
> >> >>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> >> 执行sql语句:
> >> >>> >> SELECT
> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195
> FROM
> >> >>> >> > dbo.ods_extractDataDebug;
> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195
> drop
> >> >>> column
> >> >>> >> _id;
> >> >>> >> >
> >> >>> >> > and it was executed again later:
> >> >>> >> >
> >> >>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> >> >>> >> 执行sql语句失败:SELECT
> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195
> FROM
> >> >>> >> > dbo.ods_extractDataDebug;
> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195
> drop
> >> >>> column
> >> >>> >> > _id;: com.microsoft.sqlserver.jdbc.SQLServerException:
> 数据库中已存在名为
> >> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
> >> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。
> >> >>> >> > at
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException.
> >> >>> makeFromDatabaseError(
> >> >>> >> SQLServerException.java:217)
> >> >>> >> > at
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(
> >> >>> >> SQLServerStatement.java:1655)
> >> >>> >> > at
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.
> >> doExecuteStatement(
> >> >>> >> SQLServerStatement.java:885)
> >> >>> >> > at
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement$
> >> >>> StmtExecCmd.doExecute(
> >> >>> >> SQLServerStatement.java:778)
> >> >>> >> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.
> >> >>> java:7505)
> >> >>> >> > at
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerConnection.
> executeCommand(
> >> >>> >> SQLServerConnection.java:2445)
> >> >>> >> > at
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.
> executeCommand(
> >> >>> >> SQLServerStatement.java:191)
> >> >>> >> > at
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.
> executeStatement(
> >> >>> >> SQLServerStatement.java:166)
> >> >>> >> > at
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(
> >> >>> >> SQLServerStatement.java:751)
> >> >>> >> > at
> >> >>> >> > org.apache.commons.dbcp.DelegatingStatement.execute(
> >> >>> >> DelegatingStatement.java:264)
> >> >>> >> > at
> >> >>> >> > org.apache.commons.dbcp.DelegatingStatement.execute(
> >> >>> >> DelegatingStatement.java:264)
> >> >>> >> > at
> >> >>> >> > com.zjrealtech.nifi.processors.ExecuteSqlCommand.
> >> >>> >> executeSql(ExecuteSqlCommand.java:194)
> >> >>> >> > at
> >> >>> >> > com.zjrealtech.nifi.processors.ExecuteSqlCommand.
> >> >>> >> onTrigger(ExecuteSqlCommand.java:164)
> >> >>> >> > at
> >> >>> >> > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> >> >>> >> AbstractProcessor.java:27)
> >> >>> >> > at
> >> >>> >> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> >> >>> >> StandardProcessorNode.java:1119)
> >> >>> >> > at
> >> >>> >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> >> call(
> >> >>> >> ContinuallyRunProcessorTask.java:147)
> >> >>> >> > at
> >> >>> >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> >> call(
> >> >>> >> ContinuallyRunProcessorTask.java:47)
> >> >>> >> > at
> >> >>> >> > org.apache.nifi.controller.scheduling.
> >> TimerDrivenSchedulingAgent$1.
> >> >>> run(
> >> >>> >> TimerDrivenSchedulingAgent.java:128)
> >> >>> >> > at java.util.concurrent.Executors$RunnableAdapter.
> >> >>> >> call(Executors.java:511)
> >> >>> >> > at java.util.concurrent.FutureTask.runAndReset(
> >> FutureTask.java:308)
> >> >>> >> > at
> >> >>> >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> >> >>> >> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.
> >> java:180)
> >> >>> >> > at
> >> >>> >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> >> >>> >> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >> >>> >> > at
> >> >>> >> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> >> >>> >> ThreadPoolExecutor.java:1142)
> >> >>> >> > at
> >> >>> >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> >> >>> >> ThreadPoolExecutor.java:617)
> >> >>> >> > at java.lang.Thread.run(Thread.java:745)
> >> >>> >> >
> >> >>> >> > I also saw a lot of NiFi's exception like "ProcessException:
> >> FlowFile
> >> >>> >> > Repository failed to update", not sure if this is the reason
> the
> >> >>> FlowFile
> >> >>> >> > got processed twice.  Could you help to take a look at my log
> >> file?
> >> >>> >> Thanks.
> >> >>> >> > You could get the log file via the link:
> >> >>> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_
> >> >>> N9Xu6zMEi3/view
> >> >>> >> >
> >> >>> >> > Best Regards,
> >> >>> >> > Ben
> >> >>> >> >
> >> >>> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <
> ijokaruma...@gmail.com
> >> >:
> >> >>> >> >
> >> >>> >> >> Hi Ben,
> >> >>> >> >>
> >> >>> >> >> This blog post written by Mark, would be a good starting point
> >> to get
> >> >>> >> >> familiar with NiFi Record model.
> >> >>> >> >> https://blogs.apache.org/nifi/entry/record-oriented-data-
> >> with-nifi
> >> >>> >> >>
> >> >>> >> >> HA for DistributedMapCacheClientService and
> >> >>> DistributedMapCacheServer
> >> >>> >> >> pair is not supported at the moment. If you need
> >> HighAvailability,
> >> >>> >> >> RedisDistributedMapCacheClientService with Redis replication
> >> will
> >> >>> >> >> provide that, I haven't tried that myself though.
> >> >>> >> >> https://redis.io/topics/replication
> >> >>> >> >>
> >> >>> >> >> Thanks,
> >> >>> >> >> Koji
> >> >>> >> >>
> >> >>> >> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com>
> >> wrote:
> >> >>> >> >> > Thanks for your quick response, Koji, I haven't heard and
> seen
> >> >>> >> anything
> >> >>> >> >> > about the NiFi record data model when I was reading the NiFi
> >> >>> >> >> > documentations,could you tell me where this model is
> >> documented?
> >> >>> >> Thanks.
> >> >>> >> >> >
> >> >>> >> >> > By the way, to my knowledge, when you need to use the
> >> >>> >> >> DistributedMapCacheServer
> >> >>> >> >> > from DistributedMapCacheClientService, you need to specify
> the
> >> >>> host
> >> >>> >> url
> >> >>> >> >> for
> >> >>> >> >> > the server, this means inside a NiFi cluster
> >> >>> >> >> > when I specify the cache server and the node suddenly went
> >> down, I
> >> >>> >> >> couldn't
> >> >>> >> >> > possibly use it until the node goes up again right? Is there
> >> >>> currently
> >> >>> >> >> such
> >> >>> >> >> > a cache server in NiFi that could support HA? Thanks.
> >> >>> >> >> >
> >> >>> >> >> > Regards,
> >> >>> >> >> > Ben
> >> >>> >> >> >
> >> >>> >> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <
> >> ijokaruma...@gmail.com>:
> >> >>> >> >> >
> >> >>> >> >> >> Hi Ben,
> >> >>> >> >> >>
> >> >>> >> >> >> As you found from existing code, DistributedMapCache is
> used
> >> to
> >> >>> share
> >> >>> >> >> >> state among different processors, and it can be used by
> your
> >> >>> custom
> >> >>> >> >> >> processors, too.
> >> >>> >> >> >> However, I'd recommend to avoid such tight dependencies
> >> between
> >> >>> >> >> >> FlowFiles if possible, or minimize the part in flow that
> >> requires
> >> >>> >> that
> >> >>> >> >> >> constraint at least for better performance and simplicity.
> >> >>> >> >> >> For example, since a FlowFile can hold fairly large amount
> of
> >> >>> data,
> >> >>> >> >> >> you could merge all FlowFiles in a single FlowFile,
> instead of
> >> >>> >> batches
> >> >>> >> >> >> of FlowFiles. If you need logical boundaries, you can use
> NiFi
> >> >>> Record
> >> >>> >> >> >> data model to embed multiple records within a FlowFile,
> Record
> >> >>> should
> >> >>> >> >> >> perform better.
> >> >>> >> >> >>
> >> >>> >> >> >> Hope this helps.
> >> >>> >> >> >>
> >> >>> >> >> >> Thanks,
> >> >>> >> >> >> Koji
> >> >>> >> >> >>
> >> >>> >> >> >>
> >> >>> >> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com>
> >> wrote:
> >> >>> >> >> >> > Hi guys, I'm currently trying to find a proper way in
> nifi
> >> which
> >> >>> >> could
> >> >>> >> >> >> sync
> >> >>> >> >> >> > status between my custom processors.
> >> >>> >> >> >> > our requirement is like this, we're doing some ETL work
> >> using
> >> >>> nifi
> >> >>> >> and
> >> >>> >> >> >> I'm
> >> >>> >> >> >> > extracting the data from DB into batches of
> FlowFiles(each
> >> >>> batch of
> >> >>> >> >> >> > FlowFile has a flag FlowFile indicating the end of the
> >> batch).
> >> >>> >> >> >> > There're some groups of custom processors downstream that
> >> need
> >> >>> to
> >> >>> >> >> process
> >> >>> >> >> >> > these FlowFiles to do some business logic work. And we
> >> expect
> >> >>> these
> >> >>> >> >> >> > processors to process one batch of FlowFiles at a time.
> >> >>> >> >> >> > Therefore we need to implement a custom Wait
> processor(let's
> >> >>> just
> >> >>> >> >> call it
> >> >>> >> >> >> > WaitBatch here) to hold all the other batches of
> FlowFiles
> >> while
> >> >>> >> the
> >> >>> >> >> >> > business processors were handling the batch of FlowFiles
> >> whose
> >> >>> >> >> creation
> >> >>> >> >> >> > time is earlier.
> >> >>> >> >> >> >
> >> >>> >> >> >> > In order to implement this, all the WaitBatch processors
> >> placed
> >> >>> in
> >> >>> >> the
> >> >>> >> >> >> flow
> >> >>> >> >> >> > need to read/update records in a shared map so that each
> >> set of
> >> >>> >> >> >> > business-logic processors process one batch at a time.
> >> >>> >> >> >> > The entries are keyed using the batch number of the
> >> FlowFiles
> >> >>> and
> >> >>> >> the
> >> >>> >> >> >> value
> >> >>> >> >> >> > of each entry is a batch release counter number which
> >> counts the
> >> >>> >> >> number
> >> >>> >> >> >> of
> >> >>> >> >> >> > times the batch of FlowFiles has passed through
> >> >>> >> >> >> > a WaitBatch processor.
> >> >>> >> >> >> > When a batch is released by WaitBatch, it will try to
> >> increment
> >> >>> the
> >> >>> >> >> batch
> >> >>> >> >> >> > number entry's value by 1 and then the released batch
> >> number and
> >> >>> >> >> counter
> >> >>> >> >> >> > number will also be saved locally at the WaitBatch with
> >> >>> >> StateManager;
> >> >>> >> >> >> > when the next batch reaches the WaitBatch, it will check
> if
> >> the
> >> >>> >> >> counter
> >> >>> >> >> >> > value of the previous released batch number in the shared
> >> map is
> >> >>> >> >> greater
> >> >>> >> >> >> > than the one saved locally, if the entry for the batch
> >> number
> >> >>> >> does't
> >> >>> >> >> >> > exist(already removed) or the value in the shared map is
> >> >>> greater,
> >> >>> >> the
> >> >>> >> >> >> next
> >> >>> >> >> >> > batch will be released and the local state and the entry
> on
> >> the
> >> >>> >> shared
> >> >>> >> >> >> map
> >> >>> >> >> >> > will be updated similarly.
> >> >>> >> >> >> > In the end of the flow, a custom processor will get the
> >> batch
> >> >>> >> number
> >> >>> >> >> from
> >> >>> >> >> >> > each batch and remove the entry from the shared map .
> >> >>> >> >> >> >
> >> >>> >> >> >> > So this implementation requires a shared map that could
> >> >>> read/update
> >> >>> >> >> >> > frequently and atomically. I checked the Wait/Notify
> >> processors
> >> >>> in
> >> >>> >> >> NIFI
> >> >>> >> >> >> and
> >> >>> >> >> >> > saw it is using the DistributedMapCacheClientService and
> >> >>> >> >> >> > DistributedMapCacheServer to sync status, so I'm
> wondering
> >> if I
> >> >>> >> could
> >> >>> >> >> use
> >> >>> >> >> >> > the DistributedMapCacheClientService to implement my
> >> logic. I
> >> >>> also
> >> >>> >> >> saw
> >> >>> >> >> >> > another implementation called
> RedisDistributedMapCacheClient
> >> >>> >> Service
> >> >>> >> >> >> > which seems to require Redis(I haven't used Redis).
> Thanks
> >> in
> >> >>> >> advance
> >> >>> >> >> >> for
> >> >>> >> >> >> > any suggestions.
> >> >>> >> >> >> >
> >> >>> >> >> >> > Regards,
> >> >>> >> >> >> > Ben
> >> >>> >> >> >>
> >> >>> >> >>
> >> >>> >>
> >> >>>
> >>
>

Reply via email to