----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/61665/#review184093 -----------------------------------------------------------
webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java Lines 227 (patched) <https://reviews.apache.org/r/61665/#comment260095> Consider marking 'resetInterval' as final. webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java Lines 257 (patched) <https://reviews.apache.org/r/61665/#comment260099> Consider renaming updateDurations() to setWaitDuration() and call from line #240. void setWaitDuration() { long now = System.currentTimeMillis(); long timeSinceLastWait = now - lastWaitAt; if (timeSinceLastWait > resetInterval) { waitDuration = minDuration; // reset } else if (waitDuration != maxDuration) { waitDuration += increment; if (waitDuration > maxDuration) { waitDuration = maxDuration; } } lastWaitAt = now; } webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java Lines 264 (patched) <https://reviews.apache.org/r/61665/#comment260094> Shouldn't waitDuration be set to 'maxDuration' here? Else, the next wait will start from minDuration - causing more frequent retries. webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java Lines 275 (patched) <https://reviews.apache.org/r/61665/#comment260103> Consider reading minWaitDuration, maxWaitDuration from configuration, right next to where consumerRetryInterval is read currently (line #120). consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500); minWaitDuration = applicationProperties.getInt(MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default maxWaitDuration = applicationProperties.getInt(MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java Lines 279 (patched) <https://reviews.apache.org/r/61665/#comment260104> 'pauseDuration' seems to be used only in tests; considering removing this member and update tests to use minWaitDuration. webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java Line 246 (original), 306 (patched) <https://reviews.apache.org/r/61665/#comment260105> Why not call 'adaptiveWaiter.pause()' here as well? - Madhan Neethiraj On Aug. 29, 2017, 9:01 p.m., Ashutosh Mestry wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/61665/ > ----------------------------------------------------------- > > (Updated Aug. 29, 2017, 9:01 p.m.) > > > Review request for atlas, Madhan Neethiraj and Nixon Rodrigues. > > > Bugs: ATLAS-2047 > https://issues.apache.org/jira/browse/ATLAS-2047 > > > Repository: atlas > > > Description > ------- > > Please refer to > [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background > and analysis. > > **Background** > > The _IllegalStateException_ is thrown by _KafkaConsumer.aquire_. This method > is called at the beginning of almost every method in this class. The method > checks if the consumer is closed, if it is then it throws > IllegalStateException. > > Scenario may come about in this way: > - Shutdown has been initiated. Close on consumer is called. > - However, the consumer thread is just about to enter another poll cycle. > - Thus acquire sees that consumer is closed and throws the exception (2nd > bullet above). > > Please take a look at this scala code. This is _ShutdownableThread_. The > thread does the job of handling all exceptions. Upon exception, it manages > the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the > _isRunning_ loop. > ```scala > override def run(): Unit = { > info("Starting ") > try{ > while(isRunning.get()){ > doWork() > } > } catch{ > case e: Throwable => > if(isRunning.get()) > error("Error due to ", e) > } > shutdownLatch.countDown() > info("Stopped ") > } > ``` > > **Implementation** > > Special treatment is given to _IllegalStateException_ by implementing pause & > retry logic: > - Modified _LOG_ to _debug_. That way logs are not filled during retry. > - _HookConsumer_ is more resilient. It handles exceptions resulting from > _Kafka_ and entity APIs. > > > Diffs > ----- > > > webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java > ef64c3b > webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java > PRE-CREATION > > webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java > a6f58e8 > > > Diff: https://reviews.apache.org/r/61665/diff/3/ > > > Testing > ------- > > **Unit tests** > Updated unit tests to reproduce the scenarios and verify the fix. > > **Functional tests** > Verified regular notification scenarios. > > > Thanks, > > Ashutosh Mestry > >
