-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review183832
-----------------------------------------------------------

webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 274 (patched)
<https://reviews.apache.org/r/61665/#comment259905>

    I think it will be useful to abstract this functionality in a class of its own.

    class AdaptiveWaiter {
        private final long minWaitTime;
        private final long maxWaitTime;
        private final long waitIncrement;
        private final long resetInterval;
        private long lastWaitAt;
        private long waitTime;

        public AdaptiveWaiter(long minWaitTime, long maxWaitTime, long waitIncrement) {
            this.minWaitTime   = minWaitTime;
            this.maxWaitTime   = maxWaitTime;
            this.waitIncrement = waitIncrement;
            this.resetInterval = maxWaitTime * 2;
        }

        public void wait(String message) {
            long now               = System.currentTimeMillis();
            long timeSinceLastWait = now - lastWaitAt;

            if (timeSinceLastWait > resetInterval) {
                // it has been a long time since the last call
                waitTime = minWaitTime;
            } else {
                // grow the wait, but never beyond maxWaitTime
                waitTime = Math.min(timeSinceLastWait + waitIncrement, maxWaitTime);
            }

            lastWaitAt = now;

            LOG.debug(...);

            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
        }
    }


- Madhan Neethiraj


On Aug. 17, 2017, 5:04 a.m., Ashutosh Mestry wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
>
> (Updated Aug. 17, 2017, 5:04 a.m.)
>
>
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
>
>
> Bugs: ATLAS-2047
>     https://issues.apache.org/jira/browse/ATLAS-2047
>
>
> Repository: atlas
>
>
> Description
> -------
>
> Please refer to
> [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background
> and analysis.
>
> **Background**
>
> The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_. This method
> is called at the beginning of almost every method in this class. It checks
> whether the consumer is closed and, if so, throws IllegalStateException.
>
> The scenario may come about in this way:
> - Shutdown has been initiated. Close on consumer is called.
> - However, the consumer thread is just about to enter another poll cycle.
> - Thus acquire sees that consumer is closed and throws the exception (2nd
> bullet above).
>
> Please take a look at this Scala code.
> This is _ShutdownableThread_. The thread does the job of handling all
> exceptions. Upon exception, it manages the _shutdownLatch_ (from yesterday’s
> bug fix) and gets out of the _isRunning_ loop.
> ```scala
> override def run(): Unit = {
>   info("Starting ")
>   try {
>     while (isRunning.get()) {
>       doWork()
>     }
>   } catch {
>     case e: Throwable =>
>       if (isRunning.get())
>         error("Error due to ", e)
>   }
>   shutdownLatch.countDown()
>   info("Stopped ")
> }
> ```
>
> **Implementation**
>
> Special treatment is given to _IllegalStateException_ by implementing pause &
> retry logic:
> - Modified _LOG_ to _debug_. That way logs are not filled during retry.
> - _HookConsumer_ is more resilient. It handles exceptions resulting from
> _Kafka_ and entity APIs.
>
>
> Diffs
> -----
>
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8
>
>
> Diff: https://reviews.apache.org/r/61665/diff/2/
>
>
> Testing
> -------
>
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
>
> **Functional tests**
> Verified regular notification scenarios.
>
>
> Thanks,
>
> Ashutosh Mestry
>
>
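
For illustration only, the pause & retry handling described under Implementation in the quoted request could be sketched roughly as below. This is not code from the patch: `PauseAndRetryLoop`, its constructor parameters, and the injected `sleeper` are hypothetical names, and the assumption that an _IllegalStateException_ means "consumer closed or closing" comes from the description above rather than from the actual diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

// Hypothetical sketch, not taken from NotificationHookConsumer.
class PauseAndRetryLoop {
    private final AtomicBoolean shouldRun = new AtomicBoolean(true);
    private final Runnable pollCycle;   // stands in for one poll-and-handle cycle
    private final LongConsumer sleeper; // injected so the loop can be exercised without real sleeps
    private final long minWaitMs;
    private final long maxWaitMs;
    private final long incrementMs;
    private long waitMs = 0;

    PauseAndRetryLoop(Runnable pollCycle, LongConsumer sleeper,
                      long minWaitMs, long maxWaitMs, long incrementMs) {
        this.pollCycle   = pollCycle;
        this.sleeper     = sleeper;
        this.minWaitMs   = minWaitMs;
        this.maxWaitMs   = maxWaitMs;
        this.incrementMs = incrementMs;
    }

    void run() {
        while (shouldRun.get()) {
            try {
                pollCycle.run();
                waitMs = 0; // a clean cycle resets the back-off
            } catch (IllegalStateException e) {
                // Assumed meaning: the consumer was closed or is closing.
                // Pause a little longer each time, within [minWaitMs, maxWaitMs],
                // then go around the loop again instead of letting the thread die.
                waitMs = Math.min(maxWaitMs, Math.max(minWaitMs, waitMs + incrementMs));
                sleeper.accept(waitMs);
            }
        }
    }

    void shutdown() {
        shouldRun.set(false);
    }
}
```

In a real consumer thread the `sleeper` would presumably wrap `Thread.sleep` and restore the interrupt flag on `InterruptedException`; it is injected here only so that the back-off sequence can be checked in a unit test without waiting.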