Hi Koji, I saw it was only showing the most recent 1000 events, so I couldn't see the event from when the FlowFile was created.
Regards,
Ben

2017-12-27 17:21 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

I see, thanks. The easiest way to look at provenance events would be by right-clicking the processor instance you are interested in, then selecting the 'View data provenance' context menu. This way, NiFi displays provenance events for the selected processor.

Koji

On Wed, Dec 27, 2017 at 6:17 PM, 尹文才 <batman...@gmail.com> wrote:

Hi Koji, sorry about the provenance exception; it happened because there was no space left on the machine (it had filled up with logs).

Regards,
Ben

2017-12-27 17:11 GMT+08:00 尹文才 <batman...@gmail.com>:

Hi Koji, thanks. The names of the temp tables are created with the format "yyyyMMddHHmmssSSS-nnnn": the first part indicates the time and the second part is a random four-digit number. So I think it's not possible to have two duplicate table names; the only possibility I can think of is that the FlowFile is passed into the processor twice.

About the provenance, I had updated to use the WriteAheadProvenanceRepository implementation, but when I tried to check the data provenance, it showed me the following exception message:

HTTP ERROR 500

Problem accessing /nifi/provenance. Reason:

    Server Error

Caused by:

javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
    at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
    at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
    at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
    at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
    at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
    ... 3 more
Caused by: java.lang.NullPointerException
    at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
    at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
    at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
    ... 36 more

My configuration inside nifi.properties is as below:

# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
nifi.provenance.repository.debug.frequency=1_000_000
nifi.provenance.repository.encryption.key.provider.implementation=
nifi.provenance.repository.encryption.key.provider.location=
nifi.provenance.repository.encryption.key.id=
nifi.provenance.repository.encryption.key=

# Persistent Provenance Repository Properties
nifi.provenance.repository.directory.default=../provenance_repository
nifi.provenance.repository.max.storage.time=24 hours
nifi.provenance.repository.max.storage.size=1 GB
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.size=100 MB
nifi.provenance.repository.query.threads=2
nifi.provenance.repository.index.threads=1
nifi.provenance.repository.compress.on.rollover=true
nifi.provenance.repository.always.sync=false
nifi.provenance.repository.index.shard.size=4 GB

By the way, does this Data Provenance list all FlowFiles ever created, or only part of them? Should I try to find the FlowFile with the exception time in the log? Thanks.

Regards,
Ben

2017-12-27 16:57 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

Hi Ben,

The ExecuteSqlCommand retry logic does not execute the same query multiple times if it succeeds. So, there must be input FlowFiles containing the same query that were passed in more than once. It could be the same FlowFile, or different FlowFiles generated by the first processor for some reason. To investigate that kind of FlowFile-level information, NiFi provenance data and the FlowFile lineage will be very useful.
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage

I didn't mention it earlier because you were having the provenance repository performance issue, but I hope you can use it now with the WriteAheadProvenanceRepository.

Thanks,
Koji

On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <batman...@gmail.com> wrote:

Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute the SQL query if the connection is lost (the connection could be unstable); my idea is to transfer the FlowFile to the success relationship only after successfully executing the SQL query. You can see the do-while loop in the code: the transaction is rolled back if the execution fails, and if the connection is lost it retries the SQL. Will this logic cause my SQL to be executed twice?

For the WaitBatch processor, I will take your approach and test it individually to see if the WaitBatch processor could cause the FlowFile repository checkpointing failure.

Regards,
Ben

2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

Hi Ben,

Excuse me, I'm trying, but probably I don't fully understand what you want to achieve with the flow.

It looks weird that WaitBatch is failing with such a FlowFile repository error while other processors such as ReplaceText succeed. I recommend testing WaitBatch alone first, without combining the database-related processors, by feeding it a test FlowFile that has the expected FlowFile attributes. Such input FlowFiles can be created by the GenerateFlowFile processor. If the same error happens with only the WaitBatch processor, then it should be easier to debug.
Thanks,
Koji

On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com> wrote:

Hi Ben,

The one thing that looks strange in the screenshot is the ExecuteSqlCommand having FlowFiles queued in its incoming connection. Those should be transferred to the 'failure' relationship.

Following the executeSql() method, shouldn't it re-throw the caught exception?

try (Connection con = dbcpService.getConnection()) {
    logger.debug("Setting autoCommit to false");
    con.setAutoCommit(false);

    try (Statement stmt = con.createStatement()) {
        logger.info("Executing SQL statement: {}", new Object[]{sql});
        stmt.execute(sql);

        // All SQL statements execute within one transaction
        logger.debug("Committing transaction");
        con.commit();
    } catch (Exception ex) {
        logger.error("Failed to execute SQL statement: {}", new Object[]{sql, ex});
        con.rollback();
        // Throw the exception to the outer layer for handling
        throw ex;
    } finally {
        logger.debug("Setting autoCommit back to true");
        con.setAutoCommit(true);
    }
} catch (Exception ex) {
    // HERE, the exception is swallowed; that's why the FlowFiles stay in
    // the incoming connection.
    logger.error("Retrying SQL statement: {}", new Object[]{sql, ex});
    retryOnFail = true;
}

Thanks,
Koji

On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:

Hi Koji, no problem.
You can check the code of the WaitBatch processor at the link:
https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ

I also uploaded a snapshot of the part of the NiFi flow that includes ExecuteSqlCommand and WaitBatch; you can check the picture at the link:
https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view

You mentioned above that a FlowFile repository checkpointing failure will cause other processors to process the same FlowFile again, but as you can see from my snapshot image, ExecuteSqlCommand is the second processor and comes before the WaitBatch processor. Even if the FlowFile repository checkpointing failure is caused by WaitBatch, could it lead to the processors before it processing a FlowFile multiple times? Thanks.

Regards,
Ben

2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

Hi Ben,

I was referring to these two log messages in your previous email. These two messages are both written by ExecuteSqlCommand; it does not mean 'it was executed again'.

```
2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
Failed to execute SQL statement: SELECT
```

As you wrote, the case where the FlowFile repository fails checkpointing will cause other processors to process the same FlowFiles again. However, there won't be a simple solution for every processor to roll back its work, as different processors do different things. Creating the temp table only if it does not exist seems like the right approach to me.

At the same time, the root cause of the FlowFile repository failure should be investigated. Is it possible to share the WaitBatch code? The reason I ask is that every 'FlowFile Repository failed to update' error in the log you shared earlier is related to the WaitBatch processor.

Thanks,
Koji

On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:

Hi Koji, I will print the SQL before actually executing it, but I checked the error log line you mentioned in your reply; that error was thrown by NiFi from within another processor called WaitBatch.
I didn't find similar errors from the ExecuteSqlCommand processor; I think that's because only ExecuteSqlCommand is used to create temp database tables.
You can check my ExecuteSqlCommand code via the link:
https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P

If the error is really caused by a FlowFile repository checkpoint failure and the FlowFile was executed twice, I may have to create the temp table only if it doesn't exist. The reason I didn't fix the bug this way right away is that I was afraid the fix could mask some other problem.

Thanks.

Regards,
Ben

2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

Hi Ben,

The following two log messages are very close in terms of written timestamp, but have different log levels.
2017-12-26 07:00:01,312 INFO
2017-12-26 07:00:01,315 ERROR

I guess those are logged within a single onTrigger of your ExecuteSqlCommand custom processor: one is before executing, the other is when it caught an exception. Just guessing, as I don't have access to the code.

Does the same issue happen with other processors bundled with Apache NiFi, without your custom processor running?

If NiFi fails to update/checkpoint the FlowFile repository, then the same FlowFile can be processed again after restarting NiFi.
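The idempotent-creation idea discussed above can be sketched as follows. This is only an illustration, not the actual ExecuteSqlCommand code: the `GuardedSql` helper is hypothetical, and it assumes SQL Server's `OBJECT_ID` function to test for table existence before the `SELECT ... INTO` runs, so re-executing the same FlowFile's statement a second time becomes a no-op.

```java
public class GuardedSql {

    /**
     * Build a SQL Server batch that creates the temp table only if it does
     * not already exist (hypothetical helper; table names are illustrative).
     */
    static String guardedCreate(String tmpTable, String sourceTable) {
        return "IF OBJECT_ID(N'" + tmpTable + "', N'U') IS NULL\n"
             + "BEGIN\n"
             + "    SELECT TOP 0 * INTO " + tmpTable + " FROM " + sourceTable + ";\n"
             + "    ALTER TABLE " + tmpTable + " DROP COLUMN _id;\n"
             + "END";
    }

    public static void main(String[] args) {
        // Print the guarded batch for one of the table names from the log
        System.out.println(guardedCreate(
                "tmp.ods_extractDataDebug_20171226031801926_9195",
                "dbo.ods_extractDataDebug"));
    }
}
```

Note that this only makes the creation step idempotent; it does not address why the FlowFile was processed twice in the first place.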
Thanks,
Koji

On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:

Thanks Koji, I will look into the article about the record model.

By the way, the error I previously mentioned to you occurred again, and I could see in the log that the SQL query was executed twice. This time I had turned on verbose NiFi logging. The SQL query is as below:

2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
    at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
    at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
    at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
    at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I also saw a lot of NiFi exceptions like "ProcessException: FlowFile Repository failed to update"; I'm not sure if this is the reason the FlowFile got processed twice. Could you help take a look at my log file? Thanks. You can get the log file via the link:
https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view

Best Regards,
Ben

2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

Hi Ben,

This blog post, written by Mark, would be a good starting point to get familiar with the NiFi Record model:
https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi

HA for the DistributedMapCacheClientService and DistributedMapCacheServer pair is not supported at the moment. If you need high availability, RedisDistributedMapCacheClientService with Redis replication will provide that; I haven't tried that myself though.
https://redis.io/topics/replication

Thanks,
Koji

On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:

Thanks for your quick response, Koji. I hadn't heard or seen anything about the NiFi Record data model when I was reading the NiFi documentation; could you tell me where this model is documented? Thanks.

By the way, to my knowledge, when you need to use the DistributedMapCacheServer from DistributedMapCacheClientService, you need to specify the host URL for the server. This means that inside a NiFi cluster, if I specify the cache server and that node suddenly goes down, I can't use the cache until the node comes back up, right? Is there currently a cache server in NiFi that supports HA? Thanks.

Regards,
Ben

2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

Hi Ben,

As you found from the existing code, DistributedMapCache is used to share state among different processors, and it can be used by your custom processors, too.
However, I'd recommend avoiding such tight dependencies between FlowFiles if possible, or at least minimizing the part of the flow that requires that constraint, for better performance and simplicity.
For example, since a FlowFile can hold a fairly large amount of data, you could merge all the FlowFiles into a single FlowFile instead of batches of FlowFiles. If you need logical boundaries, you can use the NiFi Record data model to embed multiple records within a FlowFile; Records should perform better.

Hope this helps.

Thanks,
Koji

On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:

Hi guys, I'm currently trying to find a proper way in NiFi to sync status between my custom processors.
Our requirement is like this: we're doing some ETL work using NiFi, and I'm extracting the data from the DB into batches of FlowFiles (each batch of FlowFiles has a flag FlowFile indicating the end of the batch).
There are some groups of custom processors downstream that need to process these FlowFiles to do some business-logic work, and we expect these processors to process one batch of FlowFiles at a time.
Therefore we need to implement a custom Wait processor (let's just call it WaitBatch here) to hold all the other batches of FlowFiles while the business processors are handling the batch of FlowFiles whose creation time is earlier.

In order to implement this, all the WaitBatch processors placed in the flow need to read/update records in a shared map so that each set of business-logic processors processes one batch at a time.
The entries are keyed using the batch number of the FlowFiles, and the value of each entry is a batch release counter number which counts the number of times the batch of FlowFiles has passed through a WaitBatch processor.
> >>> >> >>> >> >> >> > When a batch is released by WaitBatch, it will try to > >>> >> increment > >>> >> >>> the > >>> >> >>> >> >> batch > >>> >> >>> >> >> >> > number entry's value by 1 and then the released batch > >>> >> number and > >>> >> >>> >> >> counter > >>> >> >>> >> >> >> > number will also be saved locally at the WaitBatch > with > >>> >> >>> >> StateManager; > >>> >> >>> >> >> >> > when the next batch reaches the WaitBatch, it will > >>> check if > >>> >> the > >>> >> >>> >> >> counter > >>> >> >>> >> >> >> > value of the previous released batch number in the > >>> shared > >>> >> map is > >>> >> >>> >> >> greater > >>> >> >>> >> >> >> > than the one saved locally, if the entry for the > batch > >>> >> number > >>> >> >>> >> does't > >>> >> >>> >> >> >> > exist(already removed) or the value in the shared > map is > >>> >> >>> greater, > >>> >> >>> >> the > >>> >> >>> >> >> >> next > >>> >> >>> >> >> >> > batch will be released and the local state and the > >>> entry on > >>> >> the > >>> >> >>> >> shared > >>> >> >>> >> >> >> map > >>> >> >>> >> >> >> > will be updated similarly. > >>> >> >>> >> >> >> > In the end of the flow, a custom processor will get > the > >>> >> batch > >>> >> >>> >> number > >>> >> >>> >> >> from > >>> >> >>> >> >> >> > each batch and remove the entry from the shared map . > >>> >> >>> >> >> >> > > >>> >> >>> >> >> >> > So this implementation requires a shared map that > could > >>> >> >>> read/update > >>> >> >>> >> >> >> > frequently and atomically. I checked the Wait/Notify > >>> >> processors > >>> >> >>> in > >>> >> >>> >> >> NIFI > >>> >> >>> >> >> >> and > >>> >> >>> >> >> >> > saw it is using the DistributedMapCacheClientService > >>> and > >>> >> >>> >> >> >> > DistributedMapCacheServer to sync status, so I'm > >>> wondering > >>> >> if I > >>> >> >>> >> could > >>> >> >>> >> >> use > >>> >> >>> >> >> >> > the DistributedMapCacheClientService to implement my > >>> >> logic. 
I > >>> >> >>> also > >>> >> >>> >> >> saw > >>> >> >>> >> >> >> > another implementation called > >>> RedisDistributedMapCacheClient > >>> >> >>> >> Service > >>> >> >>> >> >> >> > which seems to require Redis(I haven't used Redis). > >>> Thanks > >>> >> in > >>> >> >>> >> advance > >>> >> >>> >> >> >> for > >>> >> >>> >> >> >> > any suggestions. > >>> >> >>> >> >> >> > > >>> >> >>> >> >> >> > Regards, > >>> >> >>> >> >> >> > Ben > >>> >> >>> >> >> >> > >>> >> >>> >> >> > >>> >> >>> >> > >>> >> >>> > >>> >> > >>> > >> > >> > >
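[Editorial note: the WaitBatch bookkeeping Ben describes above (a shared batch-number → release-counter map, plus per-processor local state) can be sketched in plain Java. This is only an illustration of the described algorithm: an in-process ConcurrentHashMap stands in for the shared DistributedMapCacheServer, plain fields stand in for the processor's StateManager state, and all class and method names below are hypothetical, not NiFi API.]

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of one WaitBatch stage. Several instances share one map,
// just as several WaitBatch processors would share one cache server.
public class WaitBatchSketch {
    private final Map<String, Integer> sharedMap; // batch number -> release counter
    private String lastReleasedBatch;             // local state (StateManager in NiFi)
    private int lastReleasedCounter;

    public WaitBatchSketch(Map<String, Integer> sharedMap) {
        this.sharedMap = sharedMap;
    }

    // Release the given batch if no batch was released here before, or if
    // the previously released batch has already passed a downstream
    // WaitBatch (its shared counter advanced past the locally saved value)
    // or finished the flow entirely (its entry was removed).
    public synchronized boolean tryRelease(String batchNumber) {
        if (lastReleasedBatch != null) {
            Integer counter = sharedMap.get(lastReleasedBatch);
            if (counter != null && counter <= lastReleasedCounter) {
                return false; // previous batch is still in the next stage
            }
        }
        // Releasing: count one more pass of this batch through a WaitBatch,
        // and remember the new counter locally for the comparison above.
        int newCounter = sharedMap.merge(batchNumber, 1, Integer::sum);
        lastReleasedBatch = batchNumber;
        lastReleasedCounter = newCounter;
        return true;
    }

    // The final processor in the flow drops the batch's entry.
    public void removeBatch(String batchNumber) {
        sharedMap.remove(batchNumber);
    }
}
```

With two stages sharing one map, the second batch is held at stage 1 until the first batch clears stage 2, which matches the ordering Ben wants; the real design would additionally need the atomic increment that AtomicDistributedMapCacheClient-style operations provide, since `merge` here is only atomic within one JVM.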