Aditya created SAMZA-1728:
-----------------------------

             Summary: BootstrappingChooser: Call checkOffset only for a lagging partition while choosing.
                 Key: SAMZA-1728
                 URL: https://issues.apache.org/jira/browse/SAMZA-1728
             Project: Samza
          Issue Type: Bug
            Reporter: Aditya
            Assignee: Aditya


We seem to be calling checkOffset even for SSPs that have already finished 
bootstrapping. For such an SSP, systemStreamLagCounts is decremented again, but 
laggingSystemStreamPartitions is unchanged because the SSP has already been 
removed from that set. As a result, the systemStream is removed from 
systemStreamLagCounts while a few SSPs of that system stream are still lagging.

if (comparatorResult != null && comparatorResult.intValue() >= 0) {
  laggingSystemStreamPartitions -= systemStreamPartition
  systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)

  if (systemStreamLagCounts(systemStream) == 0) {
    // If the lag count is 0, then no partition for this stream is lagging
    // (the stream has been fully caught up).
    systemStreamLagCounts -= systemStream
  }
}

 

This results in the following exception:

java.util.NoSuchElementException: key not found: SystemStream [system=brooklin-espresso, stream=SampleBrooklinFunctionsIdentityProfileDS]
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.MapLike$class.apply(MapLike.scala:141)
 at scala.collection.AbstractMap.apply(Map.scala:58)
 at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:281)
 at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
 at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
 at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
 at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
 at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
 at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:753)
 at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
 at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)
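
The bookkeeping drift described above can be reproduced in isolation. The 
following is only a minimal sketch, not the BootstrappingChooser code: 
LagBookkeepingSketch, LagTracker, markCaughtUp and markCaughtUpIfLagging are 
hypothetical names, and a stream is modelled as a plain String with a partition 
as a (stream, partitionId) pair. It assumes that one possible guard, in line 
with the summary of this issue, is to update the counters only for a partition 
that is still in the lagging set.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for SystemStream / SystemStreamPartition.
object LagBookkeepingSketch {
  type Stream = String
  type Partition = (Stream, Int)

  final class LagTracker(partitions: Set[Partition]) {
    // Partitions that have not yet caught up.
    val lagging: mutable.Set[Partition] = mutable.Set(partitions.toSeq: _*)

    // Number of lagging partitions per stream.
    val lagCounts: mutable.Map[Stream, Int] =
      mutable.Map(partitions.groupBy(_._1).map { case (s, ps) => s -> ps.size }.toSeq: _*)

    // Same shape as the excerpt in the report: decrements unconditionally.
    // lagCounts(stream) throws NoSuchElementException once the stream's
    // entry has been removed.
    def markCaughtUp(p: Partition): Unit = {
      val stream = p._1
      lagging -= p
      lagCounts += stream -> (lagCounts(stream) - 1)
      if (lagCounts(stream) == 0) {
        lagCounts -= stream
      }
    }

    // Guarded variant (an assumption about the fix, not the actual patch):
    // skip partitions that are no longer lagging.
    def markCaughtUpIfLagging(p: Partition): Unit =
      if (lagging.contains(p)) markCaughtUp(p)
  }
}
```

With two partitions of one stream, calling markCaughtUp twice for the same 
partition drops the stream from lagCounts while the other partition is still 
in lagging, and the next markCaughtUp for that stream fails with the same 
NoSuchElementException as in the trace. With markCaughtUpIfLagging the repeated 
call is a no-op and the count only reaches 0 when the last partition catches up.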



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
