Hi Ben, you can filter events by timestamp as well.
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#searching-for-events

On Wed, Dec 27, 2017 at 6:28 PM, 尹文才 <batman...@gmail.com> wrote:
> Hi Koji, I saw it was only showing the most recent 1000 events, so I couldn't
> see the event from when the FlowFile was created.
>
> Regards,
> Ben
>
> 2017-12-27 17:21 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> I see, thanks. The easiest way to look at provenance events would be
>> to right-click the processor instance you are interested in, then
>> select the 'View data provenance' context menu item. This way, NiFi displays
>> the provenance events for the selected processor.
>>
>> Koji
>>
>> On Wed, Dec 27, 2017 at 6:17 PM, 尹文才 <batman...@gmail.com> wrote:
>> > Hi Koji, sorry about the provenance exception; it was because there was no
>> > space left on the machine (it had filled up with logs).
>> >
>> > Regards,
>> > Ben
>> >
>> > 2017-12-27 17:11 GMT+08:00 尹文才 <batman...@gmail.com>:
>> >
>> >> Hi Koji, thanks. The names of the temp tables are created with the format
>> >> "yyyyMMddHHmmssSSS-nnnn": the first part indicates the time and the second
>> >> part is a random number of length 4.
>> >> So I think it's not possible to have 2 duplicate table names; the only
>> >> possibility I can think of is that the FlowFile is passed into the processor twice.
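For reference, a name in that scheme (millisecond timestamp plus a 4-digit random suffix, matching table names like ods_extractDataDebug_20171226031801926_9195 seen later in this thread) could be generated like this. This is only an illustrative sketch; the class and method names are hypothetical, not from the actual processor:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper illustrating the "yyyyMMddHHmmssSSS-nnnn" naming scheme:
// a 17-digit millisecond timestamp plus a 4-digit random suffix.
public class TempTableNames {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    public static String nextName(String baseTable) {
        int suffix = ThreadLocalRandom.current().nextInt(0, 10000);
        return String.format("%s_%s_%04d",
                baseTable, LocalDateTime.now().format(FMT), suffix);
    }
}
```

Note that such a name is unique per generation call with overwhelming probability, so a duplicate-name error is more consistent with the same generated statement being executed twice than with two independent generations colliding.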
>> >>
>> >> About the provenance, I had updated to use the
>> >> WriteAheadProvenanceRepository implementation, but when I tried to check
>> >> the data provenance, it showed me the following exception message:
>> >> HTTP ERROR 500
>> >>
>> >> Problem accessing /nifi/provenance. Reason:
>> >>
>> >>     Server Error
>> >>
>> >> Caused by:
>> >>
>> >> javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>> >>       at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
>> >>       at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>> >>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>> >>       at org.eclipse.jetty.server.Server.handle(Server.java:564)
>> >>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>> >>       at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>> >>       at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>> >>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>> >>       at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>> >>       at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>> >>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>> >>       at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>> >>       at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
>> >>       at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
>> >>       at java.lang.Thread.run(Thread.java:745)
>> >> Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>> >>       at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
>> >>       at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
>> >>       at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
>> >>       at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
>> >>       at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
>> >>       at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
>> >>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
>> >>       at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
>> >>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>> >>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
>> >>       at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
>> >>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
>> >>       at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
>> >>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
>> >>       at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
>> >>       at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
>> >>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
>> >>       at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
>> >>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>> >>       at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
>> >>       at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>> >>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>> >>       at org.eclipse.jetty.server.Server.handle(Server.java:564)
>> >>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>> >>       at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>> >>       at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>> >>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>> >>       at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>> >>       at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>> >>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>> >>       at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>> >>       at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
>> >>       at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
>> >>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
>> >>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
>> >>       ... 3 more
>> >> Caused by: java.lang.NullPointerException
>> >>       at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
>> >>       at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
>> >>       at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
>> >>       at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
>> >>       ... 36 more
>> >>
>> >> My configuration inside nifi.properties is as below:
>> >> # Provenance Repository Properties
>> >> nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
>> >> nifi.provenance.repository.debug.frequency=1_000_000
>> >> nifi.provenance.repository.encryption.key.provider.implementation=
>> >> nifi.provenance.repository.encryption.key.provider.location=
>> >> nifi.provenance.repository.encryption.key.id=
>> >> nifi.provenance.repository.encryption.key=
>> >>
>> >> # Persistent Provenance Repository Properties
>> >> nifi.provenance.repository.directory.default=../provenance_repository
>> >> nifi.provenance.repository.max.storage.time=24 hours
>> >> nifi.provenance.repository.max.storage.size=1 GB
>> >> nifi.provenance.repository.rollover.time=30 secs
>> >> nifi.provenance.repository.rollover.size=100 MB
>> >> nifi.provenance.repository.query.threads=2
>> >> nifi.provenance.repository.index.threads=1
>> >> nifi.provenance.repository.compress.on.rollover=true
>> >> nifi.provenance.repository.always.sync=false
>> >> nifi.provenance.repository.index.shard.size=4 GB
>> >>
>> >>
>> >> By the way, does this Data Provenance view list all FlowFiles ever created, or
>> >> only part of them? Should I try to find the FlowFile using the exception time
>> >> from the log? Thanks.
>> >>
>> >> Regards,
>> >> Ben
>> >>
>> >> 2017-12-27 16:57 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>
>> >>> Hi Ben,
>> >>>
>> >>> The ExecuteSqlCommand retry logic does not execute the same query
>> >>> multiple times if it succeeds.
>> >>> So, there must have been input FlowFiles containing the same query that
>> >>> were passed in more than once.
>> >>> It could be the same FlowFile, or different FlowFiles generated by the
>> >>> first processor for some reason.
>> >>> To investigate that kind of FlowFile-level information, NiFi
>> >>> provenance data and FlowFile lineage will be very useful.
>> >>> https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage
>> >>>
>> >>> I didn't mention it earlier because you were having a provenance
>> >>> repository performance issue, but I hope you can use it now with the
>> >>> WriteAheadProvenanceRepository.
>> >>>
>> >>> Thanks,
>> >>> Koji
>> >>>
>> >>> On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <batman...@gmail.com> wrote:
>> >>> > Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute
>> >>> > the SQL query if the connection is lost (the connection can be unstable); my
>> >>> > idea is to only transfer the FlowFile to the success relationship
>> >>> > after successfully executing the SQL query. You can see the do-while loop
>> >>> > in the code: the transaction will be rolled back if the execution fails, and if
>> >>> > the connection is lost, it will retry executing the SQL.
>> >>> > Will this logic cause my SQL to be executed twice?
>> >>> >
>> >>> > For the WaitBatch processor, I will take your approach and test it individually
>> >>> > to see if the WaitBatch processor could cause the FlowFile repository
>> >>> > checkpointing failure.
>> >>> >
>> >>> > Regards,
>> >>> > Ben
>> >>> >
>> >>> > 2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>> >
>> >>> >> Hi Ben,
>> >>> >>
>> >>> >> Excuse me, I'm trying, but probably I don't fully understand what you
>> >>> >> want to achieve with the flow.
>> >>> >>
>> >>> >> It looks weird that WaitBatch is failing with such a FlowFile repository
>> >>> >> error while other processors such as ReplaceText succeed.
>> >>> >> I recommend testing WaitBatch alone first, without combining the
>> >>> >> database-related processors, by feeding it a test FlowFile having the
>> >>> >> expected FlowFile attributes.
>> >>> >> Such input FlowFiles can be created by the GenerateFlowFile processor.
>> >>> >> If the same error happens with only the WaitBatch processor, then it
>> >>> >> should be easier to debug.
>> >>> >>
>> >>> >> Thanks,
>> >>> >> Koji
>> >>> >>
>> >>> >> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com>
>> >>> >> wrote:
>> >>> >> > Hi Ben,
>> >>> >> >
>> >>> >> > The one thing that looks strange in the screenshot is the
>> >>> >> > ExecuteSqlCommand having FlowFiles queued in its incoming connection.
>> >>> >> > Those should be transferred to the 'failure' relationship.
>> >>> >> >
>> >>> >> > Following the executeSql() method, shouldn't it re-throw the caught
>> >>> >> > exception?
>> >>> >> >
>> >>> >> >
>> >>> >> >             try (Connection con = dbcpService.getConnection()) {
>> >>> >> >                 logger.debug("Setting autoCommit to false");
>> >>> >> >                 con.setAutoCommit(false);
>> >>> >> >
>> >>> >> >                 try (Statement stmt = con.createStatement()) {
>> >>> >> >                     logger.info("Executing SQL statement: {}", new Object[]{sql});
>> >>> >> >                     stmt.execute(sql);
>> >>> >> >
>> >>> >> >                     // All SQL statements are executed within a single transaction
>> >>> >> >                     logger.debug("Committing transaction");
>> >>> >> >                     con.commit();
>> >>> >> >                 } catch (Exception ex) {
>> >>> >> >                     logger.error("Failed to execute SQL statement: {}", new Object[]{sql, ex});
>> >>> >> >                     con.rollback();
>> >>> >> >                     // Rethrow the exception to the outer layer
>> >>> >> >                     throw ex;
>> >>> >> >                 } finally {
>> >>> >> >                     logger.debug("Resetting autoCommit to true");
>> >>> >> >                     con.setAutoCommit(true);
>> >>> >> >                 }
>> >>> >> >             } catch (Exception ex) {
>> >>> >> > // HERE, the exception is swallowed, that's why the FlowFiles stay in
>> >>> >> > the incoming connection.
>> >>> >> >                 logger.error("Retrying SQL statement: {}", new Object[]{sql, ex});
>> >>> >> >                 retryOnFail = true;
>> >>> >> >             }
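A small, self-contained sketch of what re-throwing after a bounded number of retries could look like. The helper name and shape are hypothetical, not the actual processor code; the point is that once the retry budget is exhausted the last exception propagates, so onTrigger can route the FlowFile to 'failure' instead of leaving it queued on the incoming connection:

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retry transient failures a bounded number of times,
// then rethrow the last exception so the caller can handle the failure.
public class RetryingExecutor {
    public static <T> T runWithRetry(Callable<T> work, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return work.call();
            } catch (Exception ex) {
                last = ex; // log and retry; a real processor might also yield here
            }
        }
        throw last; // retries exhausted: let the caller route to 'failure'
    }
}
```

The key difference from the snippet above is that the outer catch no longer swallows the exception unconditionally; it only absorbs it while retries remain.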
>> >>> >> >
>> >>> >> > Thanks,
>> >>> >> > Koji
>> >>> >> >
>> >>> >> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
>> >>> >> >> Hi Koji, no problem. You can check the code of the WaitBatch processor at the
>> >>> >> >> link:
>> >>> >> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
>> >>> >> >>
>> >>> >> >> I also uploaded a snapshot of part of the NiFi flow which includes the
>> >>> >> >> ExecuteSqlCommand and WaitBatch; you can check the picture at the link:
>> >>> >> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
>> >>> >> >>
>> >>> >> >> You mentioned above that a FlowFile repository checkpointing failure will
>> >>> >> >> cause other processors to process the same FlowFile again, but as you can
>> >>> >> >> see from my snapshot image, the ExecuteSqlCommand is the second processor
>> >>> >> >> and comes before the WaitBatch processor. Even if the FlowFile repository
>> >>> >> >> checkpointing failure is caused by WaitBatch, could it lead to the
>> >>> >> >> processors before it processing a FlowFile multiple times? Thanks.
>> >>> >> >>
>> >>> >> >> Regards,
>> >>> >> >> Ben
>> >>> >> >>
>> >>> >> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>> >> >>
>> >>> >> >>> Hi Ben,
>> >>> >> >>>
>> >>> >> >>> I was referring to these two log messages in your previous email.
>> >>> >> >>> These two messages were both written by ExecuteSqlCommand; it does not
>> >>> >> >>> mean 'it was executed again'.
>> >>> >> >>>
>> >>> >> >>> ```
>> >>> >> >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>> >>> >> >>> c.z.nifi.processors.ExecuteSqlCommand
>> >>> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
>> >>> >> >>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> >>> >> >>> dbo.ods_extractDataDebug;
>> >>> >> >>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>> >>> >> >>>
>> >>> >> >>> and it was executed again later:
>> >>> >> >>>
>> >>> >> >>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>> >>> >> >>> c.z.nifi.processors.ExecuteSqlCommand
>> >>> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
>> >>> >> >>> ```
>> >>> >> >>>
>> >>> >> >>> As you wrote, a case where the FlowFile repository fails checkpointing
>> >>> >> >>> will cause other processors to process the same FlowFiles again. However,
>> >>> >> >>> there won't be a simple solution for every processor to roll back its
>> >>> >> >>> job, as different processors do different things. Creating the temp table
>> >>> >> >>> only if it does not exist seems like the right approach to me.
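On SQL Server, that "create only if it does not exist" guard could be sketched by wrapping the SELECT INTO in an OBJECT_ID check. The helper below is only an illustration (the class and method names are made up, and the table names in the test are examples); the idea is that a re-run of the same FlowFile becomes a no-op instead of failing with "there is already an object named ...":

```java
// Hypothetical helper that builds an idempotent SELECT INTO for SQL Server:
// OBJECT_ID(..., 'U') returns NULL when no user table with that name exists,
// so the statement only creates the table on the first execution.
public class IdempotentDdl {
    public static String selectIntoIfAbsent(String tmpTable, String sourceTable) {
        return "IF OBJECT_ID(N'" + tmpTable + "', N'U') IS NULL\n"
             + "    SELECT TOP 0 * INTO " + tmpTable + " FROM " + sourceTable + ";";
    }
}
```

Note this only makes the DDL idempotent; it does not address why the same FlowFile is processed twice in the first place.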
>> >>> >> >>>
>> >>> >> >>> At the same time, the root cause of the FlowFile repository failures
>> >>> >> >>> should be investigated. Is it possible to share the WaitBatch code?
>> >>> >> >>> The reason I ask is that every 'FlowFile Repository failed to update'
>> >>> >> >>> error is related to the WaitBatch processor in the log that you shared
>> >>> >> >>> earlier.
>> >>> >> >>>
>> >>> >> >>> Thanks,
>> >>> >> >>> Koji
>> >>> >> >>>
>> >>> >> >>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com>
>> wrote:
>> >>> >> >>> > Hi Koji, I will print the sql before actually executing it,
>> but I
>> >>> >> checked
>> >>> >> >>> > the error log line you mentioned in your reply, this error was
>> >>> >> thrown by
>> >>> >> >>> > NiFi from within another processor called WaitBatch.
>> >>> >> >>> > I didn't find similar errors as the one from the
>> >>> ExecuteSqlCommand
>> >>> >> >>> > processor, I think it's because only the ExecuteSqlCommand is
>> >>> used to
>> >>> >> >>> > create temp database tables.
>> >>> >> >>> > You could check my ExecuteSqlCommand code via the link:
>> >>> >> >>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrh
>> >>> jSk_5P
>> >>> >> >>> >
>> >>> >> >>> > If the error is really caused by FlowFile repository
>> checkpoint
>> >>> >> failure
>> >>> >> >>> and
>> >>> >> >>> > the flowfile was executed twice, I may have to create the temp
>> >>> table
>> >>> >> only
>> >>> >> >>> > if doesn't exist, I didn't fix this bug in this way
>> >>> >> >>> > right away is because I was afraid this fix could cover some
>> >>> other
>> >>> >> >>> problems.
>> >>> >> >>> >
>> >>> >> >>> > Thanks.
>> >>> >> >>> >
>> >>> >> >>> > Regards,
>> >>> >> >>> > Ben
>> >>> >> >>> >
>> >>> >> >>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>> >> >>> >
>> >>> >> >>> >> Hi Ben,
>> >>> >> >>> >>
>> >>> >> >>> >> The following two log messages are very close in terms of written
>> >>> >> >>> >> timestamp, but have different log levels.
>> >>> >> >>> >> 2017-12-26 07:00:01,312 INFO
>> >>> >> >>> >> 2017-12-26 07:00:01,315 ERROR
>> >>> >> >>> >>
>> >>> >> >>> >> I guess those are logged within a single onTrigger of your
>> >>> >> >>> >> ExecuteSqlCommand custom processor: one before executing, the other
>> >>> >> >>> >> when it caught an exception. Just guessing, as I don't have access
>> >>> >> >>> >> to the code.
>> >>> >> >>> >>
>> >>> >> >>> >> Does the same issue happen with other processors bundled with Apache
>> >>> >> >>> >> NiFi, without your custom processor running?
>> >>> >> >>> >>
>> >>> >> >>> >> If NiFi fails to update/checkpoint the FlowFile repository, then the
>> >>> >> >>> >> same FlowFile can be processed again after restarting NiFi.
>> >>> >> >>> >>
>> >>> >> >>> >> Thanks,
>> >>> >> >>> >> Koji
>> >>> >> >>> >>
>> >>> >> >>> >>
>> >>> >> >>> >>
>> >>> >> >>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
>> >>> >> >>> >> > Thanks Koji, I will look into this article about the record model.
>> >>> >> >>> >> >
>> >>> >> >>> >> > By the way, that error I previously mentioned to you occurred again. I
>> >>> >> >>> >> > could see the SQL query was executed twice in the log; this time I had
>> >>> >> >>> >> > turned on verbose NiFi logging. The SQL query is as below:
>> >>> >> >>> >> >
>> >>> >> >>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>> >>> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
>> >>> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
>> >>> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> >>> >> >>> >> > dbo.ods_extractDataDebug;
>> >>> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>> >>> >> >>> >> >
>> >>> >> >>> >> > and it was executed again later:
>> >>> >> >>> >> >
>> >>> >> >>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>> >>> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
>> >>> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
>> >>> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> >>> >> >>> >> > dbo.ods_extractDataDebug;
>> >>> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;:
>> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object
>> >>> >> >>> >> > named 'ods_extractDataDebug_20171226031801926_9195' in the database.
>> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object
>> >>> >> >>> >> > named 'ods_extractDataDebug_20171226031801926_9195' in the database.
>> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
>> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
>> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
>> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
>> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
>> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
>> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
>> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
>> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
>> >>> >> >>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>> >>> >> >>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>> >>> >> >>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
>> >>> >> >>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
>> >>> >> >>> >> > at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>> >>> >> >>> >> > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
>> >>> >> >>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>> >>> >> >>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>> >>> >> >>> >> > at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>> >>> >> >>> >> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >>> >> >>> >> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> >>> >> >>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> >>> >> >>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> >>> >> >>> >> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> >>> >> >>> >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> >>> >> >>> >> > at java.lang.Thread.run(Thread.java:745)
>> >>> >> >>> >> >
> I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
> Repository failed to update"; I'm not sure whether this is the reason
> the FlowFile got processed twice. Could you help take a look at my log
> file? Thanks. You can get the log file via this link:
> https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
>
> Best Regards,
> Ben
>
> 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> Hi Ben,
>>
>> This blog post, written by Mark, is a good starting point for getting
>> familiar with the NiFi Record model:
>> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>>
>> HA for the DistributedMapCacheClientService and
>> DistributedMapCacheServer pair is not supported at the moment. If you
>> need high availability, RedisDistributedMapCacheClientService with
>> Redis replication will provide that, though I haven't tried it myself.
>> https://redis.io/topics/replication
>>
>> Thanks,
>> Koji
>>
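[Editor's note: the Redis replication Koji points to is configured on the replica side; the fragment below is only a sketch, and the primary's address (192.168.1.10:6379) is a placeholder. Note that replication alone only copies data; automatic failover additionally requires Redis Sentinel.]

```
# redis.conf on the replica; point it at the primary serving the
# RedisDistributedMapCacheClientService.
# Redis >= 5.0 uses "replicaof"; older releases spell it "slaveof".
replicaof 192.168.1.10 6379
```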
>> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
>>> Thanks for your quick response, Koji. I hadn't heard of or seen
>>> anything about the NiFi Record data model when I was reading the
>>> NiFi documentation; could you tell me where this model is
>>> documented? Thanks.
>>>
>>> By the way, to my knowledge, when you need to use the
>>> DistributedMapCacheServer from DistributedMapCacheClientService, you
>>> need to specify the host URL for the server. This means that inside
>>> a NiFi cluster, if I specify the cache server and that node suddenly
>>> goes down, I can't use it until the node comes back up, right? Is
>>> there currently a cache server in NiFi that supports HA? Thanks.
>>>
>>> Regards,
>>> Ben
>>>
>>> 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>>>
>>>> Hi Ben,
>>>>
>>>> As you found from the existing code, DistributedMapCache is used to
>>>> share state among different processors, and it can be used by your
>>>> custom processors, too.
>>>> However, I'd recommend avoiding such tight dependencies between
>>>> FlowFiles if possible, or at least minimizing the part of the flow
>>>> that requires that constraint, for better performance and
>>>> simplicity.
>>>> For example, since a FlowFile can hold a fairly large amount of
>>>> data, you could merge all the FlowFiles into a single FlowFile
>>>> instead of using batches of FlowFiles. If you need logical
>>>> boundaries, you can use the NiFi Record data model to embed
>>>> multiple records within a FlowFile; Record should perform better.
>>>>
>>>> Hope this helps.
>>>>
>>>> Thanks,
>>>> Koji
>>>>
>>>> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
>>>>> Hi guys, I'm currently trying to find a proper way in NiFi to sync
>>>>> status between my custom processors.
>>>>> Our requirement is like this: we're doing some ETL work using
>>>>> NiFi, and I'm extracting the data from a DB into batches of
>>>>> FlowFiles (each batch of FlowFiles has a flag FlowFile indicating
>>>>> the end of the batch).
>>>>> There are some groups of custom processors downstream that need to
>>>>> process these FlowFiles to do some business logic work, and we
>>>>> expect these processors to process one batch of FlowFiles at a
>>>>> time.
>>>>> Therefore we need to implement a custom Wait processor (let's call
>>>>> it WaitBatch here) to hold all the other batches of FlowFiles
>>>>> while the business processors are handling the batch of FlowFiles
>>>>> whose creation time is earlier.
>>>>>
>>>>> In order to implement this, all the WaitBatch processors placed in
>>>>> the flow need to read/update records in a shared map so that each
>>>>> set of business-logic processors processes one batch at a time.
>>>>> The entries are keyed by the batch number of the FlowFiles, and
>>>>> the value of each entry is a batch release counter, which counts
>>>>> the number of times the batch of FlowFiles has passed through a
>>>>> WaitBatch processor.
>>>>> When a batch is released by a WaitBatch, it will try to increment
>>>>> the batch number entry's value by 1, and the released batch number
>>>>> and counter value will also be saved locally at the WaitBatch with
>>>>> the StateManager;
>>>>> when the next batch reaches the WaitBatch, it will check whether
>>>>> the counter value of the previously released batch number in the
>>>>> shared map is greater than the one saved locally. If the entry for
>>>>> that batch number doesn't exist (already removed), or the value in
>>>>> the shared map is greater, the next batch will be released, and
>>>>> the local state and the entry in the shared map will be updated
>>>>> similarly.
>>>>> At the end of the flow, a custom processor will get the batch
>>>>> number from each batch and remove its entry from the shared map.
>>>>>
>>>>> So this implementation requires a shared map that can be read and
>>>>> updated frequently and atomically. I checked the Wait/Notify
>>>>> processors in NiFi and saw that they use the
>>>>> DistributedMapCacheClientService and DistributedMapCacheServer to
>>>>> sync status, so I'm wondering whether I could use the
>>>>> DistributedMapCacheClientService to implement my logic. I also saw
>>>>> another implementation called
>>>>> RedisDistributedMapCacheClientService, which seems to require
>>>>> Redis (I haven't used Redis). Thanks in advance for any
>>>>> suggestions.
>>>>>
>>>>> Regards,
>>>>> Ben
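[Editor's note: Ben's release-counter scheme can be sketched in plain Java. This is only an illustration: a ConcurrentHashMap stands in for the shared distributed map, and all class and method names are hypothetical. In NiFi the same check-and-bump would go through a DistributedMapCacheClient; its AtomicDistributedMapCacheClient variant exposes revision-based fetch/replace for atomic updates.]

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the WaitBatch gating logic: batch number -> release counter.
public class WaitBatchSketch {
    // Stand-in for the shared distributed map.
    static final ConcurrentHashMap<String, Integer> shared = new ConcurrentHashMap<>();

    // Called when a WaitBatch releases a batch: atomically bump the counter
    // and return the new value, which the processor saves locally.
    static int release(String batchNo) {
        return shared.merge(batchNo, 1, Integer::sum);
    }

    // Called when the next batch arrives: release it only if the previous
    // batch's counter has moved past what this processor saw locally, or
    // the entry was already removed by the end-of-flow cleanup processor.
    static boolean mayRelease(String prevBatchNo, int locallySeen) {
        Integer current = shared.get(prevBatchNo);
        return current == null || current > locallySeen;
    }

    public static void main(String[] args) {
        int seen = release("20171226-0001");                    // counter -> 1
        System.out.println(mayRelease("20171226-0001", seen));  // false: must wait
        release("20171226-0001");                               // counter -> 2
        System.out.println(mayRelease("20171226-0001", seen));  // true: released
    }
}
```

A real implementation would need the atomic replace to be retried on revision conflicts, since several WaitBatch instances may bump the same entry concurrently.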
