[ https://issues.apache.org/jira/browse/NIFI-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395784#comment-15395784 ]

Mark Payne commented on NIFI-2395:
----------------------------------

If the thread that created the Futures didn't get CPU time until after each 
Future got time, that's OK. Each Future would call "finishedAdding.get()", which 
would return false. As a result, the 'while' condition is still true, because it 
says "... || !finishedAdding.get()" -- the negation makes that operand true. So 
each Future would enter the while loop and call "queue.poll(10, 
TimeUnit.MILLISECONDS)"; if that returns null, it would go back and evaluate 
the 'while' condition again, and this would continue until the 'main' thread 
is able to add data to the queue.
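
For illustration, here is a minimal, self-contained sketch of that producer/consumer pattern (the class name, queue contents, and sizes are made up for this example; it is not the NiFi code). Consumers submitted before the producer gets any CPU time simply spin on poll() until finishedAdding is set, which is exactly what the '||' in the while condition guarantees:

{code}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConsumerLoopSketch {
    public static void main(final String[] args) throws Exception {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        final AtomicBoolean finishedAdding = new AtomicBoolean(false);
        final ExecutorService exec = Executors.newFixedThreadPool(2);

        // Consumers are submitted first; they may well run before the producer gets CPU time.
        final Callable<Object> consumer = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                while (!queue.isEmpty() || !finishedAdding.get()) {
                    final String event = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (event == null) {
                        continue; // nothing yet -- re-evaluate the while condition and keep waiting
                    }
                    // index/process the event here
                }
                return null;
            }
        };
        final Future<Object> f1 = exec.submit(consumer);
        final Future<Object> f2 = exec.submit(consumer);

        // Producer: add the data, then signal that no more is coming.
        // (The return value of offer() is ignored here for brevity.)
        for (int i = 0; i < 1000; i++) {
            queue.offer("event-" + i, 10, TimeUnit.MILLISECONDS);
        }
        finishedAdding.set(true);

        f1.get(); // surfaces any Exception thrown inside a consumer
        f2.get();
        exec.shutdown();
    }
}
{code}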

Unfortunately, if an Exception is thrown, it won't be in the logs if the 
'.offer()' call blocks. The task would simply die, and calling "future.get()" 
would give us the Exception. But that's not currently happening if we block in 
queue.offer() - you'd need to update the code to log the Exception in this case 
:(
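
One way to make such a failure visible is sketched below; this is a suggestion with hypothetical names (LoggingCallableSketch, an slf4j logger), not the actual NiFi fix. Logging inside the task means the Exception shows up immediately, even when nothing ever reaches future.get() because the producer is stuck in queue.offer():

{code}
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingCallableSketch {
    private static final Logger logger = LoggerFactory.getLogger(LoggingCallableSketch.class);

    /**
     * Wraps a task so that any Exception it throws is logged right away,
     * instead of living only inside the Future until someone calls get().
     */
    public static Callable<Object> logged(final Callable<Object> delegate) {
        return new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    return delegate.call();
                } catch (final Exception e) {
                    logger.error("Failed to index Provenance Events", e);
                    throw e; // still rethrow so future.get() reports it too
                }
            }
        };
    }
}
{code}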

> PersistentProvenanceRepository Deadlocks caused by a blocked journal merge
> --------------------------------------------------------------------------
>
>                 Key: NIFI-2395
>                 URL: https://issues.apache.org/jira/browse/NIFI-2395
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 0.6.0, 0.7.0
>            Reporter: Brian Davis
>            Assignee: Joseph Witt
>            Priority: Blocker
>             Fix For: 1.0.0, 0.8.0
>
>
> I have a nifi instance that I have been running for about a week, and it has 
> deadlocked at least 3 times during that time.  When I say deadlock, the whole 
> nifi instance stops making any progress on flowfiles.  I looked at the stack 
> trace and there are a lot of threads stuck doing tasks in the 
> PersistentProvenanceRepository.  Looking at the code, I think this is what is 
> happening:
> There is a ReadWriteLock where all the readers are waiting on a writer.  The 
> writer is in this loop:
> {code}
>                 while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
>                     // if a shutdown happens while we are in this loop, kill the rollover thread and break
>                     if (this.closed.get()) {
>                         if (future != null) {
>                             future.cancel(true);
>                         }
>                         break;
>                     }
>                     if (repoSize > sizeThreshold) {
>                         logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
>                         purgeOldEvents();
>                         journalFileCount = getJournalCount();
>                         repoSize = getSize(getLogFiles(), 0L);
>                         continue;
>                     } else {
>                         // if we are constrained by the number of journal files rather than the size of the repo,
>                         // then we will just sleep a bit because another thread is already actively merging the journals,
>                         // due to the runnable that we scheduled above
>                         try {
>                             Thread.sleep(100L);
>                         } catch (final InterruptedException ie) {
>                         }
>                     }
>                     logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
>                             + "to accommodate. Currently, there are {} journal files ({} bytes) and "
>                             + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
>                     journalFileCount = getJournalCount();
>                     repoSize = getSize(getLogFiles(), 0L);
>                 }
>                 logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
>                         + "journal files to be rolled over is {}", journalFileCount);
>             }
> {code}
> My nifi is stuck at that sleep indefinitely.  The reason my nifi cannot move 
> forward is that the thread doing the merge is stopped.  The thread doing the 
> merge is at:
> {code}
> accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
> {code}
> so the queue is full.
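>
> To illustrate the stall, here is a minimal, self-contained sketch (the class name, queue size, and retry bound are made up for this example; it is not the NiFi code): once a bounded queue is full and nothing is polling it, a timed offer() keeps timing out, so a producer that retries in a loop never makes progress.
> {code}
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.TimeUnit;
>
> public class FullQueueStallSketch {
>     public static void main(final String[] args) throws InterruptedException {
>         final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(2);
>         eventQueue.offer("a");
>         eventQueue.offer("b"); // the queue is now at capacity and nothing is draining it
>
>         boolean accepted = false;
>         int attempts = 0;
>         while (!accepted && attempts < 5) { // bounded here only so the demo terminates
>             accepted = eventQueue.offer("c", 10, TimeUnit.MILLISECONDS);
>             attempts++;
>         }
>         System.out.println("accepted=" + accepted + " after " + attempts + " attempts");
>     }
> }
> {code}
>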
> What I believe happened is that the callables created here:
> {code}
>                             final Callable<Object> callable = new Callable<Object>() {
>                                 @Override
>                                 public Object call() throws IOException {
>                                     while (!eventQueue.isEmpty() || !finishedAdding.get()) {
>                                         final Tuple<StandardProvenanceEventRecord, Integer> tuple;
>                                         try {
>                                             tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
>                                         } catch (final InterruptedException ie) {
>                                             continue;
>                                         }
>                                         if (tuple == null) {
>                                             continue;
>                                         }
>                                         indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
>                                     }
>                                     return null;
>                                 }
> {code}
> finish before the offer adds its first event because I do not see any Index 
> Provenance Events threads.  My guess is the while loop condition is wrong and 
> should be && instead of ||.
> I upped the thread count for the index creation from 1 to 3 to see if that 
> helps.  I can tell you if that helps later this week.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
