> On March 19, 2014, 8:25 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala, line 27
> > <https://reviews.apache.org/r/18907/diff/1/?file=513607#file513607line27>
> >
> > Should add some javadocs for class, all constructor parameters, and all public methods.

Done.


> On March 19, 2014, 8:25 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala, line 43
> > <https://reviews.apache.org/r/18907/diff/1/?file=513607#file513607line43>
> >
> > What do you think about using blocks instead of method parameters here (like we do with TestUtil.expect)? I think it might make the code a little cleaner:
> >
> > run {
> >   // do some stuff
> > }
> >
> > Not quite sure how to make it work with multiple blocks (to handle onException). Probably requires some fiddling around.

I'm not sure that's possible (but my Scala is rusty, please correct me if I'm wrong). We need to pass a parameter to the loopOperation (for the loop state), but afaik a block in Scala is syntactic sugar for a parameterless function.


> On March 19, 2014, 8:25 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 209
> > <https://reviews.apache.org/r/18907/diff/1/?file=513609#file513609line209>
> >
> > Specify which partition in the log line.

Done.


> On March 19, 2014, 8:25 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala, line 142
> > <https://reviews.apache.org/r/18907/diff/1/?file=513610#file513610line142>
> >
> > Comment here explaining why we reset would be useful.

Done.


> On March 19, 2014, 8:25 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala, line 192
> > <https://reviews.apache.org/r/18907/diff/1/?file=513611#file513611line192>
> >
> > I think we should call assembleMetadata first, then loop.done, then return the results. The reason for this is that we might throw an exception in assembleMetadata, but loop.done would already have stopped the run loop.
> >
> > Alternatively, maybe we should set loop.notDone or something in the exception closure, which would overwrite loop.done here.

Good point. I've switched the order.


> On March 19, 2014, 8:25 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala, line 150
> > <https://reviews.apache.org/r/18907/diff/1/?file=513612#file513612line150>
> >
> > This chunk of code is a little funky. I get that the loop runner will forward the interrupted exception, but it's not really immediately obvious that we can catch interrupt exceptions but not other kinds of exceptions.
> >
> > Wonder if we should add a callback for interrupt and closed on interrupt exceptions, and default to throwing in cases where callbacks aren't passed in? Doesn't seem like a very clean solution either, though.

Agree it's funky -- I was just trying to preserve the existing code's behaviour. ;) I don't know why InterruptedException was special-cased in the first place. Should we just let it bubble out, without calling `stop`? Or call `stop` on any kind of exception, not just InterruptedException? Depends on the expected semantics when SystemConsumer.start throws an exception. Looks like an exception thrown here will just cause the container to shut down anyway, so it probably doesn't matter whether or not we call `stop`.


> On March 19, 2014, 8:25 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, line 93
> > <https://reviews.apache.org/r/18907/diff/1/?file=513614#file513614line93>
> >
> > Do we need a reset if we call done right after? The next time run is called, it should reset, right?

Agree, I don't know why I put this here. I've removed the reset.
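
On the block-syntax question and the done/reset points above, here is a minimal sketch of one way it could look. It is not the code in the patch: `RetryLoopSketch`, `LoopState`, and all parameter names are made up for illustration. It assumes two Scala features: a function literal written in braces can take a parameter (`{ loop => ... }`), and a second parameter list lets the exception handler be passed as a second block.

```scala
// Hypothetical sketch only: these names are illustrative assumptions,
// not the ExponentialSleepStrategy API under review.
class LoopState {
  private var finished = false
  def done(): Unit = { finished = true }
  def isDone: Boolean = finished
}

class RetryLoopSketch(initialDelayMs: Long = 100,
                      maxDelayMs: Long = 10000,
                      factor: Double = 2.0) {

  // Overridable so that a test double can skip the real sleep.
  protected def sleep(ms: Long): Unit = Thread.sleep(ms)

  // Two parameter lists, so both arguments can be written as blocks.
  def run[A](operation: LoopState => A)(onException: (Exception, LoopState) => Unit): Option[A] = {
    val loop = new LoopState      // fresh state on every call: an implicit reset
    var delay = initialDelayMs
    var result: Option[A] = None
    while (!loop.isDone) {
      try {
        result = Some(operation(loop))
      } catch {
        case e: InterruptedException => throw e          // forwarded to the caller
        case e: Exception => onException(e, loop)        // handled, then retried
      }
      if (!loop.isDone) {
        sleep(delay)
        delay = math.min((delay * factor).toLong, maxDelayMs)
      }
    }
    result
  }
}

object RetryLoopExample {
  def main(args: Array[String]): Unit = {
    val strategy = new RetryLoopSketch(initialDelayMs = 1, maxDelayMs = 8)
    var attempts = 0
    val result = strategy.run { loop =>
      attempts += 1
      if (attempts < 3) throw new RuntimeException("transient failure " + attempts)
      // Do the work that may throw first, and only then mark the loop as done.
      val metadata = "metadata after " + attempts + " attempts"
      loop.done()
      metadata
    } { (e, loop) =>
      println("retrying after: " + e.getMessage)
    }
    println(result)
  }
}
```

In this sketch, calling `done()` before work that can still throw would end the loop with no result, which is the hazard raised for KafkaSystemAdmin. Because `run` creates fresh state on each call, an explicit reset just before `done()` has no effect.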


> On March 19, 2014, 8:25 p.m., Chris Riccomini wrote:
> > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala, line 313
> > <https://reviews.apache.org/r/18907/diff/1/?file=513615#file513615line313>
> >
> > Should we verify retryBackoff.sleepCount here?

The intention is that the CallLimitReached exception already confirms the fact that we retried the expected number of times (the expected number being specified in the ExponentialSleepStrategy.Mock constructor). If that's unclear, we could add an additional assertion.


- Martin


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18907/#review37736
-----------------------------------------------------------


On March 19, 2014, 10:26 p.m., Martin Kleppmann wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18907/
> -----------------------------------------------------------
>
> (Updated March 19, 2014, 10:26 p.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Improvements suggested by Chris in review.
>
>
> SAMZA-174 General-purpose implementation of a retry loop
>
>
> Diffs
> -----
>
>   samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b3c926310920c2c204a805421259d0fa54213170
>   samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala 3036da93728825d2d37e6791e89aa9801195ad38
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 27b38b25dc6c34f3ef76d400370d1c857834e6a2
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 10855dcd33133b1b734e500bdf2c1ba85bba13b6
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 53255490dfbcd926a0d9975415ce437f01c29264
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala afbd7cdc1436a0d9a83857c0f410a2977812ee77
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b09ade28d4363a4b70a3e4cf942e7974acac596c
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala a4197832075dbb67fb68a81c16ee98d6a2b358a0
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala eaa9e53db0b641e16b4eafe2be89ae6d7b4e5384
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala cd0942aae06fa9e358672f0e862711bfb677895d
>
> Diff: https://reviews.apache.org/r/18907/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Martin Kleppmann
>
>
