Hi Ben, you can filter events by timestamp as well. https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#searching-for-events
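On the executeSql() retry loop quoted further down: here is a minimal sketch of one way to retry a bounded number of times and still surface the failure, so the caller can route the FlowFile to 'failure' instead of leaving it queued. This is not the actual ExecuteSqlCommand code. The names BoundedRetry, SqlAttempt and maxAttempts are made up for illustration, and the sketch assumes the statement is safe to run more than once.

```java
import java.sql.SQLException;

public class BoundedRetry {

    /** Hypothetical stand-in for one attempt at running the statement. */
    @FunctionalInterface
    public interface SqlAttempt<T> {
        T run() throws SQLException;
    }

    /**
     * Runs the attempt up to maxAttempts times. If every attempt fails, the
     * last exception is rethrown rather than swallowed.
     */
    public static <T> T run(SqlAttempt<T> attempt, int maxAttempts) throws SQLException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        SQLException last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.run();
            } catch (SQLException ex) {
                last = ex; // remember the failure, then try again
            }
        }
        throw last; // all attempts failed: let the caller decide what to do
    }

    public static void main(String[] args) throws SQLException {
        int[] calls = {0};
        // Simulated attempt (assumption): fails twice, then succeeds.
        String result = run(() -> {
            if (++calls[0] < 3) {
                throw new SQLException("connection lost");
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In a processor, the caller would catch the rethrown exception in onTrigger and transfer the FlowFile to 'failure'. Retrying like this is only safe when the statement is idempotent, which is one more reason to create the temp table only if it does not already exist.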
On Wed, Dec 27, 2017 at 6:28 PM, 尹文才 <batman...@gmail.com> wrote:
> Hi Koji, I saw it was only showing 1000 events, so I couldn't see the
> event from when the FlowFile was created.
>
> Regards,
> Ben
>
> 2017-12-27 17:21 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> I see, thanks. The easiest way to look at provenance events would be
>> to right-click the processor instance you are interested in, then
>> select the 'View data provenance' context menu item. This way, NiFi
>> displays provenance events for the selected processor.
>>
>> Koji
>>
>> On Wed, Dec 27, 2017 at 6:17 PM, 尹文才 <batman...@gmail.com> wrote:
>> > Hi Koji, sorry about the provenance exception, it was because there was
>> > no space left on the machine (it had filled up with logs).
>> >
>> > Regards,
>> > Ben
>> >
>> > 2017-12-27 17:11 GMT+08:00 尹文才 <batman...@gmail.com>:
>> >
>> >> Hi Koji, thanks. The names of the temp tables are created with the format
>> >> "yyyyMMddHHmmssSSS-nnnn": the first part is the timestamp and the second
>> >> part is a random number of length 4.
>> >> So I think it's not possible to have 2 duplicate table names. The only
>> >> possibility I can think of is that the FlowFile is passed into the
>> >> processor twice.
>> >>
>> >> About the provenance, I had updated to use the
>> >> WriteAheadProvenanceRepository implementation, but when I tried to check
>> >> the data provenance, it showed me the following exception message:
>> >> HTTP ERROR 500
>> >>
>> >> Problem accessing /nifi/provenance. Reason:
>> >>
>> >> Server Error
>> >>
>> >> Caused by:
>> >>
>> >> javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>> >> at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
>> >> at org.eclipse.jetty.server.handler.gzip.GzipHandler.
>> handle(GzipHandler.java:561) >> >> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( >> HandlerWrapper.java:132) >> >> at org.eclipse.jetty.server.Server.handle(Server.java:564) >> >> at org.eclipse.jetty.server.HttpChannel.handle( >> HttpChannel.java:320) >> >> at org.eclipse.jetty.server.HttpConnection.onFillable( >> HttpConnection.java:251) >> >> at org.eclipse.jetty.io.AbstractConnection$ >> ReadCallback.succeeded(AbstractConnection.java:279) >> >> at org.eclipse.jetty.io.FillInterest.fillable( >> FillInterest.java:110) >> >> at org.eclipse.jetty.io.ssl.SslConnection.onFillable( >> SslConnection.java:258) >> >> at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded( >> SslConnection.java:147) >> >> at org.eclipse.jetty.io.FillInterest.fillable( >> FillInterest.java:110) >> >> at org.eclipse.jetty.io.ChannelEndPoint$2.run( >> ChannelEndPoint.java:124) >> >> at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob( >> QueuedThreadPool.java:672) >> >> at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run( >> QueuedThreadPool.java:590) >> >> at java.lang.Thread.run(Thread.java:745) >> >> Caused by: org.eclipse.jetty.servlet.ServletHolder$1: >> java.lang.NullPointerException >> >> at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable( >> ServletHolder.java:596) >> >> at org.eclipse.jetty.servlet.ServletHolder.initServlet( >> ServletHolder.java:655) >> >> at org.eclipse.jetty.servlet.ServletHolder.getServlet( >> ServletHolder.java:498) >> >> at org.eclipse.jetty.servlet.ServletHolder.ensureInstance( >> ServletHolder.java:785) >> >> at org.eclipse.jetty.servlet.ServletHolder.prepare( >> ServletHolder.java:770) >> >> at org.eclipse.jetty.servlet.ServletHandler.doHandle( >> ServletHandler.java:538) >> >> at org.eclipse.jetty.server.handler.ScopedHandler.handle( >> ScopedHandler.java:143) >> >> at org.eclipse.jetty.security.SecurityHandler.handle( >> SecurityHandler.java:548) >> >> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( >> 
HandlerWrapper.java:132) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextHandle(ScopedHandler.java:190) >> >> at org.eclipse.jetty.server.session.SessionHandler. >> doHandle(SessionHandler.java:1593) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextHandle(ScopedHandler.java:188) >> >> at org.eclipse.jetty.server.handler.ContextHandler. >> doHandle(ContextHandler.java:1239) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextScope(ScopedHandler.java:168) >> >> at org.eclipse.jetty.servlet.ServletHandler.doScope( >> ServletHandler.java:481) >> >> at org.eclipse.jetty.server.session.SessionHandler. >> doScope(SessionHandler.java:1562) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextScope(ScopedHandler.java:166) >> >> at org.eclipse.jetty.server.handler.ContextHandler. >> doScope(ContextHandler.java:1141) >> >> at org.eclipse.jetty.server.handler.ScopedHandler.handle( >> ScopedHandler.java:141) >> >> at org.eclipse.jetty.server.handler.HandlerCollection. >> handle(HandlerCollection.java:118) >> >> at org.eclipse.jetty.server.handler.gzip.GzipHandler. 
>> handle(GzipHandler.java:561) >> >> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( >> HandlerWrapper.java:132) >> >> at org.eclipse.jetty.server.Server.handle(Server.java:564) >> >> at org.eclipse.jetty.server.HttpChannel.handle( >> HttpChannel.java:320) >> >> at org.eclipse.jetty.server.HttpConnection.onFillable( >> HttpConnection.java:251) >> >> at org.eclipse.jetty.io.AbstractConnection$ >> ReadCallback.succeeded(AbstractConnection.java:279) >> >> at org.eclipse.jetty.io.FillInterest.fillable( >> FillInterest.java:110) >> >> at org.eclipse.jetty.io.ssl.SslConnection.onFillable( >> SslConnection.java:258) >> >> at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded( >> SslConnection.java:147) >> >> at org.eclipse.jetty.io.FillInterest.fillable( >> FillInterest.java:110) >> >> at org.eclipse.jetty.io.ChannelEndPoint$2.run( >> ChannelEndPoint.java:124) >> >> at org.eclipse.jetty.util.thread.Invocable.invokePreferred( >> Invocable.java:122) >> >> at org.eclipse.jetty.util.thread.strategy. >> ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58) >> >> at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume. >> produceConsume(ExecuteProduceConsume.java:201) >> >> at org.eclipse.jetty.util.thread.strategy. >> ExecuteProduceConsume.run(ExecuteProduceConsume.java:133) >> >> ... 3 more >> >> Caused by: java.lang.NullPointerException >> >> at org.apache.jasper.servlet.JspServlet.handleMissingResource( >> JspServlet.java:397) >> >> at org.apache.jasper.servlet.JspServlet.serviceJspFile( >> JspServlet.java:387) >> >> at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138) >> >> at org.eclipse.jetty.servlet.ServletHolder.initServlet( >> ServletHolder.java:637) >> >> ... 
36 more >> >> >> >> Caused by: >> >> >> >> org.eclipse.jetty.servlet.ServletHolder$1: >> java.lang.NullPointerException >> >> at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable( >> ServletHolder.java:596) >> >> at org.eclipse.jetty.servlet.ServletHolder.initServlet( >> ServletHolder.java:655) >> >> at org.eclipse.jetty.servlet.ServletHolder.getServlet( >> ServletHolder.java:498) >> >> at org.eclipse.jetty.servlet.ServletHolder.ensureInstance( >> ServletHolder.java:785) >> >> at org.eclipse.jetty.servlet.ServletHolder.prepare( >> ServletHolder.java:770) >> >> at org.eclipse.jetty.servlet.ServletHandler.doHandle( >> ServletHandler.java:538) >> >> at org.eclipse.jetty.server.handler.ScopedHandler.handle( >> ScopedHandler.java:143) >> >> at org.eclipse.jetty.security.SecurityHandler.handle( >> SecurityHandler.java:548) >> >> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( >> HandlerWrapper.java:132) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextHandle(ScopedHandler.java:190) >> >> at org.eclipse.jetty.server.session.SessionHandler. >> doHandle(SessionHandler.java:1593) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextHandle(ScopedHandler.java:188) >> >> at org.eclipse.jetty.server.handler.ContextHandler. >> doHandle(ContextHandler.java:1239) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextScope(ScopedHandler.java:168) >> >> at org.eclipse.jetty.servlet.ServletHandler.doScope( >> ServletHandler.java:481) >> >> at org.eclipse.jetty.server.session.SessionHandler. >> doScope(SessionHandler.java:1562) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextScope(ScopedHandler.java:166) >> >> at org.eclipse.jetty.server.handler.ContextHandler. >> doScope(ContextHandler.java:1141) >> >> at org.eclipse.jetty.server.handler.ScopedHandler.handle( >> ScopedHandler.java:141) >> >> at org.eclipse.jetty.server.handler.HandlerCollection. 
>> handle(HandlerCollection.java:118) >> >> at org.eclipse.jetty.server.handler.gzip.GzipHandler. >> handle(GzipHandler.java:561) >> >> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( >> HandlerWrapper.java:132) >> >> at org.eclipse.jetty.server.Server.handle(Server.java:564) >> >> at org.eclipse.jetty.server.HttpChannel.handle( >> HttpChannel.java:320) >> >> at org.eclipse.jetty.server.HttpConnection.onFillable( >> HttpConnection.java:251) >> >> at org.eclipse.jetty.io.AbstractConnection$ >> ReadCallback.succeeded(AbstractConnection.java:279) >> >> at org.eclipse.jetty.io.FillInterest.fillable( >> FillInterest.java:110) >> >> at org.eclipse.jetty.io.ssl.SslConnection.onFillable( >> SslConnection.java:258) >> >> at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded( >> SslConnection.java:147) >> >> at org.eclipse.jetty.io.FillInterest.fillable( >> FillInterest.java:110) >> >> at org.eclipse.jetty.io.ChannelEndPoint$2.run( >> ChannelEndPoint.java:124) >> >> at org.eclipse.jetty.util.thread.Invocable.invokePreferred( >> Invocable.java:122) >> >> at org.eclipse.jetty.util.thread.strategy. >> ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58) >> >> at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume. >> produceConsume(ExecuteProduceConsume.java:201) >> >> at org.eclipse.jetty.util.thread.strategy. 
>> ExecuteProduceConsume.run(ExecuteProduceConsume.java:133) >> >> at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob( >> QueuedThreadPool.java:672) >> >> at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run( >> QueuedThreadPool.java:590) >> >> at java.lang.Thread.run(Thread.java:745) >> >> Caused by: java.lang.NullPointerException >> >> at org.apache.jasper.servlet.JspServlet.handleMissingResource( >> JspServlet.java:397) >> >> at org.apache.jasper.servlet.JspServlet.serviceJspFile( >> JspServlet.java:387) >> >> at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138) >> >> at org.eclipse.jetty.servlet.ServletHolder.initServlet( >> ServletHolder.java:637) >> >> ... 36 more >> >> >> >> Caused by: >> >> >> >> java.lang.NullPointerException >> >> at org.apache.jasper.servlet.JspServlet.handleMissingResource( >> JspServlet.java:397) >> >> at org.apache.jasper.servlet.JspServlet.serviceJspFile( >> JspServlet.java:387) >> >> at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138) >> >> at org.eclipse.jetty.servlet.ServletHolder.initServlet( >> ServletHolder.java:637) >> >> at org.eclipse.jetty.servlet.ServletHolder.getServlet( >> ServletHolder.java:498) >> >> at org.eclipse.jetty.servlet.ServletHolder.ensureInstance( >> ServletHolder.java:785) >> >> at org.eclipse.jetty.servlet.ServletHolder.prepare( >> ServletHolder.java:770) >> >> at org.eclipse.jetty.servlet.ServletHandler.doHandle( >> ServletHandler.java:538) >> >> at org.eclipse.jetty.server.handler.ScopedHandler.handle( >> ScopedHandler.java:143) >> >> at org.eclipse.jetty.security.SecurityHandler.handle( >> SecurityHandler.java:548) >> >> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( >> HandlerWrapper.java:132) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextHandle(ScopedHandler.java:190) >> >> at org.eclipse.jetty.server.session.SessionHandler. >> doHandle(SessionHandler.java:1593) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. 
>> nextHandle(ScopedHandler.java:188) >> >> at org.eclipse.jetty.server.handler.ContextHandler. >> doHandle(ContextHandler.java:1239) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextScope(ScopedHandler.java:168) >> >> at org.eclipse.jetty.servlet.ServletHandler.doScope( >> ServletHandler.java:481) >> >> at org.eclipse.jetty.server.session.SessionHandler. >> doScope(SessionHandler.java:1562) >> >> at org.eclipse.jetty.server.handler.ScopedHandler. >> nextScope(ScopedHandler.java:166) >> >> at org.eclipse.jetty.server.handler.ContextHandler. >> doScope(ContextHandler.java:1141) >> >> at org.eclipse.jetty.server.handler.ScopedHandler.handle( >> ScopedHandler.java:141) >> >> at org.eclipse.jetty.server.handler.HandlerCollection. >> handle(HandlerCollection.java:118) >> >> at org.eclipse.jetty.server.handler.gzip.GzipHandler. >> handle(GzipHandler.java:561) >> >> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( >> HandlerWrapper.java:132) >> >> at org.eclipse.jetty.server.Server.handle(Server.java:564) >> >> at org.eclipse.jetty.server.HttpChannel.handle( >> HttpChannel.java:320) >> >> at org.eclipse.jetty.server.HttpConnection.onFillable( >> HttpConnection.java:251) >> >> at org.eclipse.jetty.io.AbstractConnection$ >> ReadCallback.succeeded(AbstractConnection.java:279) >> >> at org.eclipse.jetty.io.FillInterest.fillable( >> FillInterest.java:110) >> >> at org.eclipse.jetty.io.ssl.SslConnection.onFillable( >> SslConnection.java:258) >> >> at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded( >> SslConnection.java:147) >> >> at org.eclipse.jetty.io.FillInterest.fillable( >> FillInterest.java:110) >> >> at org.eclipse.jetty.io.ChannelEndPoint$2.run( >> ChannelEndPoint.java:124) >> >> at org.eclipse.jetty.util.thread.Invocable.invokePreferred( >> Invocable.java:122) >> >> at org.eclipse.jetty.util.thread.strategy. 
>> ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
>> >> at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
>> >> at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
>> >> at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
>> >> at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
>> >> at java.lang.Thread.run(Thread.java:745)
>> >>
>> >> My configuration inside nifi.properties is as below:
>> >> # Provenance Repository Properties
>> >> nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
>> >> nifi.provenance.repository.debug.frequency=1_000_000
>> >> nifi.provenance.repository.encryption.key.provider.implementation=
>> >> nifi.provenance.repository.encryption.key.provider.location=
>> >> nifi.provenance.repository.encryption.key.id=
>> >> nifi.provenance.repository.encryption.key=
>> >>
>> >> # Persistent Provenance Repository Properties
>> >> nifi.provenance.repository.directory.default=../provenance_repository
>> >> nifi.provenance.repository.max.storage.time=24 hours
>> >> nifi.provenance.repository.max.storage.size=1 GB
>> >> nifi.provenance.repository.rollover.time=30 secs
>> >> nifi.provenance.repository.rollover.size=100 MB
>> >> nifi.provenance.repository.query.threads=2
>> >> nifi.provenance.repository.index.threads=1
>> >> nifi.provenance.repository.compress.on.rollover=true
>> >> nifi.provenance.repository.always.sync=false
>> >> nifi.provenance.repository.index.shard.size=4 GB
>> >>
>> >>
>> >> By the way, does this Data Provenance list all FlowFiles ever created or
>> >> only part of them? Should I try to find the FlowFile using the exception
>> >> time in the log? Thanks.
>> >>
>> >> Regards,
>> >> Ben
>> >>
>> >> 2017-12-27 16:57 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>
>> >>> Hi Ben,
>> >>>
>> >>> The ExecuteSqlCommand retry logic does not execute the same query
>> >>> multiple times if it succeeds.
>> >>> So, input FlowFiles containing the same query must have been
>> >>> passed in more than once.
>> >>> It could be the same FlowFile, or different FlowFiles generated by the
>> >>> first processor for some reason.
>> >>> To investigate that kind of FlowFile-level information, NiFi
>> >>> provenance data and FlowFile lineage will be very useful.
>> >>> https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage
>> >>>
>> >>> I didn't mention it earlier because you were having a Provenance
>> >>> repository performance issue, but I hope you can use it now with the
>> >>> WriteAheadProvenanceRepository.
>> >>>
>> >>> Thanks,
>> >>> Koji
>> >>>
>> >>> On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <batman...@gmail.com> wrote:
>> >>> > Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute
>> >>> > the sql query if the connection is lost (the connection could be
>> >>> > unstable). My idea is to only transfer the FlowFile to the success
>> >>> > relationship after successfully executing the sql query. You can see the
>> >>> > do-while loop in the code: the transaction will be rolled back if the
>> >>> > execution fails; if the connection is lost, it will retry the sql.
>> >>> > Will this logic cause my sql to be executed twice?
>> >>> >
>> >>> > For the WaitBatch processor, I will take your approach and test it
>> >>> > individually to see if the WaitBatch processor could cause the FlowFile
>> >>> > repository checkpointing failure.
>> >>> >
>> >>> > Regards,
>> >>> > Ben
>> >>> >
>> >>> > 2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>> >
>> >>> >> Hi Ben,
>> >>> >>
>> >>> >> Excuse me, I'm trying, but I probably don't fully understand what you
>> >>> >> want to achieve with the flow.
>> >>> >>
>> >>> >> It looks weird that WaitBatch is failing with such a FlowFile repository
>> >>> >> error, while other processors such as ReplaceText succeed.
>> >>> >> I recommend testing WaitBatch alone first, without combining the
>> >>> >> database related processors, by feeding it a test FlowFile having the
>> >>> >> expected FlowFile attributes.
>> >>> >> Such input FlowFiles can be created by the GenerateFlowFile processor.
>> >>> >> If the same error happens with only the WaitBatch processor, then it
>> >>> >> should be easier to debug.
>> >>> >>
>> >>> >> Thanks,
>> >>> >> Koji
>> >>> >>
>> >>> >> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com>
>> >>> >> wrote:
>> >>> >> > Hi Ben,
>> >>> >> >
>> >>> >> > The one thing that looks strange in the screenshot is the
>> >>> >> > ExecuteSqlCommand having FlowFiles queued in its incoming connection.
>> >>> >> > Those should be transferred to the 'failure' relationship.
>> >>> >> >
>> >>> >> > In the following executeSql() method, shouldn't it re-throw the caught
>> >>> >> > exception?
>> >>> >> >
>> >>> >> >
>> >>> >> > try (Connection con = dbcpService.getConnection()) {
>> >>> >> >     logger.debug("设置autoCommit为false"); // "Setting autoCommit to false"
>> >>> >> >     con.setAutoCommit(false);
>> >>> >> >
>> >>> >> >     try (Statement stmt = con.createStatement()) {
>> >>> >> >         logger.info("执行sql语句: {}", new Object[]{sql}); // "Executing SQL statement"
>> >>> >> >         stmt.execute(sql);
>> >>> >> >
>> >>> >> >         // All SQL statements run inside a single transaction
>> >>> >> >         logger.debug("提交transaction"); // "Committing transaction"
>> >>> >> >         con.commit();
>> >>> >> >     } catch (Exception ex) {
>> >>> >> >         logger.error("执行sql语句失败:{}", new Object[]{sql, ex}); // "Failed to execute SQL statement"
>> >>> >> >         con.rollback();
>> >>> >> >         // Rethrow the exception so the outer block handles it
>> >>> >> >         throw ex;
>> >>> >> >     } finally {
>> >>> >> >         logger.debug("重新设置autoCommit为true"); // "Resetting autoCommit to true"
>> >>> >> >         con.setAutoCommit(true);
>> >>> >> >     }
>> >>> >> > } catch (Exception ex) {
>> >>> >> >     // HERE, the exception is swallowed, that's why the FlowFiles stay in
>> >>> >> >     // the incoming connection.
>> >>> >> >     logger.error("重试执行sql语句:{}", new Object[]{sql, ex}); // "Retrying SQL statement"
>> >>> >> >     retryOnFail = true;
>> >>> >> > }
>> >>> >> >
>> >>> >> > Thanks,
>> >>> >> > Koji
>> >>> >> >
>> >>> >> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
>> >>> >> >> Hi Koji, no problem.
>> >>> >> >> You can check the code of the WaitBatch processor at the
>> >>> >> >> link:
>> >>> >> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
>> >>> >> >>
>> >>> >> >> I also uploaded a snapshot of the part of the NiFi flow which includes
>> >>> >> >> ExecuteSqlCommand and WaitBatch; you can check the picture at the
>> >>> >> >> link:
>> >>> >> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
>> >>> >> >>
>> >>> >> >> You mentioned above that a FlowFile repository checkpointing failure
>> >>> >> >> can cause other processors to process the same FlowFile again. But as
>> >>> >> >> you can see from my snapshot image, ExecuteSqlCommand is the second
>> >>> >> >> processor and comes before the WaitBatch processor. Even if the FlowFile
>> >>> >> >> repository checkpointing failure is caused by WaitBatch, could it lead
>> >>> >> >> the processors before it to process a FlowFile multiple times? Thanks.
>> >>> >> >>
>> >>> >> >> Regards,
>> >>> >> >> Ben
>> >>> >> >>
>> >>> >> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>> >> >>
>> >>> >> >>> Hi Ben,
>> >>> >> >>>
>> >>> >> >>> I was referring to these two log messages in your previous email.
>> >>> >> >>> Both messages are written by ExecuteSqlCommand; that does not
>> >>> >> >>> mean 'it was executed again'.
>> >>> >> >>>
>> >>> >> >>> ```
>> >>> >> >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>> >>> >> >>> c.z.nifi.processors.ExecuteSqlCommand
>> >>> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句: SELECT
>> >>> >> >>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>> >>> >> >>> dbo.ods_extractDataDebug;
>> >>> >> >>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
>> >>> >> >>> _id;
>> >>> >> >>>
>> >>> >> >>> and it was executed again later:
>> >>> >> >>>
>> >>> >> >>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>> >>> >> >>> c.z.nifi.processors.ExecuteSqlCommand
>> >>> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
>> >>> >> >>> 执行sql语句失败:SELECT
>> >>> >> >>> ```
>> >>> >> >>>
>> >>> >> >>> As you wrote, a FlowFile repository checkpointing failure can cause
>> >>> >> >>> other processors to process the same FlowFiles again. However,
>> >>> >> >>> there won't be a simple way for every processor to roll back its
>> >>> >> >>> work, as different processors do different things. Creating the temp
>> >>> >> >>> table only if it does not exist seems the right approach to me.
>> >>> >> >>>
>> >>> >> >>> At the same time, the root cause of the FlowFile repository
>> >>> >> >>> failure should be investigated. Is it possible to share the WaitBatch
>> >>> >> >>> code? The reason I ask is that every 'FlowFile Repository failed to
>> >>> >> >>> update' in the log that you shared earlier is related to the WaitBatch
>> >>> >> >>> processor.
>> >>> >> >>>
>> >>> >> >>> Thanks,
>> >>> >> >>> Koji
>> >>> >> >>>
>> >>> >> >>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:
>> >>> >> >>> > Hi Koji, I will print the sql before actually executing it, but I
>> >>> >> >>> > checked the error log line you mentioned in your reply, and this error
>> >>> >> >>> > was thrown by NiFi from within another processor called WaitBatch.
>> >>> >> >>> > I didn't find errors similar to the one from the ExecuteSqlCommand
>> >>> >> >>> > processor; I think it's because only ExecuteSqlCommand is used to
>> >>> >> >>> > create temp database tables.
>> >>> >> >>> > You can check my ExecuteSqlCommand code via the link:
>> >>> >> >>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
>> >>> >> >>> >
>> >>> >> >>> > If the error is really caused by a FlowFile repository checkpoint
>> >>> >> >>> > failure and the FlowFile was executed twice, I may have to create the
>> >>> >> >>> > temp table only if it doesn't exist. I didn't fix the bug this way
>> >>> >> >>> > right away because I was afraid this fix could cover up some other
>> >>> >> >>> > problems.
>> >>> >> >>> >
>> >>> >> >>> > Thanks.
>> >>> >> >>> >
>> >>> >> >>> > Regards,
>> >>> >> >>> > Ben
>> >>> >> >>> >
>> >>> >> >>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >>> >> >>> >
>> >>> >> >>> >> Hi Ben,
>> >>> >> >>> >>
>> >>> >> >>> >> The following two log messages have very close timestamps,
>> >>> >> >>> >> but different log levels.
>> >>> >> >>> >> 2017-12-26 07:00:01,312 INFO
>> >>> >> >>> >> 2017-12-26 07:00:01,315 ERROR
>> >>> >> >>> >>
>> >>> >> >>> >> I guess those are logged within a single onTrigger of your
>> >>> >> >>> >> ExecuteSqlCommand custom processor: one before executing, the other
>> >>> >> >>> >> when it caught an exception.
Just guessing as I don't have >> >>> access >> >>> >> >>> >> to the code. >> >>> >> >>> >> >> >>> >> >>> >> Does the same issue happen with other processors bundled with >> >>> Apache >> >>> >> >>> >> NiFi without your custom processor running? >> >>> >> >>> >> >> >>> >> >>> >> If NiFi fails to update/checkpoint FlowFile repository, then >> the >> >>> >> same >> >>> >> >>> >> FlowFile can be processed again after restarting NiFi. >> >>> >> >>> >> >> >>> >> >>> >> Thanks, >> >>> >> >>> >> Koji >> >>> >> >>> >> >> >>> >> >>> >> >> >>> >> >>> >> >> >>> >> >>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> >> >>> wrote: >> >>> >> >>> >> > Thanks Koji, I will look into this article about the record >> >>> model. >> >>> >> >>> >> > >> >>> >> >>> >> > By the way, that error I previously mentioned to you >> occurred >> >>> >> again, I >> >>> >> >>> >> > could see the sql query was executed twice in the log, this >> >>> time >> >>> >> I had >> >>> >> >>> >> > turned on the verbose NiFi logging, the sql query is as >> below: >> >>> >> >>> >> > >> >>> >> >>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process >> Thread-1] >> >>> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand >> >>> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] >> >>> >> 执行sql语句: >> >>> >> >>> >> SELECT >> >>> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_ >> 20171226031801926_9195 >> >>> FROM >> >>> >> >>> >> > dbo.ods_extractDataDebug; >> >>> >> >>> >> > alter table tmp.ods_extractDataDebug_ >> 20171226031801926_9195 >> >>> drop >> >>> >> >>> column >> >>> >> >>> >> _id; >> >>> >> >>> >> > >> >>> >> >>> >> > and it was executed again later: >> >>> >> >>> >> > >> >>> >> >>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process >> Thread-1] >> >>> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand >> >>> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] >> >>> >> >>> >> 执行sql语句失败:SELECT >> >>> >> >>> >> > TOP 0 * INTO 
tmp.ods_extractDataDebug_ >> 20171226031801926_9195 >> >>> FROM >> >>> >> >>> >> > dbo.ods_extractDataDebug; >> >>> >> >>> >> > alter table tmp.ods_extractDataDebug_ >> 20171226031801926_9195 >> >>> drop >> >>> >> >>> column >> >>> >> >>> >> > _id;: com.microsoft.sqlserver.jdbc.SQLServerException: >> >>> 数据库中已存在名为 >> >>> >> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。 >> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为 >> >>> >> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。 >> >>> >> >>> >> > at >> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException. >> >>> >> >>> makeFromDatabaseError( >> >>> >> >>> >> SQLServerException.java:217) >> >>> >> >>> >> > at >> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement. >> getNextResul >> >>> t( >> >>> >> >>> >> SQLServerStatement.java:1655) >> >>> >> >>> >> > at >> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement. >> >>> >> doExecuteStatement( >> >>> >> >>> >> SQLServerStatement.java:885) >> >>> >> >>> >> > at >> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement$ >> >>> >> >>> StmtExecCmd.doExecute( >> >>> >> >>> >> SQLServerStatement.java:778) >> >>> >> >>> >> > at com.microsoft.sqlserver.jdbc. >> TDSCommand.execute(IOBuffer. >> >>> >> >>> java:7505) >> >>> >> >>> >> > at >> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerConnection. >> executeComm >> >>> and( >> >>> >> >>> >> SQLServerConnection.java:2445) >> >>> >> >>> >> > at >> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement. >> executeComma >> >>> nd( >> >>> >> >>> >> SQLServerStatement.java:191) >> >>> >> >>> >> > at >> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement. 
>> executeState >> >>> ment( >> >>> >> >>> >> SQLServerStatement.java:166) >> >>> >> >>> >> > at >> >>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerStatement.execute( >> >>> >> >>> >> SQLServerStatement.java:751) >> >>> >> >>> >> > at >> >>> >> >>> >> > org.apache.commons.dbcp.DelegatingStatement.execute( >> >>> >> >>> >> DelegatingStatement.java:264) >> >>> >> >>> >> > at >> >>> >> >>> >> > org.apache.commons.dbcp.DelegatingStatement.execute( >> >>> >> >>> >> DelegatingStatement.java:264) >> >>> >> >>> >> > at >> >>> >> >>> >> > com.zjrealtech.nifi.processors.ExecuteSqlCommand. >> >>> >> >>> >> executeSql(ExecuteSqlCommand.java:194) >> >>> >> >>> >> > at >> >>> >> >>> >> > com.zjrealtech.nifi.processors.ExecuteSqlCommand. >> >>> >> >>> >> onTrigger(ExecuteSqlCommand.java:164) >> >>> >> >>> >> > at >> >>> >> >>> >> > org.apache.nifi.processor.AbstractProcessor.onTrigger( >> >>> >> >>> >> AbstractProcessor.java:27) >> >>> >> >>> >> > at >> >>> >> >>> >> > org.apache.nifi.controller.StandardProcessorNode. >> onTrigger( >> >>> >> >>> >> StandardProcessorNode.java:1119) >> >>> >> >>> >> > at >> >>> >> >>> >> > org.apache.nifi.controller.tasks. >> ContinuallyRunProcessorTask. >> >>> >> call( >> >>> >> >>> >> ContinuallyRunProcessorTask.java:147) >> >>> >> >>> >> > at >> >>> >> >>> >> > org.apache.nifi.controller.tasks. >> ContinuallyRunProcessorTask. >> >>> >> call( >> >>> >> >>> >> ContinuallyRunProcessorTask.java:47) >> >>> >> >>> >> > at >> >>> >> >>> >> > org.apache.nifi.controller.scheduling. >> >>> >> TimerDrivenSchedulingAgent$1. >> >>> >> >>> run( >> >>> >> >>> >> TimerDrivenSchedulingAgent.java:128) >> >>> >> >>> >> > at java.util.concurrent.Executors$RunnableAdapter. 
>> >>> >> >>> >> call(Executors.java:511) >> >>> >> >>> >> > at java.util.concurrent.FutureTask.runAndReset( >> >>> >> FutureTask.java:308) >> >>> >> >>> >> > at >> >>> >> >>> >> > java.util.concurrent.ScheduledThreadPoolExecutor$ >> >>> >> >>> >> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor. >> >>> >> java:180) >> >>> >> >>> >> > at >> >>> >> >>> >> > java.util.concurrent.ScheduledThreadPoolExecutor$ >> >>> >> >>> >> ScheduledFutureTask.run(ScheduledThreadPoolExecutor. >> java:294) >> >>> >> >>> >> > at >> >>> >> >>> >> > java.util.concurrent.ThreadPoolExecutor.runWorker( >> >>> >> >>> >> ThreadPoolExecutor.java:1142) >> >>> >> >>> >> > at >> >>> >> >>> >> > java.util.concurrent.ThreadPoolExecutor$Worker.run( >> >>> >> >>> >> ThreadPoolExecutor.java:617) >> >>> >> >>> >> > at java.lang.Thread.run(Thread.java:745) >> >>> >> >>> >> > >> >>> >> >>> >> > I also saw a lot of NiFi's exception like >> "ProcessException: >> >>> >> FlowFile >> >>> >> >>> >> > Repository failed to update", not sure if this is the >> reason >> >>> the >> >>> >> >>> FlowFile >> >>> >> >>> >> > got processed twice. Could you help to take a look at my >> log >> >>> >> file? >> >>> >> >>> >> Thanks. >> >>> >> >>> >> > You could get the log file via the link: >> >>> >> >>> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_ >> >>> >> >>> N9Xu6zMEi3/view >> >>> >> >>> >> > >> >>> >> >>> >> > Best Regards, >> >>> >> >>> >> > Ben >> >>> >> >>> >> > >> >>> >> >>> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura < >> >>> ijokaruma...@gmail.com >> >>> >> >: >> >>> >> >>> >> > >> >>> >> >>> >> >> Hi Ben, >> >>> >> >>> >> >> >> >>> >> >>> >> >> This blog post written by Mark, would be a good starting >> >>> point >> >>> >> to get >> >>> >> >>> >> >> familiar with NiFi Record model. 
> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>
> HA for the DistributedMapCacheClientService and DistributedMapCacheServer pair is not supported at the moment. If you need high availability, RedisDistributedMapCacheClientService with Redis replication will provide that, though I haven't tried it myself.
> https://redis.io/topics/replication
>
> Thanks,
> Koji
>
> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
>
> Thanks for your quick response, Koji. I hadn't seen or heard anything about the NiFi Record data model when reading the NiFi documentation. Could you tell me where this model is documented? Thanks.
>
> By the way, to my knowledge, when you use the DistributedMapCacheServer from DistributedMapCacheClientService, you need to specify the host URL of the server. This means that inside a NiFi cluster, if the node I specified as the cache server suddenly goes down, I can't use the cache until that node comes back up, right? Is there currently a cache server in NiFi that supports HA? Thanks.
>
> Regards,
> Ben
>
> 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
> Hi Ben,
>
> As you found in the existing code, DistributedMapCache is used to share state among different processors, and it can be used by your custom processors, too.
> However, I'd recommend avoiding such tight dependencies between FlowFiles if possible, or at least minimizing the part of the flow that requires that constraint, for better performance and simplicity.
> For example, since a FlowFile can hold a fairly large amount of data, you could merge all FlowFiles into a single FlowFile instead of batches of FlowFiles. If you need logical boundaries, you can use the NiFi Record data model to embed multiple records within a FlowFile; Records should perform better.
>
> Hope this helps.
>
> Thanks,
> Koji
>
> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
>
> Hi guys, I'm currently trying to find a proper way in NiFi to sync status between my custom processors.
> Our requirement is like this: we're doing some ETL work using NiFi, and I'm extracting data from the DB into batches of FlowFiles (each batch of FlowFiles has a flag FlowFile indicating the end of the batch).
> There are some groups of custom processors downstream that need to process these FlowFiles to do some business logic work, and we expect these processors to process one batch of FlowFiles at a time.
> Therefore we need to implement a custom Wait processor (let's just call it WaitBatch here) to hold all the other batches of FlowFiles while the business processors are handling the batch of FlowFiles whose creation time is earlier.
>
> In order to implement this, all the WaitBatch processors placed in the flow need to read/update records in a shared map so that each set of business-logic processors processes one batch at a time.
> The entries are keyed by the batch number of the FlowFiles, and the value of each entry is a batch release counter that counts the number of times the batch of FlowFiles has passed through a WaitBatch processor.
> When a batch is released by WaitBatch, it will try to increment the batch number entry's value by 1, and then the released batch number and counter value will also be saved locally at the WaitBatch with StateManager.
> When the next batch reaches the WaitBatch, it will check whether the counter value of the previously released batch number in the shared map is greater than the one saved locally. If the entry for that batch number doesn't exist (already removed) or the value in the shared map is greater, the next batch will be released, and the local state and the entry in the shared map will be updated similarly.
> At the end of the flow, a custom processor will get the batch number from each batch and remove the entry from the shared map.
>
> So this implementation requires a shared map that can be read and updated frequently and atomically.
> I checked the Wait/Notify processors in NiFi and saw that they use the DistributedMapCacheClientService and DistributedMapCacheServer to sync status, so I'm wondering if I could use the DistributedMapCacheClientService to implement my logic. I also saw another implementation called RedisDistributedMapCacheClientService, which seems to require Redis (I haven't used Redis). Thanks in advance for any suggestions.
>
> Regards,
> Ben
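
For anyone reading the archive later, the release rule Ben describes in his original message could be sketched roughly as below. This is only an illustration under assumptions, not code from the thread: the class `WaitBatchSketch`, the `WaitBatch` helper, and the methods `tryRelease` and `finishBatch` are hypothetical names, and a plain in-memory `ConcurrentHashMap` stands in for the shared map. It does not use the DistributedMapCacheClientService or StateManager APIs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the batch release counter idea; all names are made up.
class WaitBatchSketch {
    // In-memory stand-in for the shared map: batch number -> release counter.
    static final ConcurrentMap<String, Long> SHARED_MAP = new ConcurrentHashMap<>();

    // One instance per WaitBatch processor; the two fields play the role of
    // the locally saved state (kept with StateManager in the description).
    static final class WaitBatch {
        private String lastReleasedBatch;   // null until a batch has been released
        private long lastReleasedCounter;

        // Releases the batch only if the previously released batch has moved on:
        // its entry was removed, or its shared counter grew past the local copy.
        boolean tryRelease(String batchNumber) {
            if (lastReleasedBatch != null) {
                Long shared = SHARED_MAP.get(lastReleasedBatch);
                if (shared != null && shared <= lastReleasedCounter) {
                    return false; // previous batch is still being processed; hold
                }
            }
            // Atomically increment the counter for this batch (starts at 1).
            long counter = SHARED_MAP.merge(batchNumber, 1L, Long::sum);
            lastReleasedBatch = batchNumber;
            lastReleasedCounter = counter;
            return true;
        }
    }

    // End of the flow: the batch is done, so its entry is removed.
    static void finishBatch(String batchNumber) {
        SHARED_MAP.remove(batchNumber);
    }

    public static void main(String[] args) {
        WaitBatch w1 = new WaitBatch();
        WaitBatch w2 = new WaitBatch();
        System.out.println(w1.tryRelease("batch-1")); // nothing released before
        System.out.println(w1.tryRelease("batch-2")); // batch-1 has not passed w2 yet
        System.out.println(w2.tryRelease("batch-1")); // counter for batch-1 goes to 2
        System.out.println(w1.tryRelease("batch-2")); // shared counter now exceeds local
        finishBatch("batch-1");
    }
}
```

Only the per-entry increment is atomic in this sketch. The check on the previous batch followed by the increment is not one atomic step, so a real implementation on a shared cache would likely need some compare-and-set style update.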