Hi Koji, sorry about the provenance exception; it turned out there was no space left on the machine (the disk had filled up with logs).
Regards,
Ben

2017-12-27 17:11 GMT+08:00 尹文才 <batman...@gmail.com>:
> Hi Koji, thanks. The names of the temp tables are created with the format
> "yyyyMMddHHmmssSSS-nnnn"; the first part indicates the time and the second
> part is a random number of length 4.
> So I don't think it is possible to get two duplicate table names; the only
> possibility I can think of is that the FlowFile was passed into the
> processor twice.
>
> About provenance, I updated the configuration to use the
> WriteAheadProvenanceRepository implementation, but when I tried to check
> the data provenance, it showed me the following exception message:
>
> HTTP ERROR 500
>
> Problem accessing /nifi/provenance. Reason:
>
>     Server Error
>
> Caused by:
>
> javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>     at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
>     at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>     at org.eclipse.jetty.server.Server.handle(Server.java:564)
>     at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>     at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>     at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>     at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>     at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>     at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>     at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>     at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>     at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
>     at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
>     at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
>     at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
>     at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
>     at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
>     at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
>     at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
>     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>     at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
>     at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
>     at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
>     at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
>     at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
>     at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
>     at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
>     at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
>     at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
>     at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>     at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
>     at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>     at org.eclipse.jetty.server.Server.handle(Server.java:564)
>     at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>     at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>     at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>     at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>     at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>     at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>     at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>     at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>     at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
>     at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
>     at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
>     at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
>     ... 3 more
> Caused by: java.lang.NullPointerException
>     at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
>     at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
>     at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
>     at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
>     ... 36 more
>
> My configuration inside nifi.properties is as below:
>
> # Provenance Repository Properties
> nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
> nifi.provenance.repository.debug.frequency=1_000_000
> nifi.provenance.repository.encryption.key.provider.implementation=
> nifi.provenance.repository.encryption.key.provider.location=
> nifi.provenance.repository.encryption.key.id=
> nifi.provenance.repository.encryption.key=
>
> # Persistent Provenance Repository Properties
> nifi.provenance.repository.directory.default=../provenance_repository
> nifi.provenance.repository.max.storage.time=24 hours
> nifi.provenance.repository.max.storage.size=1 GB
> nifi.provenance.repository.rollover.time=30 secs
> nifi.provenance.repository.rollover.size=100 MB
> nifi.provenance.repository.query.threads=2
> nifi.provenance.repository.index.threads=1
> nifi.provenance.repository.compress.on.rollover=true
> nifi.provenance.repository.always.sync=false
> nifi.provenance.repository.index.shard.size=4 GB
>
> By the way, does this Data Provenance list all FlowFiles ever created, or
> only part of them? Should I try to find the FlowFile with the exception
> time in the log? Thanks.
>
> Regards,
> Ben
>
> 2017-12-27 16:57 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> Hi Ben,
>>
>> The ExecuteSqlCommand retry logic does not execute the same query
>> multiple times if it succeeds.
>> So there must have been input FlowFiles containing the same query passed
>> in more than once.
>> It could be the same FlowFile, or different FlowFiles generated by the
>> first processor for some reason.
>> To investigate that kind of FlowFile-level information, NiFi provenance
>> data and FlowFile lineage will be very useful:
>> https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage
>>
>> I didn't mention it earlier because you were having Provenance
>> repository performance issues, but I hope you can use it now with the
>> WriteAheadProvenanceRepository.
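As a reference for the naming scheme Ben describes earlier in the thread (a "yyyyMMddHHmmssSSS" timestamp plus a 4-digit random number), a minimal sketch could look like the following. The class and method names are illustrative, not taken from Ben's actual processor. Note that name uniqueness does not guard against the failure seen later in the thread: if the same FlowFile, already carrying a generated name in its SQL, is processed twice, the same name is reused.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of the temp-table naming scheme described above:
// a millisecond-precision timestamp plus a 4-digit random suffix, e.g.
// ods_extractDataDebug_20171226031801926_9195.
public class TempTableNamer {

    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    public static String newTempTableName(String baseTable) {
        String timestamp = LocalDateTime.now().format(TS);
        // 0..9999, always rendered as 4 digits
        int suffix = ThreadLocalRandom.current().nextInt(10_000);
        return String.format("%s_%s_%04d", baseTable, timestamp, suffix);
    }

    public static void main(String[] args) {
        System.out.println(newTempTableName("ods_extractDataDebug"));
    }
}
```

Because the name is generated once and then travels with the FlowFile, deduplication has to happen at execution time (or the creation must be made idempotent), not at naming time.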
>> Thanks,
>> Koji
>>
>> On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <batman...@gmail.com> wrote:
>> > Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute
>> > the sql query if the connection is lost (the connection could be
>> > unstable); my idea is to transfer the FlowFile to the success
>> > relationship only after successfully executing the sql query. You can
>> > see the do-while loop in the code: the transaction is rolled back if the
>> > execution fails, and if the connection is lost, it retries executing the
>> > sql.
>> > Will this logic cause my sql to be executed twice?
>> >
>> > For the WaitBatch processor, I will take your approach of testing it
>> > individually, to see whether the WaitBatch processor could cause the
>> > FlowFile repository checkpointing failure.
>> >
>> > Regards,
>> > Ben
>> >
>> > 2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >
>> >> Hi Ben,
>> >>
>> >> Excuse me, I'm trying, but I probably don't fully understand what you
>> >> want to achieve with the flow.
>> >>
>> >> It looks weird that WaitBatch is failing with such a FlowFile
>> >> repository error while other processors such as ReplaceText succeed.
>> >> I recommend testing WaitBatch alone first, without combining it with
>> >> the database-related processors, by feeding it a test FlowFile having
>> >> the expected FlowFile attributes.
>> >> Such input FlowFiles can be created by a GenerateFlowFile processor.
>> >> If the same error happens with only the WaitBatch processor, it
>> >> should be easier to debug.
>> >>
>> >> Thanks,
>> >> Koji
>> >>
>> >> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura
>> >> <ijokaruma...@gmail.com> wrote:
>> >> > Hi Ben,
>> >> >
>> >> > The one thing that looks strange in the screenshot is the
>> >> > ExecuteSqlCommand having FlowFiles queued in its incoming connection.
>> >> > Those should have been transferred to the 'failure' relationship.
>> >> >
>> >> > Following the executeSql() method, shouldn't it re-throw the caught
>> >> > exception?
>> >> >
>> >> > try (Connection con = dbcpService.getConnection()) {
>> >> >     logger.debug("setting autoCommit to false");
>> >> >     con.setAutoCommit(false);
>> >> >
>> >> >     try (Statement stmt = con.createStatement()) {
>> >> >         logger.info("executing sql statement: {}", new Object[]{sql});
>> >> >         stmt.execute(sql);
>> >> >
>> >> >         // all sql statements are executed within one transaction
>> >> >         logger.debug("committing transaction");
>> >> >         con.commit();
>> >> >     } catch (Exception ex) {
>> >> >         logger.error("failed to execute sql statement: {}", new Object[]{sql, ex});
>> >> >         con.rollback();
>> >> >         // re-throw the exception to the outer layer
>> >> >         throw ex;
>> >> >     } finally {
>> >> >         logger.debug("resetting autoCommit to true");
>> >> >         con.setAutoCommit(true);
>> >> >     }
>> >> > } catch (Exception ex) {
>> >> >     // HERE, the exception is swallowed, that's why the FlowFiles stay
>> >> >     // in the incoming connection.
>> >> >     logger.error("retrying sql statement: {}", new Object[]{sql, ex});
>> >> >     retryOnFail = true;
>> >> > }
>> >> >
>> >> > Thanks,
>> >> > Koji
>> >> >
>> >> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> >> Hi Koji, no problem.
>> >> >> You can check the code of the WaitBatch processor at the link:
>> >> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
>> >> >>
>> >> >> I also uploaded a snapshot of the part of the NiFi flow which
>> >> >> includes ExecuteSqlCommand and WaitBatch; you can check the picture
>> >> >> at the link:
>> >> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
>> >> >>
>> >> >> You mentioned above that a FlowFile repository checkpointing failure
>> >> >> will cause other processors to process the same FlowFile again, but
>> >> >> as you can see from my snapshot, ExecuteSqlCommand is the second
>> >> >> processor and comes before the WaitBatch processor. Even if the
>> >> >> FlowFile repository checkpointing failure is caused by WaitBatch,
>> >> >> could it lead the processors before it to process a FlowFile
>> >> >> multiple times? Thanks.
>> >> >>
>> >> >> Regards,
>> >> >> Ben
>> >> >>
>> >> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >> >>
>> >>> Hi Ben,
>> >>>
>> >>> I was referring to these two log messages in your previous email.
>> >>> The two messages are both written by ExecuteSqlCommand; they do not
>> >>> mean 'it was executed again'.
>> >>>
>> >>> ```
>> >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>> >>> c.z.nifi.processors.ExecuteSqlCommand
>> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] executing sql statement: SELECT
>> >>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> >>> dbo.ods_extractDataDebug;
>> >>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>> >>>
>> >>> and it was executed again later:
>> >>>
>> >>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>> >>> c.z.nifi.processors.ExecuteSqlCommand
>> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] failed to execute sql statement: SELECT
>> >>> ```
>> >>>
>> >>> As you wrote, a case where the FlowFile repository fails checkpointing
>> >>> will cause other processors to process the same FlowFiles again.
>> >>> However, there won't be a simple solution for every processor to roll
>> >>> back its job, as different processors do different things. Creating
>> >>> the temp table only if it does not exist seems the right approach to
>> >>> me.
>> >>>
>> >>> At the same time, the root cause of the FlowFile repository failure
>> >>> should be investigated. Is it possible to share the WaitBatch code?
>> >>> The reason I ask this is that every 'FlowFile Repository failed to
>> >>> update' error in the log you shared earlier is related to the
>> >>> WaitBatch processor.
>> >>>
>> >>> Thanks,
>> >>> Koji
>> >>>
>> >>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:
>> >>> > Hi Koji, I print the sql before actually executing it, but I checked
>> >>> > the error log line you mentioned in your reply; that error was
>> >>> > thrown by NiFi from within another processor called WaitBatch.
>> >>> > I didn't find similar errors from the ExecuteSqlCommand processor;
>> >>> > I think that's because only ExecuteSqlCommand is used to create
>> >>> > temp database tables.
>> >>> > You can check my ExecuteSqlCommand code via the link:
>> >>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
>> >>> >
>> >>> > If the error is really caused by a FlowFile repository checkpoint
>> >>> > failure and the FlowFile was executed twice, I may have to create
>> >>> > the temp table only if it doesn't exist. I didn't fix this bug that
>> >>> > way right away because I was afraid the fix could cover up some
>> >>> > other problems.
>> >>> >
>> >>> > Thanks.
>> >>> >
>> >>> > Regards,
>> >>> > Ben
>> >>> >
>> >>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>> >
>> >>> >> Hi Ben,
>> >>> >>
>> >>> >> The following two log messages are very close in terms of written
>> >>> >> timestamp, but have different log levels.
>> >>> >> 2017-12-26 07:00:01,312 INFO
>> >>> >> 2017-12-26 07:00:01,315 ERROR
>> >>> >>
>> >>> >> I guess those are logged within a single onTrigger of your
>> >>> >> ExecuteSqlCommand custom processor: one is before executing, the
>> >>> >> other is when it caught an exception. Just guessing, as I don't
>> >>> >> have access to the code.
>> >>> >>
>> >>> >> Does the same issue happen with other processors bundled with
>> >>> >> Apache NiFi, without your custom processor running?
>> >>> >>
>> >>> >> If NiFi fails to update/checkpoint the FlowFile repository, then
>> >>> >> the same FlowFile can be processed again after restarting NiFi.
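That reprocessing scenario is why Ben considers creating the temp table only if it does not exist. On SQL Server, such a guard can be expressed with an `OBJECT_ID` existence check. The following is a generic sketch with illustrative table names, assuming the `SELECT TOP 0 * INTO` / `DROP COLUMN _id` shape seen in the logs; it is not code from the thread.

```java
// Builds an idempotent version of the temp-table creation statement: the
// SELECT ... INTO only runs when the table does not already exist, so
// reprocessing the same FlowFile no longer fails with "There is already
// an object named '...' in the database."
public class IdempotentCreate {

    public static String guardedSelectInto(String tempTable, String sourceTable) {
        return "IF OBJECT_ID(N'" + tempTable + "', N'U') IS NULL\n"
             + "BEGIN\n"
             + "    SELECT TOP 0 * INTO " + tempTable + " FROM " + sourceTable + ";\n"
             + "    ALTER TABLE " + tempTable + " DROP COLUMN _id;\n"
             + "END";
    }
}
```

Note the trade-off Ben mentions: making the creation idempotent silences the symptom, so the underlying duplicate-FlowFile cause could go unnoticed unless it is also investigated.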
>> >> >>> >> >> >> >>> >> Thanks, >> >> >>> >> Koji >> >> >>> >> >> >> >>> >> >> >> >>> >> >> >> >>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> >> wrote: >> >> >>> >> > Thanks Koji, I will look into this article about the record >> model. >> >> >>> >> > >> >> >>> >> > By the way, that error I previously mentioned to you occurred >> >> again, I >> >> >>> >> > could see the sql query was executed twice in the log, this >> time >> >> I had >> >> >>> >> > turned on the verbose NiFi logging, the sql query is as below: >> >> >>> >> > >> >> >>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1] >> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand >> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] >> >> 执行sql语句: >> >> >>> >> SELECT >> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 >> FROM >> >> >>> >> > dbo.ods_extractDataDebug; >> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 >> drop >> >> >>> column >> >> >>> >> _id; >> >> >>> >> > >> >> >>> >> > and it was executed again later: >> >> >>> >> > >> >> >>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1] >> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand >> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] >> >> >>> >> 执行sql语句失败:SELECT >> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 >> FROM >> >> >>> >> > dbo.ods_extractDataDebug; >> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 >> drop >> >> >>> column >> >> >>> >> > _id;: com.microsoft.sqlserver.jdbc.SQLServerException: >> 数据库中已存在名为 >> >> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。 >> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为 >> >> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。 >> >> >>> >> > at >> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException. 
>> >> >>> makeFromDatabaseError( >> >> >>> >> SQLServerException.java:217) >> >> >>> >> > at >> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResul >> t( >> >> >>> >> SQLServerStatement.java:1655) >> >> >>> >> > at >> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement. >> >> doExecuteStatement( >> >> >>> >> SQLServerStatement.java:885) >> >> >>> >> > at >> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement$ >> >> >>> StmtExecCmd.doExecute( >> >> >>> >> SQLServerStatement.java:778) >> >> >>> >> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer. >> >> >>> java:7505) >> >> >>> >> > at >> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerConnection.executeComm >> and( >> >> >>> >> SQLServerConnection.java:2445) >> >> >>> >> > at >> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.executeComma >> nd( >> >> >>> >> SQLServerStatement.java:191) >> >> >>> >> > at >> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.executeState >> ment( >> >> >>> >> SQLServerStatement.java:166) >> >> >>> >> > at >> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.execute( >> >> >>> >> SQLServerStatement.java:751) >> >> >>> >> > at >> >> >>> >> > org.apache.commons.dbcp.DelegatingStatement.execute( >> >> >>> >> DelegatingStatement.java:264) >> >> >>> >> > at >> >> >>> >> > org.apache.commons.dbcp.DelegatingStatement.execute( >> >> >>> >> DelegatingStatement.java:264) >> >> >>> >> > at >> >> >>> >> > com.zjrealtech.nifi.processors.ExecuteSqlCommand. >> >> >>> >> executeSql(ExecuteSqlCommand.java:194) >> >> >>> >> > at >> >> >>> >> > com.zjrealtech.nifi.processors.ExecuteSqlCommand. 
>> >> >>> >> onTrigger(ExecuteSqlCommand.java:164) >> >> >>> >> > at >> >> >>> >> > org.apache.nifi.processor.AbstractProcessor.onTrigger( >> >> >>> >> AbstractProcessor.java:27) >> >> >>> >> > at >> >> >>> >> > org.apache.nifi.controller.StandardProcessorNode.onTrigger( >> >> >>> >> StandardProcessorNode.java:1119) >> >> >>> >> > at >> >> >>> >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask. >> >> call( >> >> >>> >> ContinuallyRunProcessorTask.java:147) >> >> >>> >> > at >> >> >>> >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask. >> >> call( >> >> >>> >> ContinuallyRunProcessorTask.java:47) >> >> >>> >> > at >> >> >>> >> > org.apache.nifi.controller.scheduling. >> >> TimerDrivenSchedulingAgent$1. >> >> >>> run( >> >> >>> >> TimerDrivenSchedulingAgent.java:128) >> >> >>> >> > at java.util.concurrent.Executors$RunnableAdapter. >> >> >>> >> call(Executors.java:511) >> >> >>> >> > at java.util.concurrent.FutureTask.runAndReset( >> >> FutureTask.java:308) >> >> >>> >> > at >> >> >>> >> > java.util.concurrent.ScheduledThreadPoolExecutor$ >> >> >>> >> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor. >> >> java:180) >> >> >>> >> > at >> >> >>> >> > java.util.concurrent.ScheduledThreadPoolExecutor$ >> >> >>> >> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) >> >> >>> >> > at >> >> >>> >> > java.util.concurrent.ThreadPoolExecutor.runWorker( >> >> >>> >> ThreadPoolExecutor.java:1142) >> >> >>> >> > at >> >> >>> >> > java.util.concurrent.ThreadPoolExecutor$Worker.run( >> >> >>> >> ThreadPoolExecutor.java:617) >> >> >>> >> > at java.lang.Thread.run(Thread.java:745) >> >> >>> >> > >> >> >>> >> > I also saw a lot of NiFi's exception like "ProcessException: >> >> FlowFile >> >> >>> >> > Repository failed to update", not sure if this is the reason >> the >> >> >>> FlowFile >> >> >>> >> > got processed twice. Could you help to take a look at my log >> >> file? >> >> >>> >> Thanks. 
>> >>> >> > You can get the log file via the link:
>> >>> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
>> >>> >> >
>> >>> >> > Best Regards,
>> >>> >> > Ben
>> >>> >> >
>> >>> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>> >> >
>> >>> >> >> Hi Ben,
>> >>> >> >>
>> >>> >> >> This blog post, written by Mark, would be a good starting point
>> >>> >> >> for getting familiar with the NiFi Record model:
>> >>> >> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>> >>> >> >>
>> >>> >> >> HA for a DistributedMapCacheClientService and
>> >>> >> >> DistributedMapCacheServer pair is not supported at the moment.
>> >>> >> >> If you need high availability, RedisDistributedMapCacheClientService
>> >>> >> >> with Redis replication will provide that, though I haven't tried
>> >>> >> >> it myself:
>> >>> >> >> https://redis.io/topics/replication
>> >>> >> >>
>> >>> >> >> Thanks,
>> >>> >> >> Koji
>> >>> >> >>
>> >>> >> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
>> >>> >> >> > Thanks for your quick response, Koji. I hadn't heard of or
>> >>> >> >> > seen anything about the NiFi record data model when I was
>> >>> >> >> > reading the NiFi documentation; could you tell me where this
>> >>> >> >> > model is documented? Thanks.
>> >>> >> >> >
>> >>> >> >> > By the way, to my knowledge, when you need to use the
>> >>> >> >> > DistributedMapCacheServer from DistributedMapCacheClientService,
>> >>> >> >> > you need to specify the host URL for the server. This means
>> >>> >> >> > that inside a NiFi cluster, if I specify the cache server and
>> >>> >> >> > that node suddenly goes down, I can't use the cache until the
>> >>> >> >> > node comes back up, right?
Is >> there >> >> >>> currently >> >> >>> >> >> such >> >> >>> >> >> > a cache server in NiFi that could support HA? Thanks. >> >> >>> >> >> > >> >> >>> >> >> > Regards, >> >> >>> >> >> > Ben >> >> >>> >> >> > >> >> >>> >> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura < >> >> ijokaruma...@gmail.com>: >> >> >>> >> >> > >> >> >>> >> >> >> Hi Ben, >> >> >>> >> >> >> >> >> >>> >> >> >> As you found from existing code, DistributedMapCache is >> used >> >> to >> >> >>> share >> >> >>> >> >> >> state among different processors, and it can be used by >> your >> >> >>> custom >> >> >>> >> >> >> processors, too. >> >> >>> >> >> >> However, I'd recommend to avoid such tight dependencies >> >> between >> >> >>> >> >> >> FlowFiles if possible, or minimize the part in flow that >> >> requires >> >> >>> >> that >> >> >>> >> >> >> constraint at least for better performance and simplicity. >> >> >>> >> >> >> For example, since a FlowFile can hold fairly large >> amount of >> >> >>> data, >> >> >>> >> >> >> you could merge all FlowFiles in a single FlowFile, >> instead of >> >> >>> >> batches >> >> >>> >> >> >> of FlowFiles. If you need logical boundaries, you can use >> NiFi >> >> >>> Record >> >> >>> >> >> >> data model to embed multiple records within a FlowFile, >> Record >> >> >>> should >> >> >>> >> >> >> perform better. >> >> >>> >> >> >> >> >> >>> >> >> >> Hope this helps. >> >> >>> >> >> >> >> >> >>> >> >> >> Thanks, >> >> >>> >> >> >> Koji >> >> >>> >> >> >> >> >> >>> >> >> >> >> >> >>> >> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com >> > >> >> wrote: >> >> >>> >> >> >> > Hi guys, I'm currently trying to find a proper way in >> nifi >> >> which >> >> >>> >> could >> >> >>> >> >> >> sync >> >> >>> >> >> >> > status between my custom processors. 
Our requirement is like this: we're doing some ETL work using NiFi, and I'm extracting the data from the DB into batches of FlowFiles (each batch has a flag FlowFile indicating the end of the batch). Several groups of custom processors downstream need to process these FlowFiles to do some business-logic work, and we expect those processors to handle one batch of FlowFiles at a time. Therefore we need to implement a custom Wait processor (let's call it WaitBatch here) to hold back all the other batches of FlowFiles while the business processors are handling the batch whose creation time is earlier.

In order to implement this, all the WaitBatch processors placed in the flow need to read/update records in a shared map so that each set of business-logic processors processes one batch at a time. The entries are keyed by the batch number of the FlowFiles, and the value of each entry is a batch release counter that counts the number of times that batch of FlowFiles has passed through a WaitBatch processor.
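The batch layout described above (a run of data FlowFiles terminated by a flag FlowFile) could be sketched in plain Java like this. The `FlowFileStub` type and its fields are purely illustrative assumptions, not the NiFi API:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    // Hypothetical stand-in for a FlowFile carrying a batch number attribute
    // and a marker for the end-of-batch flag file.
    record FlowFileStub(String batchNo, boolean endOfBatchFlag) {}

    /** Groups a stream of FlowFiles into complete batches, closing each batch at its flag file. */
    static List<List<FlowFileStub>> toBatches(List<FlowFileStub> stream) {
        List<List<FlowFileStub>> batches = new ArrayList<>();
        List<FlowFileStub> current = new ArrayList<>();
        for (FlowFileStub ff : stream) {
            current.add(ff);
            if (ff.endOfBatchFlag()) {   // the flag FlowFile terminates the batch
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        return batches;
    }
}
```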
When a batch is released by a WaitBatch, the processor increments that batch number's entry by 1, and the released batch number and counter value are also saved locally in the WaitBatch via the StateManager. When the next batch reaches the WaitBatch, it checks whether the counter value for the previously released batch number in the shared map is greater than the one saved locally; if the entry for that batch number doesn't exist (already removed) or the value in the shared map is greater, the next batch is released, and the local state and the entry in the shared map are updated in the same way. At the end of the flow, a custom processor reads the batch number from each batch and removes the corresponding entry from the shared map.

So this implementation requires a shared map that can be read and updated frequently and atomically. I checked the Wait/Notify processors in NiFi and saw that they use the DistributedMapCacheClientService and DistributedMapCacheServer to sync status, so I'm wondering if I could use the DistributedMapCacheClientService to implement my logic.
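The release-counter protocol described above can be simulated with a `ConcurrentHashMap` standing in for the distributed cache. All names here are hypothetical; a real WaitBatch would go through a cache client and keep its local state in the StateManager rather than in fields:

```java
import java.util.concurrent.ConcurrentHashMap;

public class WaitBatchSketch {
    // Shared map: batch number -> release counter. A ConcurrentHashMap is an
    // in-process stand-in for the DistributedMapCacheServer.
    static final ConcurrentHashMap<String, Integer> shared = new ConcurrentHashMap<>();

    // Local state this WaitBatch last recorded for the previously released
    // batch (kept via StateManager in a real processor).
    private String previousBatch = null;
    private int localCounter = 0;

    /** True if the next batch may pass: no previous batch, its entry was removed, or a later WaitBatch has bumped the counter past our local copy. */
    boolean canRelease() {
        if (previousBatch == null) return true;
        Integer counter = shared.get(previousBatch);
        return counter == null || counter > localCounter;
    }

    /** Release a batch: atomically increment its shared counter and remember it locally. */
    void release(String batchNo) {
        localCounter = shared.merge(batchNo, 1, Integer::sum);
        previousBatch = batchNo;
    }
}
```

Note that `merge` gives an atomic read-modify-write within one JVM; against a real distributed cache the same effect would need an atomic increment or compare-and-swap on the server side.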
I also saw another implementation called RedisDistributedMapCacheClientService, which seems to require Redis (I haven't used Redis). Thanks in advance for any suggestions.

Regards,
Ben
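Koji's alternative above, carrying a whole batch inside one FlowFile instead of coordinating many, can be illustrated conceptually in plain Java. The newline-delimited format here is just an assumption for the sketch; in NiFi this job would fall to MergeContent or the Record readers and writers:

```java
import java.util.List;

public class BatchPayload {
    // One FlowFile per batch: all of the batch's records merged into a
    // single content payload, so no cross-FlowFile coordination is needed.
    static String merge(List<String> records) {
        return String.join("\n", records);
    }

    // A downstream processor can recover the logical records from the payload.
    static List<String> split(String payload) {
        return List.of(payload.split("\n"));
    }
}
```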