Aditya created SAMZA-1728:
-----------------------------

             Summary: BootstrappingChooser: Call checkOffset only for a lagging partition while choosing.
                 Key: SAMZA-1728
                 URL: https://issues.apache.org/jira/browse/SAMZA-1728
             Project: Samza
          Issue Type: Bug
            Reporter: Aditya
            Assignee: Aditya

We seem to be calling checkOffset even for SSPs that have already finished bootstrapping. Each such call decrements systemStreamLagCounts again but leaves laggingSystemStreamPartitions unchanged, because the SSP has already been removed from that set. As a result, the systemStream is removed from systemStreamLagCounts while some SSPs of that system stream are still lagging.

    if (comparatorResult != null && comparatorResult.intValue() >= 0) {
      laggingSystemStreamPartitions -= systemStreamPartition
      systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)

      if (systemStreamLagCounts(systemStream) == 0) {
        // If the lag count is 0, then no partition for this stream is lagging
        // (the stream has been fully caught up).
        systemStreamLagCounts -= systemStream
      }
    }

The next checkOffset call for a partition of that stream then fails with the following exception:

    java.util.NoSuchElementException: key not found: SystemStream [system=brooklin-espresso, stream=SampleBrooklinFunctionsIdentityProfileDS]
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:58)
        at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:281)
        at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
        at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
        at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
        at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:753)
        at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
        at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)
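
The bookkeeping problem can be reproduced outside Samza with a small model. The sketch below is only an illustration of the guard suggested in the summary, not the actual patch: `LagTracker` and `markCaughtUp` are hypothetical names, a stream is modelled as a plain String, and an SSP as a (stream, partition) pair. Only the two collection names are taken from the snippet in this issue.

```scala
import scala.collection.mutable

// Simplified stand-in for the chooser's lag bookkeeping (NOT the Samza class).
// Assumption: a stream is a String and an SSP is a (stream, partition) pair.
class LagTracker(initialLagging: Set[(String, Int)]) {
  // SSPs that have not caught up yet.
  val laggingSystemStreamPartitions: mutable.Set[(String, Int)] =
    mutable.Set(initialLagging.toSeq: _*)

  // Number of lagging SSPs per stream.
  val systemStreamLagCounts: mutable.Map[String, Int] =
    mutable.Map(
      initialLagging.groupBy(_._1).map { case (s, ps) => s -> ps.size }.toSeq: _*)

  // Hypothetical hook, called when an SSP is seen at or past its target offset.
  def markCaughtUp(ssp: (String, Int)): Unit = {
    // Guard: only touch the counters for an SSP that is still lagging.
    // Without it, a repeated call decrements the count a second time.
    if (laggingSystemStreamPartitions.contains(ssp)) {
      val stream = ssp._1
      laggingSystemStreamPartitions -= ssp
      systemStreamLagCounts += stream -> (systemStreamLagCounts(stream) - 1)

      if (systemStreamLagCounts(stream) == 0) {
        // No partition of this stream is lagging any more.
        systemStreamLagCounts -= stream
      }
    }
  }
}
```

With two lagging partitions of one stream, calling `markCaughtUp` twice for the same partition leaves the count at 1 and the other partition in the lagging set. Without the `contains` check, the second call would drop the count to 0 and remove the stream, so the next lookup for that stream would throw the NoSuchElementException shown above.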