> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > Update website docs SamzaContainer, Streams, and Checkpointing sections.

Yes. Waiting for integration testing so that whatever gets written matches what's actually happening.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala, line 28
> > <https://reviews.apache.org/r/22215/diff/5/?file=627584#file627584line28>
> >
> >     We can't commit the code like this. Either remove or fix. Last we
> >     spoke, I think you were saying fixing shouldn't be too bad with new changes.

We won't. Will update in a later iteration.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 91
> > <https://reviews.apache.org/r/22215/diff/5/?file=627590#file627590line91>
> >
> >     state log -> changelog

Done.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 93
> > <https://reviews.apache.org/r/22215/diff/5/?file=627590#file627590line93>
> >
> >     Rather than have a def that defines and calls two defs, it seems
> >     simpler to just have:
> >
> >     SamzaContainer.buildTaskNameToSystemStreamPartition
> >     SamzaContainer.buildTaskNameToChangeLogPartitionMapping
> >
> >     And have SamzaContainer.main call them. Also, would prefer
> >     parameterizing these two methods, and have SamzaContainer.main pass the
> >     environment variables in. Would make them more testable.

Fine.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 129
> > <https://reviews.apache.org/r/22215/diff/5/?file=627590#file627590line129>
> >
> >     SSP -> SystemStreamPartition

Done.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 64
> > <https://reviews.apache.org/r/22215/diff/5/?file=627619#file627619line64>
> >
> >     This is pretty ugly. You don't do this for the taskName key. Can we
> >     either rename this variable and copy the same pattern for the taskName key,
> >     or eliminate this variable and hard code "type" in the map?
> >
> >     e.g.
> >
> >     val key = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
> >     "taskName" -> taskName.getTaskName)
> >

Added constant.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 176
> > <https://reviews.apache.org/r/22215/diff/5/?file=627619#file627619line176>
> >
> >     White space.

What whitespace? I think it's good to have a break here...


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 180
> > <https://reviews.apache.org/r/22215/diff/5/?file=627619#file627619line180>
> >
> >     White space.

Fine.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 210
> > <https://reviews.apache.org/r/22215/diff/5/?file=627619#file627619line210>
> >
> >     Import scala.collection and just have mutable.Map here

Done.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 240
> > <https://reviews.apache.org/r/22215/diff/5/?file=627619#file627619line240>
> >
> >     Remove newlines here.

Done.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 279
> > <https://reviews.apache.org/r/22215/diff/5/?file=627619#file627619line279>
> >
> >     I think we call readLastCheckpoint once per taskName. This method
> >     invocation seems to always result in a readCheckpointsFromLog() call. If we
> >     have 100s (or 1000s) of taskNames, won't this result in us re-reading the
> >     entire checkpoint log 100s or 1000s of times, even though the result will
> >     always be the same (on job start)?

It doesn't. We keep startingOffset and maintain our last position, so each call only reads entries added since the previous one.


> On July 10, 2014, 10:05 a.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 438
> > <https://reviews.apache.org/r/22215/diff/5/?file=627619#file627619line438>
> >
> >     Remove newlines.

Done.


- Jakob


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review47575
-----------------------------------------------------------


On July 9, 2014, 10:09 p.m., Jakob Homan wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
>
> (Updated July 9, 2014, 10:09 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
>
>
> Repository: samza
>
>
> Description
> -------
>
> Move topic partition grouping to the AM and generalize
>
>
> Diffs
> -----
>
>   .gitignore db9d3ec
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 6fad1fa
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java a6e1ba6
>   samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 78d56a9
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java cb40092
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 7c1b085
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 356adbb
>   samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
>   samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
>   samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 3d0a484
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 11c23d0
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 94f6f4c
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
>   samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
>   samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
>   samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala 4f7ddcd
>   samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala a67ecdf
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 15245d4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6be9732
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala 72562cf
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 0077af0
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala dc44a99
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b2faebf
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 520f784
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala f1139f5
>
> Diff: https://reviews.apache.org/r/22215/diff/
>
>
> Testing
> -------
>
> Existing and new unit. Now moving on to function.
>
>
> Thanks,
>
> Jakob Homan
>
>
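
On the readLastCheckpoint question (KafkaCheckpointManager.scala, line 279), here is a rough sketch of the idea behind keeping startingOffset between calls. It is not the code in the diff: CheckpointLogReader, LogEntry, entriesRead, and the in-memory buffer standing in for the Kafka checkpoint topic are all hypothetical names made up for illustration. The only point it tries to show is that each call resumes from the last position instead of re-reading the whole log.

```scala
import scala.collection.mutable

// Hypothetical stand-in for one message in the checkpoint log.
case class LogEntry(taskName: String, offsets: Map[String, String])

// Hypothetical reader; `log` stands in for the Kafka checkpoint topic.
class CheckpointLogReader(log: mutable.ArrayBuffer[LogEntry]) {
  // Position of the next unread entry, remembered across calls.
  private var startingOffset = 0

  // Most recent checkpoint seen so far for each task name.
  private val latest = mutable.Map[String, Map[String, String]]()

  // Number of entries actually consumed, exposed only to show
  // that no entry is read twice.
  var entriesRead = 0

  // Consumes only the entries appended since the previous call.
  private def readCheckpointsFromLog(): Unit = {
    while (startingOffset < log.size) {
      val entry = log(startingOffset)
      latest(entry.taskName) = entry.offsets // later entries win
      startingOffset += 1
      entriesRead += 1
    }
  }

  // Called once per task name; cheap after the first call because
  // the log read resumes from startingOffset.
  def readLastCheckpoint(taskName: String): Option[Map[String, String]] = {
    readCheckpointsFromLog()
    latest.get(taskName)
  }
}
```

With a three-entry log, calling readLastCheckpoint for two different task names leaves entriesRead at 3 rather than 6: the second call finds nothing new past startingOffset and answers from the cached map.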
