-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53453/#review155377
-----------------------------------------------------------

Some minor code style/documentation related comments. One correctness question.


samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java (line 25)
<https://reviews.apache.org/r/53453/#comment225248>

    No comma after SystemConsumers


samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java (line 134)
<https://reviews.apache.org/r/53453/#comment225249>

    s/checkpoint/register the same offset.
    Also, maintain previous line width for comment.


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 77)
<https://reviews.apache.org/r/53453/#comment225250>

    Strongly prefer not adding empty Map() as default value here (and maybe clean up the other one too). See my comment on Xinyu's HDFS performance RB for explanation.


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 146)
<https://reviews.apache.org/r/53453/#comment225252>

    s/ones/one


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 239)
<https://reviews.apache.org/r/53453/#comment225255>

    Are you missing a foreach here? I think you need something like:

    lastProcessedOffsets.get(taskName)
      .foreach { sspToOffsets =>
        sspToOffsets.foreach { case (ssp, checkpoint) =>
          offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint)
        }
      }

    If the version above is correct, can we add a test for this? It's easy to miss.


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 243)
<https://reviews.apache.org/r/53453/#comment225257>

    Can remove comment, same information in next line.


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 244)
<https://reviews.apache.org/r/53453/#comment225258>

    Space after 'case' and after ':'


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala (line 245)
<https://reviews.apache.org/r/53453/#comment225259>

    s/is an empty list/is empty


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 361)
<https://reviews.apache.org/r/53453/#comment225260>

    Indent by 2
    Space before and after =>
    Space b/w map and {


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (line 366)
<https://reviews.apache.org/r/53453/#comment225261>

    Indent by 2.


- Prateek Maheshwari


On Nov. 4, 2016, 4:23 p.m., Boris Shkolnik wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53453/
> -----------------------------------------------------------
>
> (Updated Nov. 4, 2016, 4:23 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-1042
>     https://issues.apache.org/jira/browse/SAMZA-1042
>
>
> Repository: samza
>
>
> Description
> -------
>
> Add optional interface for SystemConsumer checkpontListener() for checkpoint notifications.
>
>
> Diffs
> -----
>
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointListener.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java 8dfcc7499659442aabd3085a8787475fe38f29db
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala c41eadb70f4675816245f7ac40f0db2fc16335f0
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e0468ee89c89fd720834461771ebb36475475bcb
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala cb78223f1b59a78bbeb1e42b5974670a53def504
>
> Diff: https://reviews.apache.org/r/53453/diff/
>
>
> Testing
> -------
>
> gradlew test.
> manual testing.
>
>
> Thanks,
>
> Boris Shkolnik
>
>
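
The nested foreach suggested for OffsetManager.scala (line 239) can be tried in isolation. The following is a minimal sketch, not the Samza code: it assumes lastProcessedOffsets is an immutable Map from task name to a per-partition offset map, uses String in place of TaskName and SystemStreamPartition, and replaces offsetManagerMetrics.checkpointedOffsets with a hypothetical mutable map named gauges.

```scala
import scala.collection.mutable

object NestedForeachSketch {
  // Hypothetical stand-ins: the real code uses TaskName and SystemStreamPartition.
  type TaskName = String
  type Ssp = String

  /** Copies the offsets recorded for one task into `gauges` (stand-in for the metrics gauges). */
  def publish(
      lastProcessedOffsets: Map[TaskName, Map[Ssp, String]],
      taskName: TaskName,
      gauges: mutable.Map[Ssp, String]): Unit = {
    // Map.get returns an Option, so the outer foreach runs at most once.
    lastProcessedOffsets.get(taskName).foreach { sspToOffsets =>
      // The inner foreach visits every (partition, offset) pair for the task.
      sspToOffsets.foreach { case (ssp, checkpoint) =>
        gauges(ssp) = checkpoint
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val offsets = Map("task-0" -> Map("ssp-0" -> "42", "ssp-1" -> "17"))
    val gauges = mutable.Map.empty[Ssp, String]

    publish(offsets, "task-0", gauges) // records both partitions
    publish(offsets, "task-9", gauges) // unknown task: get returns None, nothing is recorded

    assert(gauges == mutable.Map("ssp-0" -> "42", "ssp-1" -> "17"))
  }
}
```

A check along the lines of the assertion in main, written against the real OffsetManager and its metrics, might be one way to cover the case the review asks a test for.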