Hi Koji, sorry about the provenance exception; it was because there was no
space left on the machine (the disk had filled up with logs).

Regards,
Ben

2017-12-27 17:11 GMT+08:00 尹文才 <batman...@gmail.com>:

> Hi Koji, thanks. The names of the temp tables are created with the format
> "yyyyMMddHHmmssSSS-nnnn", where the first part indicates the creation time
> and the second part is a random number of length 4.
> So I think it's not possible to have two duplicate table names; the only
> possibility I can think of is that the FlowFile is passed into the processor twice.
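>
> For reference, here is a minimal sketch of how such a name could be
> generated (the helper name and the choice of RNG are illustrative, not
> the actual implementation):
>
>     // Builds a temp-table suffix like "20171226031801926-9195":
>     // a millisecond-precision timestamp plus a 4-digit random number.
>     private static String newTempTableSuffix() {
>         final String ts = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
>                 .format(new java.util.Date());
>         final int rand = java.util.concurrent.ThreadLocalRandom.current().nextInt(10000);
>         return ts + "-" + String.format("%04d", rand);
>     }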
>
> About the provenance, I updated the configuration to use the
> WriteAheadProvenanceRepository implementation, but when I tried to check
> the data provenance, it showed me the following exception:
> HTTP ERROR 500
>
> Problem accessing /nifi/provenance. Reason:
>
>     Server Error
>
> Caused by:
>
> javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>       at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
>       at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>       at org.eclipse.jetty.server.Server.handle(Server.java:564)
>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>       at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>       at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>       at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>       at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>       at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>       at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
>       at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>       at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
>       at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
>       at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
>       at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
>       at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
>       at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
>       at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
>       at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
>       at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
>       at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
>       at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
>       at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>       at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
>       at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>       at org.eclipse.jetty.server.Server.handle(Server.java:564)
>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>       at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>       at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>       at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>       at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>       at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>       at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
>       at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
>       ... 3 more
> Caused by: java.lang.NullPointerException
>       at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
>       at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
>       at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
>       at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
>       ... 36 more
>
> My configuration inside nifi.properties is as below:
> # Provenance Repository Properties
> nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
> nifi.provenance.repository.debug.frequency=1_000_000
> nifi.provenance.repository.encryption.key.provider.implementation=
> nifi.provenance.repository.encryption.key.provider.location=
> nifi.provenance.repository.encryption.key.id=
> nifi.provenance.repository.encryption.key=
>
> # Persistent Provenance Repository Properties
> nifi.provenance.repository.directory.default=../provenance_repository
> nifi.provenance.repository.max.storage.time=24 hours
> nifi.provenance.repository.max.storage.size=1 GB
> nifi.provenance.repository.rollover.time=30 secs
> nifi.provenance.repository.rollover.size=100 MB
> nifi.provenance.repository.query.threads=2
> nifi.provenance.repository.index.threads=1
> nifi.provenance.repository.compress.on.rollover=true
> nifi.provenance.repository.always.sync=false
> nifi.provenance.repository.index.shard.size=4 GB
>
>
> By the way, does Data Provenance list all FlowFiles ever created, or only
> part of them? Should I try to find the FlowFile using the exception time
> in the log? Thanks.
>
> Regards,
> Ben
>
> 2017-12-27 16:57 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> Hi Ben,
>>
>> The ExecuteSqlCommand retry logic does not execute the same query
>> multiple times if it succeeds.
>> So input FlowFiles containing the same query must have been passed in
>> more than once.
>> It could be the same FlowFile, or different FlowFiles generated by the
>> first processor for some reason.
>> To investigate that kind of FlowFile-level information, NiFi provenance
>> data and FlowFile lineage are very useful:
>> https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage
>>
>> I didn't mention it earlier because you were having a Provenance
>> repository performance issue, but I hope you can use it now with the
>> WriteAheadProvenanceRepository.
>>
>> Thanks,
>> Koji
>>
>> On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <batman...@gmail.com> wrote:
>> > Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute
>> > the sql query if the connection is lost (the connection can be unstable);
>> > my idea is to transfer the FlowFile to the success relationship only after
>> > successfully executing the sql query. As you can see from the do-while
>> > loop in the code, the transaction is rolled back if the execution fails;
>> > if the connection is lost, it retries the sql.
>> > Will this logic cause my sql to be executed twice?
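>> >
>> > Roughly, the retry wrapper looks like this sketch (a reconstruction from
>> > the description above; isConnectionLost and the relationship handling are
>> > placeholders, not the actual code):
>> >
>> >     // Retry only when the connection was lost; route to 'success'
>> >     // only after the statement has actually been executed and committed.
>> >     boolean retryOnFail;
>> >     do {
>> >         retryOnFail = false;
>> >         try {
>> >             executeSql(sql);                // commits, or rolls back and rethrows
>> >         } catch (Exception ex) {
>> >             if (isConnectionLost(ex)) {     // hypothetical connectivity check
>> >                 retryOnFail = true;         // lost connection: run the same sql again
>> >             } else {
>> >                 session.transfer(flowFile, REL_FAILURE);
>> >                 return;
>> >             }
>> >         }
>> >     } while (retryOnFail);
>> >     session.transfer(flowFile, REL_SUCCESS);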
>> >
>> > For the WaitBatch processor, I will take your approach and test it
>> > individually to see if the WaitBatch processor could cause the FlowFile
>> > repository checkpointing failure.
>> >
>> > Regards,
>> > Ben
>> >
>> > 2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >
>> >> Hi Ben,
>> >>
>> >> Excuse me, I'm trying, but I probably don't fully understand what you
>> >> want to achieve with the flow.
>> >>
>> >> It looks odd that WaitBatch fails with such a FlowFile repository
>> >> error while other processors such as ReplaceText succeed.
>> >> I recommend testing WaitBatch alone first, without combining the
>> >> database-related processors, by feeding it a test FlowFile that has the
>> >> expected FlowFile attributes.
>> >> Such input FlowFiles can be created with the GenerateFlowFile processor.
>> >> If the same error happens with only the WaitBatch processor, it should
>> >> be easier to debug.
>> >>
>> >> Thanks,
>> >> Koji
>> >>
>> >> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com> wrote:
>> >> > Hi Ben,
>> >> >
>> >> > The one thing that looks strange in the screenshot is that
>> >> > ExecuteSqlCommand has FlowFiles queued in its incoming connection.
>> >> > Those should have been transferred to the 'failure' relationship.
>> >> >
>> >> > Following the executeSql() method, shouldn't it re-throw the caught
>> >> > exception?
>> >> >
>> >> >
>> >> >             try (Connection con = dbcpService.getConnection()) {
>> >> >                 logger.debug("Setting autoCommit to false");
>> >> >                 con.setAutoCommit(false);
>> >> >
>> >> >                 try (Statement stmt = con.createStatement()) {
>> >> >                     logger.info("Executing sql statement: {}", new Object[]{sql});
>> >> >                     stmt.execute(sql);
>> >> >
>> >> >                     // all sql statements are executed within one transaction
>> >> >                     logger.debug("Committing transaction");
>> >> >                     con.commit();
>> >> >                 } catch (Exception ex) {
>> >> >                     logger.error("Failed to execute sql statement: {}", new Object[]{sql, ex});
>> >> >                     con.rollback();
>> >> >                     // throw the exception to the outer layer for handling
>> >> >                     throw ex;
>> >> >                 } finally {
>> >> >                     logger.debug("Resetting autoCommit back to true");
>> >> >                     con.setAutoCommit(true);
>> >> >                 }
>> >> >             } catch (Exception ex) {
>> >> >                 // HERE, the exception is swallowed, that's why the FlowFiles stay in
>> >> >                 // the incoming connection.
>> >> >                 logger.error("Retrying sql statement: {}", new Object[]{sql, ex});
>> >> >                 retryOnFail = true;
>> >> >             }
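>> >> >
>> >> > For example, the outer catch could distinguish a recoverable connection
>> >> > loss from other failures (a sketch only; isConnectionLost is a
>> >> > hypothetical helper, not something in your code):
>> >> >
>> >> >             } catch (Exception ex) {
>> >> >                 if (isConnectionLost(ex)) {  // retry only the recoverable case
>> >> >                     retryOnFail = true;
>> >> >                 } else {
>> >> >                     // re-throw so onTrigger can route the FlowFile to 'failure'
>> >> >                     throw new ProcessException("sql execution failed: " + sql, ex);
>> >> >                 }
>> >> >             }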
>> >> >
>> >> > Thanks,
>> >> > Koji
>> >> >
>> >> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> >> Hi Koji, no problem. You could check the code of the WaitBatch
>> >> >> processor at this link:
>> >> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
>> >> >>
>> >> >> I also uploaded a snapshot of the part of the NiFi flow which includes
>> >> >> ExecuteSqlCommand and WaitBatch; you could check the picture at this link:
>> >> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
>> >> >>
>> >> >> You mentioned above that a FlowFile repository checkpointing failure
>> >> >> can cause other processors to process the same FlowFile again, but as
>> >> >> you can see from my snapshot image, ExecuteSqlCommand is the second
>> >> >> processor, placed before the WaitBatch processor. Even if the FlowFile
>> >> >> repository checkpointing failure is caused by WaitBatch, could it cause
>> >> >> the processors before it to process a FlowFile multiple times? Thanks.
>> >> >>
>> >> >> Regards,
>> >> >> Ben
>> >> >>
>> >> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >> >>
>> >> >>> Hi Ben,
>> >> >>>
>> >> >>> I was referring to these two log messages in your previous email.
>> >> >>> These two messages are both written by ExecuteSqlCommand; it does not
>> >> >>> mean 'it was executed again'.
>> >> >>>
>> >> >>> ```
>> >> >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>> >> >>> c.z.nifi.processors.ExecuteSqlCommand
>> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing sql statement: SELECT
>> >> >>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> >> >>> dbo.ods_extractDataDebug;
>> >> >>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>> >> >>>
>> >> >>> and it was executed again later:
>> >> >>>
>> >> >>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>> >> >>> c.z.nifi.processors.ExecuteSqlCommand
>> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute sql statement: SELECT
>> >> >>> ```
>> >> >>>
>> >> >>> As you wrote, a FlowFile repository checkpointing failure can cause
>> >> >>> other processors to process the same FlowFiles again. However, there
>> >> >>> won't be a simple way for every processor to roll back its work, as
>> >> >>> different processors do different things. Creating the temp table only
>> >> >>> if it does not exist seems like the right approach to me.
>> >> >>>
>> >> >>> At the same time, the root cause of the FlowFile repository failure
>> >> >>> should be investigated. Is it possible to share the WaitBatch code?
>> >> >>> The reason I ask is that every 'FlowFile Repository failed to update'
>> >> >>> error in the log you shared earlier is related to the WaitBatch
>> >> >>> processor.
>> >> >>>
>> >> >>> Thanks,
>> >> >>> Koji
>> >> >>>
>> >> >>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> >>> > Hi Koji, I print the sql right before actually executing it, and I
>> >> >>> > checked the error log line you mentioned in your reply; that error
>> >> >>> > was thrown by NiFi from within another processor called WaitBatch.
>> >> >>> > I didn't find errors similar to the one from the ExecuteSqlCommand
>> >> >>> > processor; I think that's because only ExecuteSqlCommand is used to
>> >> >>> > create temp database tables.
>> >> >>> > You could check my ExecuteSqlCommand code via the link:
>> >> >>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
>> >> >>> >
>> >> >>> > If the error is really caused by a FlowFile repository checkpoint
>> >> >>> > failure and the FlowFile was executed twice, I may have to create the
>> >> >>> > temp table only if it doesn't exist. The reason I didn't fix the bug
>> >> >>> > this way right away is that I was afraid the fix could mask some
>> >> >>> > other problems.
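>> >> >>> >
>> >> >>> > For example, the guarded creation could be expressed in T-SQL like
>> >> >>> > the sketch below (table names are taken from the log above; the
>> >> >>> > helper method is illustrative, not my actual code):
>> >> >>> >
>> >> >>> >     // Only create the temp table when it doesn't exist yet, so a
>> >> >>> >     // replayed FlowFile won't fail with "There is already an object
>> >> >>> >     // named ... in the database."
>> >> >>> >     private String guardedCreateSql(String tempTable, String sourceTable) {
>> >> >>> >         return "IF OBJECT_ID(N'tmp." + tempTable + "', N'U') IS NULL\n"
>> >> >>> >              + "BEGIN\n"
>> >> >>> >              + "    SELECT TOP 0 * INTO tmp." + tempTable
>> >> >>> >              + " FROM dbo." + sourceTable + ";\n"
>> >> >>> >              + "    ALTER TABLE tmp." + tempTable + " DROP COLUMN _id;\n"
>> >> >>> >              + "END";
>> >> >>> >     }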
>> >> >>> >
>> >> >>> > Thanks.
>> >> >>> >
>> >> >>> > Regards,
>> >> >>> > Ben
>> >> >>> >
>> >> >>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >> >>> >
>> >> >>> >> Hi Ben,
>> >> >>> >>
>> >> >>> >> The following two log messages are very close in terms of
>> >> >>> >> timestamp, but have different log levels.
>> >> >>> >> 2017-12-26 07:00:01,312 INFO
>> >> >>> >> 2017-12-26 07:00:01,315 ERROR
>> >> >>> >>
>> >> >>> >> I guess those are logged within a single onTrigger of your
>> >> >>> >> ExecuteSqlCommand custom processor: one before executing, the other
>> >> >>> >> when it caught an exception. Just guessing, as I don't have access
>> >> >>> >> to the code.
>> >> >>> >>
>> >> >>> >> Does the same issue happen with other processors bundled with
>> >> >>> >> Apache NiFi, without your custom processor running?
>> >> >>> >>
>> >> >>> >> If NiFi fails to update/checkpoint the FlowFile repository, then
>> >> >>> >> the same FlowFile can be processed again after restarting NiFi.
>> >> >>> >>
>> >> >>> >> Thanks,
>> >> >>> >> Koji
>> >> >>> >>
>> >> >>> >>
>> >> >>> >>
>> >> >>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> >>> >> > Thanks Koji, I will look into this article about the record model.
>> >> >>> >> >
>> >> >>> >> > By the way, the error I previously mentioned to you occurred
>> >> >>> >> > again, and I could see the sql query was executed twice in the
>> >> >>> >> > log. This time I had turned on verbose NiFi logging; the sql
>> >> >>> >> > query is as below:
>> >> >>> >> >
>> >> >>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
>> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing sql statement: SELECT
>> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> >> >>> >> > dbo.ods_extractDataDebug;
>> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>> >> >>> >> >
>> >> >>> >> > and it was executed again later:
>> >> >>> >> >
>> >> >>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
>> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute sql statement: SELECT
>> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> >> >>> >> > dbo.ods_extractDataDebug;
>> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
>> >> >>> >> > _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already
>> >> >>> >> > an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
>> >> >>> >> > object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
>> >> >>> >> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
>> >> >>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>> >> >>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>> >> >>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
>> >> >>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
>> >> >>> >> > at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>> >> >>> >> > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
>> >> >>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>> >> >>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>> >> >>> >> > at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>> >> >>> >> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >> >>> >> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> >> >>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> >> >>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> >> >>> >> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> >> >>> >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> >> >>> >> > at java.lang.Thread.run(Thread.java:745)
>> >> >>> >> >
>> >> >>> >> > I also saw a lot of NiFi exceptions like "ProcessException:
>> >> >>> >> > FlowFile Repository failed to update"; I'm not sure if this is
>> >> >>> >> > the reason the FlowFile got processed twice. Could you help take
>> >> >>> >> > a look at my log file? Thanks.
>> >> >>> >> > You could get the log file via the link:
>> >> >>> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
>> >> >>> >> >
>> >> >>> >> > Best Regards,
>> >> >>> >> > Ben
>> >> >>> >> >
>> >> >>> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >> >>> >> >
>> >> >>> >> >> Hi Ben,
>> >> >>> >> >>
>> >> >>> >> >> This blog post written by Mark would be a good starting point
>> >> >>> >> >> to get familiar with the NiFi Record model:
>> >> >>> >> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>> >> >>> >> >>
>> >> >>> >> >> HA for the DistributedMapCacheClientService and
>> >> >>> >> >> DistributedMapCacheServer pair is not supported at the moment.
>> >> >>> >> >> If you need high availability, RedisDistributedMapCacheClientService
>> >> >>> >> >> with Redis replication will provide that, though I haven't tried
>> >> >>> >> >> that myself.
>> >> >>> >> >> https://redis.io/topics/replication
>> >> >>> >> >>
>> >> >>> >> >> Thanks,
>> >> >>> >> >> Koji
>> >> >>> >> >>
>> >> >>> >> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> >>> >> >> > Thanks for your quick response, Koji. I haven't seen anything
>> >> >>> >> >> > about the NiFi record data model in the NiFi documentation;
>> >> >>> >> >> > could you tell me where this model is documented? Thanks.
>> >> >>> >> >> >
>> >> >>> >> >> > By the way, to my knowledge, when you need to use the
>> >> >>> >> >> > DistributedMapCacheServer from DistributedMapCacheClientService,
>> >> >>> >> >> > you need to specify the host url for the server. This means
>> >> >>> >> >> > that inside a NiFi cluster, when I specify the cache server and
>> >> >>> >> >> > that node suddenly goes down, I can't use the cache until the
>> >> >>> >> >> > node comes up again, right? Is there currently a cache server
>> >> >>> >> >> > in NiFi that supports HA? Thanks.
>> >> >>> >> >> >
>> >> >>> >> >> > Regards,
>> >> >>> >> >> > Ben
>> >> >>> >> >> >
>> >> >>> >> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >> >>> >> >> >
>> >> >>> >> >> >> Hi Ben,
>> >> >>> >> >> >>
>> >> >>> >> >> >> As you found from the existing code, DistributedMapCache is
>> >> >>> >> >> >> used to share state among different processors, and it can be
>> >> >>> >> >> >> used by your custom processors, too.
>> >> >>> >> >> >> However, I'd recommend avoiding such tight dependencies
>> >> >>> >> >> >> between FlowFiles if possible, or at least minimizing the part
>> >> >>> >> >> >> of the flow that requires that constraint, for better
>> >> >>> >> >> >> performance and simplicity.
>> >> >>> >> >> >> For example, since a FlowFile can hold a fairly large amount
>> >> >>> >> >> >> of data, you could merge all FlowFiles into a single FlowFile
>> >> >>> >> >> >> instead of a batch of FlowFiles. If you need logical
>> >> >>> >> >> >> boundaries, you can use the NiFi Record data model to embed
>> >> >>> >> >> >> multiple records within a FlowFile; Records should perform better.
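>> >> >>> >> >> >>
>> >> >>> >> >> >> As a rough illustration of the Record model (a hedged sketch;
>> >> >>> >> >> >> the schema fields are invented for the example, and a real
>> >> >>> >> >> >> processor would obtain its reader/writer factories from its
>> >> >>> >> >> >> configuration):
>> >> >>> >> >> >>
>> >> >>> >> >> >>     import java.util.Arrays;
>> >> >>> >> >> >>     import java.util.HashMap;
>> >> >>> >> >> >>     import java.util.Map;
>> >> >>> >> >> >>     import org.apache.nifi.serialization.SimpleRecordSchema;
>> >> >>> >> >> >>     import org.apache.nifi.serialization.record.*;
>> >> >>> >> >> >>
>> >> >>> >> >> >>     // One FlowFile can carry many records; each record keeps
>> >> >>> >> >> >>     // its own logical boundary, so no flag FlowFile is needed.
>> >> >>> >> >> >>     RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
>> >> >>> >> >> >>             new RecordField("batchId", RecordFieldType.STRING.getDataType()),
>> >> >>> >> >> >>             new RecordField("payload", RecordFieldType.STRING.getDataType())));
>> >> >>> >> >> >>     Map<String, Object> values = new HashMap<>();
>> >> >>> >> >> >>     values.put("batchId", "20171226031801926");
>> >> >>> >> >> >>     values.put("payload", "one row of extracted data");
>> >> >>> >> >> >>     Record record = new MapRecord(schema, values);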
>> >> >>> >> >> >>
>> >> >>> >> >> >> Hope this helps.
>> >> >>> >> >> >>
>> >> >>> >> >> >> Thanks,
>> >> >>> >> >> >> Koji
>> >> >>> >> >> >>
>> >> >>> >> >> >>
>> >> >>> >> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> >>> >> >> >> > Hi guys, I'm currently trying to find a proper way in nifi
>> >> >>> >> >> >> > to sync status between my custom processors.
>> >> >>> >> >> >> > Our requirement is like this: we're doing some ETL work
>> >> >>> >> >> >> > using nifi, and I'm extracting the data from the DB into
>> >> >>> >> >> >> > batches of FlowFiles (each batch of FlowFiles has a flag
>> >> >>> >> >> >> > FlowFile indicating the end of the batch).
>> >> >>> >> >> >> > There are some groups of custom processors downstream that
>> >> >>> >> >> >> > need to process these FlowFiles to do some business-logic
>> >> >>> >> >> >> > work, and we expect these processors to process one batch of
>> >> >>> >> >> >> > FlowFiles at a time.
>> >> >>> >> >> >> > Therefore we need to implement a custom Wait processor
>> >> >>> >> >> >> > (let's just call it WaitBatch here) to hold all the other
>> >> >>> >> >> >> > batches of FlowFiles while the business processors are
>> >> >>> >> >> >> > handling the batch of FlowFiles whose creation time is earlier.
>> >> >>> >> >> >> >
>> >> >>> >> >> >> > In order to implement this, all the WaitBatch processors
>> >> >>> >> >> >> > placed in the flow need to read/update records in a shared
>> >> >>> >> >> >> > map so that each set of business-logic processors processes
>> >> >>> >> >> >> > one batch at a time.
>> >> >>> >> >> >> > The entries are keyed by the batch number of the FlowFiles,
>> >> >>> >> >> >> > and the value of each entry is a batch release counter,
>> >> >>> >> >> >> > which counts the number of times the batch of FlowFiles has
>> >> >>> >> >> >> > passed through a WaitBatch processor.
>> >> >>> >> >> >> > When a batch is released by a WaitBatch, it increments the
>> >> >>> >> >> >> > batch number entry's value by 1, and the released batch
>> >> >>> >> >> >> > number and counter value are also saved locally at the
>> >> >>> >> >> >> > WaitBatch with the StateManager; when the next batch reaches
>> >> >>> >> >> >> > the WaitBatch, it checks whether the counter value of the
>> >> >>> >> >> >> > previously released batch number in the shared map is
>> >> >>> >> >> >> > greater than the one saved locally. If the entry for that
>> >> >>> >> >> >> > batch number doesn't exist (already removed) or the value in
>> >> >>> >> >> >> > the shared map is greater, the next batch is released, and
>> >> >>> >> >> >> > the local state and the entry in the shared map are updated
>> >> >>> >> >> >> > similarly.
>> >> >>> >> >> >> > At the end of the flow, a custom processor gets the batch
>> >> >>> >> >> >> > number from each batch and removes the entry from the shared map.
>> >> >>> >> >> >> >
>> >> >>> >> >> >> > So this implementation requires a shared map that can be
>> >> >>> >> >> >> > read and updated frequently and atomically. I checked the
>> >> >>> >> >> >> > Wait/Notify processors in NiFi and saw that they use the
>> >> >>> >> >> >> > DistributedMapCacheClientService and DistributedMapCacheServer
>> >> >>> >> >> >> > to sync status, so I'm wondering if I could use the
>> >> >>> >> >> >> > DistributedMapCacheClientService to implement my logic. I
>> >> >>> >> >> >> > also saw another implementation called
>> >> >>> >> >> >> > RedisDistributedMapCacheClientService, which seems to
>> >> >>> >> >> >> > require Redis (I haven't used Redis). Thanks in advance for
>> >> >>> >> >> >> > any suggestions.
>> >> >>> >> >> >> >
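>> >> >>> >> >> >> > In case it helps, here is a rough sketch of the atomic
>> >> >>> >> >> >> > counter update, assuming the AtomicDistributedMapCacheClient
>> >> >>> >> >> >> > interface that backs Wait/Notify (serializers and error
>> >> >>> >> >> >> > handling are omitted, and the exact signatures should be
>> >> >>> >> >> >> > checked against the NiFi version in use):
>> >> >>> >> >> >> >
>> >> >>> >> >> >> >     // Optimistic-locking increment of a batch's release counter:
>> >> >>> >> >> >> >     // fetch the entry with its revision, then replace; replace
>> >> >>> >> >> >> >     // fails if another WaitBatch updated the entry meanwhile.
>> >> >>> >> >> >> >     private boolean incrementReleaseCounter(
>> >> >>> >> >> >> >             final AtomicDistributedMapCacheClient<Long> cache,
>> >> >>> >> >> >> >             final String batchNumber) throws IOException {
>> >> >>> >> >> >> >         while (true) {
>> >> >>> >> >> >> >             final AtomicCacheEntry<String, Long, Long> entry =
>> >> >>> >> >> >> >                     cache.fetch(batchNumber, keySerializer, valueDeserializer);
>> >> >>> >> >> >> >             if (entry == null) {
>> >> >>> >> >> >> >                 return false; // entry already removed at the end of the flow
>> >> >>> >> >> >> >             }
>> >> >>> >> >> >> >             entry.setValue(entry.getValue() + 1);
>> >> >>> >> >> >> >             if (cache.replace(entry, keySerializer, valueSerializer)) {
>> >> >>> >> >> >> >                 return true;
>> >> >>> >> >> >> >             }
>> >> >>> >> >> >> >             // lost the race; fetch the latest revision and retry
>> >> >>> >> >> >> >         }
>> >> >>> >> >> >> >     }
>> >> >>> >> >> >> >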
>> >> >>> >> >> >> > Regards,
>> >> >>> >> >> >> > Ben
>> >> >>> >> >> >>
>> >> >>> >> >>
>> >> >>> >>
>> >> >>>
>> >>
>>
>
>
