Hi Koji, thanks. The names of the temp tables are created with the format "yyyyMMddHHmmssSSS-nnnn": the first part is the creation time and the second part is a random 4-digit number. So I think duplicate table names are very unlikely (two FlowFiles would have to hit the same millisecond and the same random number); the only possibility I can think of is that the FlowFile was passed into the processor twice.
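Roughly, the name is generated like the sketch below. This is illustrative only, not the actual processor code; the class and method names are invented, and the actual names in the log use underscores as separators (e.g. ods_extractDataDebug_20171226031801926_9195):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of the temp-table naming scheme described above:
// a millisecond-precision timestamp plus a random 4-digit suffix.
// Class and method names are made up for this example.
public class TempTableNames {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    public static String next(String baseTable) {
        String ts = LocalDateTime.now().format(FMT);               // 17 digits
        int suffix = ThreadLocalRandom.current().nextInt(10000);   // 0..9999
        return baseTable + "_" + ts + "_" + String.format("%04d", suffix);
    }
}
```

With only 10,000 possible suffixes per millisecond, a collision is improbable but not strictly impossible, which is why a duplicate FlowFile remains the more likely explanation.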
About the provenance, I updated the configuration to use the WriteAheadProvenanceRepository implementation, but when I tried to check the data provenance it showed me the following exception message:

HTTP ERROR 500
Problem accessing /nifi/provenance. Reason: Server Error
Caused by: javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
    at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
    at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
    at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
    at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
    at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
    ... 3 more
Caused by: java.lang.NullPointerException
    at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
    at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
    at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
    ... 36 more
Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
    at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
    at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
    at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
    at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
    at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
    at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
    at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
    ... 36 more
Caused by: java.lang.NullPointerException
    at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
    at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
    at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
    at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
    at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
    at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
    at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
    at java.lang.Thread.run(Thread.java:745)

My configuration inside nifi.properties is as below:

# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
nifi.provenance.repository.debug.frequency=1_000_000
nifi.provenance.repository.encryption.key.provider.implementation=
nifi.provenance.repository.encryption.key.provider.location=
nifi.provenance.repository.encryption.key.id=
nifi.provenance.repository.encryption.key=
# Persistent Provenance Repository Properties
nifi.provenance.repository.directory.default=../provenance_repository
nifi.provenance.repository.max.storage.time=24 hours
nifi.provenance.repository.max.storage.size=1 GB
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.size=100 MB
nifi.provenance.repository.query.threads=2
nifi.provenance.repository.index.threads=1
nifi.provenance.repository.compress.on.rollover=true
nifi.provenance.repository.always.sync=false
nifi.provenance.repository.index.shard.size=4 GB

By the way, does Data Provenance list all FlowFiles ever created, or only some of them? Should I try to find the FlowFile by matching the exception time in the log? Thanks.

Regards,
Ben

2017-12-27 16:57 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> The ExecuteSqlCommand retry logic does not execute the same query
> multiple times if it succeeds.
> So there must be input FlowFiles containing the same query that were
> passed in more than once.
> It could be the same FlowFile, or different FlowFiles generated by the
> first processor for some reason.
> To investigate that kind of FlowFile-level information, NiFi
> provenance data and the FlowFile lineage view will be very useful:
> https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage
>
> I didn't mention it earlier because you were having Provenance
> repository performance issues, but I hope you can use it now with the
> WriteAheadProvenanceRepository.
>
> Thanks,
> Koji
>
> On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <batman...@gmail.com> wrote:
> > Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute
> > the SQL query if the connection is lost (the connection can be unstable). My
> > idea is to only transfer the FlowFile to the success relationship
> > after successfully executing the SQL query. As you can see from the do-while loop
> > in the code, the transaction is rolled back if the execution fails, and if
> > the connection is lost it will retry the SQL.
> > Will this logic cause my SQL to be executed twice?
> >
> > For the WaitBatch processor, I will take your approach and test it individually
> > to see whether the WaitBatch processor could cause the FlowFile repository
> > checkpointing failure.
> >
> > Regards,
> > Ben
> >
> > 2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >
> >> Hi Ben,
> >>
> >> Excuse me; I'm trying, but I probably don't fully understand what you
> >> want to achieve with the flow.
> >>
> >> It looks weird that WaitBatch is failing with such a FlowFile repository
> >> error while other processors such as ReplaceText succeed.
> >> I recommend testing WaitBatch alone first, without the
> >> database-related processors, by feeding it a test FlowFile having the
> >> expected FlowFile attributes.
> >> Such input FlowFiles can be created by the GenerateFlowFile processor.
> >> If the same error happens with only the WaitBatch processor, it
> >> should be easier to debug.
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com>
> >> wrote:
> >> > Hi Ben,
> >> >
> >> > The one thing that looks strange in the screenshot is the
> >> > ExecuteSqlCommand having FlowFiles queued in its incoming connection.
> >> > Those should have been transferred to the 'failure' relationship.
> >> >
> >> > Looking at the executeSql() method, shouldn't it re-throw the caught
> >> > exception?
> >> >
> >> > try (Connection con = dbcpService.getConnection()) {
> >> >     logger.debug("Setting autoCommit to false");
> >> >     con.setAutoCommit(false);
> >> >
> >> >     try (Statement stmt = con.createStatement()) {
> >> >         logger.info("Executing SQL statement: {}", new Object[]{sql});
> >> >         stmt.execute(sql);
> >> >
> >> >         // All SQL statements execute within one transaction
> >> >         logger.debug("Committing transaction");
> >> >         con.commit();
> >> >     } catch (Exception ex) {
> >> >         logger.error("Failed to execute SQL statement: {}", new Object[]{sql, ex});
> >> >         con.rollback();
> >> >         // Rethrow the exception to the outer handler
> >> >         throw ex;
> >> >     } finally {
> >> >         logger.debug("Resetting autoCommit to true");
> >> >         con.setAutoCommit(true);
> >> >     }
> >> > } catch (Exception ex) {
> >> >     // HERE, the exception is swallowed, that's why the FlowFiles stay in
> >> >     // the incoming connection.
> >> >     logger.error("Retrying SQL statement: {}", new Object[]{sql, ex});
> >> >     retryOnFail = true;
> >> > }
> >> >
> >> > Thanks,
> >> > Koji
> >> >
> >> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >> Hi Koji, no problem. You can check the code of the WaitBatch processor at this
> >> >> link:
> >> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
> >> >>
> >> >> I also uploaded a snapshot of the part of the NiFi flow which includes
> >> >> ExecuteSqlCommand and WaitBatch; you can see the picture at this link:
> >> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
> >> >>
> >> >> You mentioned above that a FlowFile repository checkpointing failure can
> >> >> cause processors to process the same FlowFile again, but as you can see
> >> >> from my snapshot image, the ExecuteSqlCommand is the second processor
> >> >> and comes before the WaitBatch processor. Even if the FlowFile repository
> >> >> checkpointing failure is caused by WaitBatch, could it lead to the
> >> >> processors before it processing a FlowFile multiple times? Thanks.
> >> >>
> >> >> Regards,
> >> >> Ben
> >> >>
> >> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >> >>
> >> >>> Hi Ben,
> >> >>>
> >> >>> I was referring to these two log messages in your previous email.
> >> >>> They are both written by ExecuteSqlCommand; it does not
> >> >>> mean 'it was executed again'.
> >> >>>
> >> >>> ```
> >> >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> >> >>> c.z.nifi.processors.ExecuteSqlCommand
> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
> >> >>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> >> >>> dbo.ods_extractDataDebug;
> >> >>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
> >> >>>
> >> >>> and it was executed again later:
> >> >>>
> >> >>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> >> >>> c.z.nifi.processors.ExecuteSqlCommand
> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
> >> >>> ```
> >> >>>
> >> >>> As you wrote, a case where the FlowFile repository fails checkpointing
> >> >>> will cause processors to process the same FlowFiles again. However,
> >> >>> there won't be a simple solution for every processor to roll back its
> >> >>> work, as different processors do different things. Creating the temp table
> >> >>> only if it does not exist seems like the right approach to me.
> >> >>>
> >> >>> At the same time, the root cause of the FlowFile repository failure
> >> >>> should be investigated. Is it possible to share the WaitBatch code?
> >> >>> The reason I ask is that every 'FlowFile Repository failed to update'
> >> >>> in the log that you shared earlier is related to the WaitBatch processor.
> >> >>> Thanks,
> >> >>> Koji
> >> >>>
> >> >>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >>> > Hi Koji, I do print the SQL before actually executing it, but I checked
> >> >>> > the error log line you mentioned in your reply; that error was thrown by
> >> >>> > NiFi from within another processor called WaitBatch.
> >> >>> > I didn't find similar errors from the ExecuteSqlCommand
> >> >>> > processor; I think it's because only the ExecuteSqlCommand is used to
> >> >>> > create temp database tables.
> >> >>> > You can check my ExecuteSqlCommand code via the link:
> >> >>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
> >> >>> >
> >> >>> > If the error is really caused by a FlowFile repository checkpoint failure and
> >> >>> > the FlowFile was executed twice, I may have to create the temp table only
> >> >>> > if it doesn't exist. I didn't fix the bug that way right away because
> >> >>> > I was afraid the fix could mask some other problems.
> >> >>> >
> >> >>> > Thanks.
> >> >>> >
> >> >>> > Regards,
> >> >>> > Ben
> >> >>> >
> >> >>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >> >>> >
> >> >>> >> Hi Ben,
> >> >>> >>
> >> >>> >> The following two log messages are very close in terms of written
> >> >>> >> timestamp, but have different log levels.
> >> >>> >> 2017-12-26 07:00:01,312 INFO
> >> >>> >> 2017-12-26 07:00:01,315 ERROR
> >> >>> >>
> >> >>> >> I guess those are logged within a single onTrigger of your
> >> >>> >> ExecuteSqlCommand custom processor: one before executing, the other
> >> >>> >> when it caught an exception. Just guessing, as I don't have access
> >> >>> >> to the code.
> >> >>> >>
> >> >>> >> Does the same issue happen with other processors bundled with Apache
> >> >>> >> NiFi, without your custom processor running?
> >> >>> >> If NiFi fails to update/checkpoint the FlowFile repository, then the
> >> >>> >> same FlowFile can be processed again after restarting NiFi.
> >> >>> >>
> >> >>> >> Thanks,
> >> >>> >> Koji
> >> >>> >>
> >> >>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >>> >> > Thanks Koji, I will look into this article about the record model.
> >> >>> >> >
> >> >>> >> > By the way, the error I previously mentioned to you occurred again. I
> >> >>> >> > could see the SQL query was executed twice in the log; this time I had
> >> >>> >> > turned on verbose NiFi logging. The SQL query is as below:
> >> >>> >> >
> >> >>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> >> >>> >> > dbo.ods_extractDataDebug;
> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
> >> >>> >> >
> >> >>> >> > and it was executed again later:
> >> >>> >> >
> >> >>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> >> >>> >> > dbo.ods_extractDataDebug;
> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;:
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object
> >> >>> >> > named 'ods_extractDataDebug_20171226031801926_9195' in the database.
> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object
> >> >>> >> > named 'ods_extractDataDebug_20171226031801926_9195' in the database.
> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
> >> >>> >> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
> >> >>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> >> >>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> >> >>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
> >> >>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
> >> >>> >> > at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >> >>> >> > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
> >> >>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >> >>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >> >>> >> > at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> >> >>> >> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> >>> >> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >> >>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >> >>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >> >>> >> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >> >>> >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> >>> >> > at java.lang.Thread.run(Thread.java:745)
> >> >>> >> >
> >> >>> >> > I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
> >> >>> >> > Repository failed to update"; I'm not sure if this is the reason the
> >> >>> >> > FlowFile got processed twice. Could you help take a look at my log file? Thanks.
> >> >>> >> > You can get the log file via the link:
> >> >>> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
> >> >>> >> >
> >> >>> >> > Best Regards,
> >> >>> >> > Ben
> >> >>> >> >
> >> >>> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >> >>> >> >
> >> >>> >> >> Hi Ben,
> >> >>> >> >>
> >> >>> >> >> This blog post written by Mark would be a good starting point to get
> >> >>> >> >> familiar with the NiFi Record model:
> >> >>> >> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
> >> >>> >> >>
> >> >>> >> >> HA for the DistributedMapCacheClientService and DistributedMapCacheServer
> >> >>> >> >> pair is not supported at the moment. If you need high availability,
> >> >>> >> >> RedisDistributedMapCacheClientService with Redis replication will
> >> >>> >> >> provide that, though I haven't tried it myself.
> >> >>> >> >> https://redis.io/topics/replication
> >> >>> >> >>
> >> >>> >> >> Thanks,
> >> >>> >> >> Koji
> >> >>> >> >>
> >> >>> >> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >>> >> >> > Thanks for your quick response, Koji. I hadn't heard or seen anything
> >> >>> >> >> > about the NiFi record data model when I was reading the NiFi
> >> >>> >> >> > documentation; could you tell me where this model is documented? Thanks.
> >> >>> >> >> >
> >> >>> >> >> > By the way, to my knowledge, when you need to use the DistributedMapCacheServer
> >> >>> >> >> > from DistributedMapCacheClientService, you need to specify the host URL for
> >> >>> >> >> > the server. This means that inside a NiFi cluster, when I specify the
> >> >>> >> >> > cache server's node and that node suddenly goes down, I can't use the
> >> >>> >> >> > cache until the node comes up again, right? Is there currently such
> >> >>> >> >> > a cache server in NiFi that could support HA? Thanks.
> >> >>> >> >> >
> >> >>> >> >> > Regards,
> >> >>> >> >> > Ben
> >> >>> >> >> >
> >> >>> >> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >> >>> >> >> >
> >> >>> >> >> >> Hi Ben,
> >> >>> >> >> >>
> >> >>> >> >> >> As you found from the existing code, DistributedMapCache is used to share
> >> >>> >> >> >> state among different processors, and it can be used by your custom
> >> >>> >> >> >> processors, too.
> >> >>> >> >> >> However, I'd recommend avoiding such tight dependencies between
> >> >>> >> >> >> FlowFiles if possible, or at least minimizing the part of the flow that
> >> >>> >> >> >> requires that constraint, for better performance and simplicity.
> >> >>> >> >> >> For example, since a FlowFile can hold a fairly large amount of data,
> >> >>> >> >> >> you could merge all FlowFiles into a single FlowFile instead of using
> >> >>> >> >> >> batches of FlowFiles. If you need logical boundaries, you can use the NiFi
> >> >>> >> >> >> Record data model to embed multiple records within a FlowFile; Records
> >> >>> >> >> >> should perform better.
> >> >>> >> >> >>
> >> >>> >> >> >> Hope this helps.
> >> >>> >> >> >>
> >> >>> >> >> >> Thanks,
> >> >>> >> >> >> Koji
> >> >>> >> >> >>
> >> >>> >> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >>> >> >> >> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
> >> >>> >> >> >> > status between my custom processors.
> >> >>> >> >> >> > Our requirement is like this: we're doing some ETL work using NiFi, and
> >> >>> >> >> >> > I'm extracting the data from the DB into batches of FlowFiles (each batch
> >> >>> >> >> >> > of FlowFiles has a flag FlowFile indicating the end of the batch).
> >> >>> >> >> >> > There are some groups of custom processors downstream that need to
> >> >>> >> >> >> > process these FlowFiles to do some business logic work, and we expect
> >> >>> >> >> >> > these processors to process one batch of FlowFiles at a time.
> >> >>> >> >> >> > Therefore we need to implement a custom Wait processor (let's just call
> >> >>> >> >> >> > it WaitBatch here) to hold all the other batches of FlowFiles while the
> >> >>> >> >> >> > business processors are handling the batch of FlowFiles whose creation
> >> >>> >> >> >> > time is earlier.
> >> >>> >> >> >> >
> >> >>> >> >> >> > In order to implement this, all the WaitBatch processors placed in the
> >> >>> >> >> >> > flow need to read/update records in a shared map so that each set of
> >> >>> >> >> >> > business-logic processors processes one batch at a time.
> >> >>> >> >> >> > The entries are keyed by the batch number of the FlowFiles, and the
> >> >>> >> >> >> > value of each entry is a batch release counter which counts the number
> >> >>> >> >> >> > of times the batch of FlowFiles has passed through a WaitBatch processor.
> >> >>> >> >> >> > When a batch is released by a WaitBatch, it will try to increment the
> >> >>> >> >> >> > batch number entry's value by 1, and then the released batch number and
> >> >>> >> >> >> > counter value are also saved locally at the WaitBatch with the StateManager;
> >> >>> >> >> >> > when the next batch reaches the WaitBatch, it will check whether the
> >> >>> >> >> >> > counter value of the previously released batch number in the shared map
> >> >>> >> >> >> > is greater than the one saved locally. If the entry for the batch number
> >> >>> >> >> >> > doesn't exist (already removed) or the value in the shared map is greater,
> >> >>> >> >> >> > the next batch is released, and the local state and the entry in the
> >> >>> >> >> >> > shared map are updated in the same way.
> >> >>> >> >> >> > At the end of the flow, a custom processor will get the batch number
> >> >>> >> >> >> > from each batch and remove the entry from the shared map.
> >> >>> >> >> >> >
> >> >>> >> >> >> > So this implementation requires a shared map that can be read and updated
> >> >>> >> >> >> > frequently and atomically. I checked the Wait/Notify processors in NiFi
> >> >>> >> >> >> > and saw that they use the DistributedMapCacheClientService and
> >> >>> >> >> >> > DistributedMapCacheServer to sync status, so I'm wondering if I could use
> >> >>> >> >> >> > the DistributedMapCacheClientService to implement my logic. I also saw
> >> >>> >> >> >> > another implementation called RedisDistributedMapCacheClientService,
> >> >>> >> >> >> > which seems to require Redis (I haven't used Redis). Thanks in advance
> >> >>> >> >> >> > for any suggestions.
> >> >>> >> >> >> >
> >> >>> >> >> >> > Regards,
> >> >>> >> >> >> > Ben
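The batch-release bookkeeping described in the quoted message above can be sketched roughly as follows. This is an illustrative stand-in only: the class `BatchGate` and its methods are invented names, and a plain in-memory ConcurrentHashMap stands in for the shared state that the actual flow would keep in a DistributedMapCacheServer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the WaitBatch release check described in the thread.
// The shared map (standing in for a DistributedMapCacheServer) keys batch
// numbers to release counters; each gate also remembers the counter value it
// last saw, the way WaitBatch keeps local state with the StateManager.
public class BatchGate {
    private final Map<String, Long> shared;                            // shared across all gates
    private final Map<String, Long> local = new ConcurrentHashMap<>(); // this gate's last-seen values

    public BatchGate(Map<String, Long> shared) {
        this.shared = shared;
    }

    // Release a batch: atomically bump the shared counter and record it locally.
    public void release(String batchNo) {
        long v = shared.merge(batchNo, 1L, Long::sum);
        local.put(batchNo, v);
    }

    // The next batch may pass once the previous batch's shared counter has
    // moved beyond what this gate recorded, or the entry was already removed.
    public boolean mayRelease(String prevBatchNo) {
        Long sharedValue = shared.get(prevBatchNo);
        if (sharedValue == null) {
            return true; // entry already cleaned up by the end-of-flow processor
        }
        long localValue = local.getOrDefault(prevBatchNo, 0L);
        return sharedValue > localValue;
    }
}
```

With two gates sharing one map, a downstream gate sees the counter move past its own recorded value after the upstream gate releases, which is the ordering condition the quoted design relies on.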