-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review45235
-----------------------------------------------------------

Nice work! It's a big patch, but it makes sense. Definitely excited to have this part of Samza. The comments below are mostly stylistic/clarity things.

Just one big question I want to specifically call out: The checkpoint topic is changing in an incompatible way, which means that any Samza 0.8.0 jobs won't be able to read Samza 0.7.0 checkpoints and vice versa. What should we do about this?

While I think it's ok to break API compatibility between versions (code is fairly easy to change), I'm a bit hesitant about an incompatible data change, as it will create a deployment headache for anyone upgrading their job from Samza 0.7.0 to 0.8.0. They will have to switch to a new checkpoint topic (since a 0.8.0 job will fail to start when trying to read a 0.7.0 checkpoint). We would probably need to provide a checkpoint conversion tool, otherwise everybody would lose their checkpoints. This incompatibility occurs even if the user isn't taking advantage of the new SSP grouping feature.

Should we provide a smoother upgrade path here? I would say that, as a minimum, 0.8.0 should be able to read 0.7.0-style checkpoints and automatically convert them into 0.8.0 format.


samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79960>

    Nit: use "o instanceof Checkpoint" rather than "getClass() != getClass()" to allow potential subclassing of Checkpoint?


samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79961>

    On first read, it wasn't clear to me what partitions this referred to. First I thought it was input stream partitions, then I thought it was partitions of the checkpoint stream. Could you update the docs to make more explicit that this is about partitions of changelog streams for stores?

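To illustrate the instanceof nit on Checkpoint.java above, here is a minimal Scala sketch of type-based equality. SketchCheckpoint, LoggingCheckpoint and the offsets field are hypothetical stand-ins, not the real Checkpoint class; the type pattern plays the role that "o instanceof Checkpoint" plays in Java.

```scala
// Hypothetical stand-in for Checkpoint; the real class and its fields may differ.
class SketchCheckpoint(val offsets: Map[String, String]) {
  // A type pattern is Scala's counterpart to Java's `o instanceof Checkpoint`,
  // so instances of subclasses can still compare equal to the base class.
  override def equals(other: Any): Boolean = other match {
    case that: SketchCheckpoint => offsets == that.offsets
    case _                      => false
  }

  override def hashCode: Int = offsets.hashCode
}

// Hypothetical subclass that adds behaviour but no state relevant to equality.
class LoggingCheckpoint(initial: Map[String, String]) extends SketchCheckpoint(initial) {
  def describe: String = s"checkpoint with ${offsets.size} offsets"
}
```

The trade-off is worth keeping in mind: if a subclass later overrides equals to compare extra fields, symmetry can break, which is the usual argument for the getClass() comparison.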

samza-api/src/main/java/org/apache/samza/container/SSPGrouper.java
<https://reviews.apache.org/r/22215/#comment79962>

    I don't understand this sentence: "Each taskName has a key that uniquely describes what sets may be in it, but does not generally enumerate the elements of those sets." Could you clarify please?


samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
<https://reviews.apache.org/r/22215/#comment79963>

    What's the difference between a taskName and a taskName key? My understanding was that a taskName is a string that identifies a particular task instance within a job (e.g. with the current Samza behavior, the taskName could be the partition number).


samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/22215/#comment79964>

    Is there a JIRA for adapting CheckpointTool to the new checkpoint format?


samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/22215/#comment79965>

    Javadoc needs updating to match the code.


samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment79967>

    Perhaps use different namespaces for checkpoints and the partition mapping, to remove the risk of filename collision? e.g. checkpoints go in new File(root, "%s-task-%s" format (jobName, taskName)) and partition mapping goes in new File(root, "%s-partition-mapping" format jobName)


samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment79970>

    Nit: the two equals signs in the same line caught me by surprise.
    I'd find this more readable:

    override def setTaskNameToPartitionMapping(mapping: util.Map[String, Integer]) {
      taskNameMapping = mapping
    }


samza-core/src/main/scala/org/apache/samza/container/SSPTaskNameGrouper.scala
<https://reviews.apache.org/r/22215/#comment79972>

    Could you add docs for SSPTaskNameGrouper? Its purpose is not obvious to me.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79974>

    It looks like Util.decodeTaskNameToPartitionMapping converts from Java types to Scala types, and then here you convert straight back again. Would it be easier to just keep the mapping in Java types throughout?


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79994>

    Nit: could simplify to:
    sspTaskNames.getOrElse(taskName, throw ...)


samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala
<https://reviews.apache.org/r/22215/#comment79995>

    Nit: unnecessary curly braces? Also, quite a long line -- perhaps break after equals sign?


samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80127>

    Perhaps a radical and/or stupid suggestion, but just to throw it out there... if JSON is causing pain, and we're switching to a different serialization for checkpoints anyway, we could potentially use something like Avro instead of JSON. With Avro, we could specify a schema for the data we want (a checkpoint, in this case), and code-generate Java classes for accessing the data. They ought to be more Scala-friendly (fewer maps and less dynamic typing). Avro also gives us a good system for schema evolution if we want to change aspects of the serialization in future.

    Not pushing Avro, just throwing it out there as an idea.

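Going back to the getOrElse nit on SamzaContainer.scala above: the simplification works because the second parameter of getOrElse is by-name, so the throw is only evaluated on a miss. A minimal sketch, in which the TaskNameLookup object, the map contents and the exception type are made up for illustration:

```scala
object TaskNameLookup {
  // Hypothetical mapping from taskName to the SSPs assigned to it.
  val sspTaskNames: Map[String, Set[String]] = Map(
    "Partition 0" -> Set("kafka.input.0"),
    "Partition 1" -> Set("kafka.input.1")
  )

  // The default is only evaluated when the key is missing, and `throw` has
  // type Nothing, so it can stand in for a value of any type.
  def sspsFor(taskName: String): Set[String] =
    sspTaskNames.getOrElse(
      taskName,
      throw new IllegalArgumentException(s"No SSPs registered for taskName: $taskName"))
}
```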

samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80087>

    This serde will not be able to read checkpoints written by a previous version of Samza -- is that right? That would set up quite a hurdle for people migrating from 0.7.0 to 0.8.0. I wonder if it would be possible to at least support reading the old checkpoint format, and converting it into the new format (assuming that most people will probably stick with the default GroupByPartition for now). Downgrading back from 0.8.0 to 0.7.0 wouldn't be possible, but at least the upgrade path could be smooth. (We can add the compatibility in a separate JIRA if we want to.)

    As we're already changing the checkpoint format, I also wonder if we could build some extensibility into it -- e.g. by using JSON objects and ignoring any keys that are not recognized. In case we find ourselves needing to add further information to checkpoints in future (for example, in order to provide exactly-once semantics), we could do that by evolving the format.


samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80085>

    Left-over debug statement?


samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment79969>

    Would it make sense to have a separate serde for the partition mapping, rather than tacking it on at the side of CheckpointSerde?


samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80089>

    Left-over debug statement?


samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80123>

    The exception needs to be the second argument to debug() -- see SAMZA-275.

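To make the compatibility and extensibility suggestion on CheckpointSerde.scala above more concrete, here is a rough sketch of a tolerant reader. Everything in it is an assumption for illustration: the tiny Json model stands in for whatever JSON library the serde uses, and both layouts (a flat old-style object, and a new-style object with "version" and "offsets" keys) are invented rather than taken from the actual 0.7.0 or 0.8.0 formats.

```scala
// Minimal stand-in for parsed JSON, so the sketch needs no external library.
sealed trait Json
final case class JString(value: String) extends Json
final case class JObject(fields: Map[String, Json]) extends Json

// Hypothetical in-memory checkpoint: SSP name -> offset.
final case class SketchCheckpoint(offsets: Map[String, String])

object TolerantCheckpointReader {
  // Keep only string-valued entries; anything else is skipped rather than rejected.
  private def stringEntries(obj: JObject): Map[String, String] =
    obj.fields.collect { case (key, JString(value)) => key -> value }

  def read(doc: JObject): SketchCheckpoint = doc.fields.get("version") match {
    // Assumed new layout: {"version": ..., "offsets": {...}}; unrecognized keys are ignored.
    case Some(_) =>
      doc.fields.get("offsets") match {
        case Some(offsets: JObject) => SketchCheckpoint(stringEntries(offsets))
        // A real serde would probably fail loudly here instead of returning nothing.
        case _ => SketchCheckpoint(Map.empty)
      }
    // Assumed old layout: a flat object mapping each SSP name straight to its offset.
    case None => SketchCheckpoint(stringEntries(doc))
  }
}
```

With something along these lines, an old-style document and a new-style document carrying extra keys would both decode to the same in-memory checkpoint, which is the property that would keep the upgrade path smooth.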

samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80091>

    This Util class would probably benefit from some refactoring. I find util classes are always a bit of a "couldn't think of a proper name for this stuff" thing, but this one contains lots of important logic that really deserves classes of its own.


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80097>

    Could import GroupByPartitionFactory to shorten this.


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80105>

    Is this using the YARN terminology for "task", i.e. container? The Samza documentation refers to them as containers, and uses "task" only with the Samza meaning of "TaskInstance" (in SAMZA-7 I've tried to use "task instance" more consistently instead of just "task").

    I would advocate that in Samza we consistently refer to the units of parallelism as "containers", not "tasks". In the code directly interfacing with YARN we might not be able to avoid YARN's definition of "task", but in the rest of our code (including here) I think it would be a lot less confusing if we stick to "container". The same goes for log messages (users reading the logs will assume that if we say "task", we mean "task instance"). What do you think?


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80108>

    Fair enough to fully qualify the Java types, but SystemStreamPartition is already imported.


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80116>

    Kinda similar to CheckpointSerde.partitionMappingToBytes but not quite?


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80093>

    There's some reasonably subtle logic happening here. Would be good to have thorough unit test coverage.


samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80117>

    This could also happen if checkpointing is disabled. Which reminds me: we should probably disallow jobs with state stores but no checkpoint topic, since that configuration would make a deterministic assignment of taskNames to changelog partitions impossible. Not sure where the best place would be for such a check.


samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80118>

    Variable name could be a little more descriptive ;)


samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80120>

    Could do with more tests here (or is there a follow-up jira for that?).


samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
<https://reviews.apache.org/r/22215/#comment80124>

    Forgotten debug statement?


samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80121>

    Worth pointing out that it only uses partition 0?


samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80125>

    I believe the last argument should be -1 (kafka.api.Request.OrdinaryConsumerId)
    https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/api/RequestOrResponse.scala;h=57f87a48c5e87220e7f377b23d2bbfa0d16350dc;hb=HEAD#l24
    but the Kafka team can probably tell us for sure.


samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80182>

    What is the difference between getEarliestOffset(c, tp) and getOffset(c, tp, OffsetRequest.EarliestTime)? They seem to do the same thing.

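On the earlier point about disallowing jobs that have state stores but no checkpoint topic: one possible shape for such a check is sketched below. The StoreCheckpointCheck object is hypothetical, it operates on a plain Map rather than Samza's Config, and the two key patterns are assumptions that should be checked against the real config classes.

```scala
object StoreCheckpointCheck {
  // Key names are assumptions for this sketch, not confirmed against the patch.
  private val ChangelogKey = """stores\.(.+)\.changelog""".r
  private val CheckpointFactoryKey = "task.checkpoint.factory"

  // Returns a description of the problem, or None when the configuration is acceptable.
  def validate(config: Map[String, String]): Option[String] = {
    // Names of all stores that declare a changelog stream.
    val changelogStores = config.keys.collect { case ChangelogKey(store) => store }.toList.sorted
    val checkpointingEnabled = config.get(CheckpointFactoryKey).exists(_.trim.nonEmpty)
    if (changelogStores.nonEmpty && !checkpointingEnabled) {
      val names = changelogStores.mkString(", ")
      Some(s"Stores with changelogs ($names) require a checkpoint manager to be configured")
    } else None
  }
}
```

Returning an Option keeps the question of where to run the check (job submission, AM startup or container startup) separate from the check itself.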

samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80183>

    Do you know in which circumstances this could happen? Perhaps OffsetOutOfRange can no longer happen when you're consuming the full checkpoint topic from beginning to end?


samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80184>

    This could potentially be simplified by using SystemStreamPartitionIterator (which is used for consuming the entirety of a changelog partition in order to restore a state store) instead of the low-level Kafka API.


samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80185>

    On container/job startup, when readLastCheckpoint is called several times in quick succession for different task names, readCheckpoint() will be called for each task name, which looks like it could be wasteful. Although readCheckpoint() won't re-consume the entire topic (because it maintains the offset up to which it has read), it looks like it will still make a request to Kafka to get the latest offset every time readCheckpoint() is called. Perhaps that can be avoided, e.g. by only connecting to Kafka if some minimum time has elapsed since we last checked the latest offset?


samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80186>

    This is going to break all existing checkpoint topics, since they were most likely created with a larger number of partitions. See previous comment on upgrade compatibility across Samza versions.


samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80188>

    A lot of duplicated code between this and readCheckpoint() above. Would be good to factor out the common parts.

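As a sketch of the "minimum time elapsed" idea from the readCheckpoint() comment above: a small wrapper that caches the result of an expensive lookup and only repeats it after a configurable interval. ThrottledLookup is a made-up helper, and the fetch function stands in for whatever call asks Kafka for the latest offset; the clock is injectable purely so the behaviour can be tested.

```scala
// Caches the result of `fetch` and re-runs it at most once per `minIntervalMs`.
final class ThrottledLookup[T](
    minIntervalMs: Long,
    fetch: () => T,
    clock: () => Long = () => System.currentTimeMillis()) {

  private var lastFetchMs: Option[Long] = None
  private var cached: Option[T] = None

  def get(): T = synchronized {
    val now = clock()
    // Stale if we have never fetched, or if the minimum interval has passed.
    val stale = lastFetchMs.forall(last => now - last >= minIntervalMs)
    if (stale) {
      cached = Some(fetch())
      lastFetchMs = Some(now)
    }
    cached.get
  }
}
```

For example, wrapping the latest-offset request as new ThrottledLookup[Long](1000L, fetchLatestOffset), where fetchLatestOffset is a hypothetical function, would collapse a burst of readLastCheckpoint calls at startup into a single request. The cost is that a checkpoint written within the interval might not be noticed until the next refresh.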

samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80122>

    The exception needs to be the second argument to debug() -- see SAMZA-275.


samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
<https://reviews.apache.org/r/22215/#comment80189>

    The other properties use hyphens instead of spaces; although a space is valid, a hyphen might be better for consistency.


- Martin Kleppmann


On June 9, 2014, 5:33 p.m., Jakob Homan wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
>
> (Updated June 9, 2014, 5:33 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
>
>
> Repository: samza
>
>
> Description
> -------
>
> Move topic partition grouping to the AM and generalize
>
>
> Diffs
> -----
>
>   build.gradle 1a1db16
>   check PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java dcf81bf
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 34f50fd
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouper.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouperFactory.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 5aa7a8f
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java 5ec6433
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 611507e
>   samza-api/src/test/java/org/apache/samza/container/SSPGrouperTestBase.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
>   samza-core/src/main/scala/org/apache/samza/container/SSPTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala bfff2a6
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
>   samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySSP.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupInNSets.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/taskname/groupers/SimpleSSPTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
>   samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 3d0a484
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1b548fd
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 552f8c2
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupByPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupBySSP.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupInNSets.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
>   samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b8c369b
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 0934ebe
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala dae3c2c
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 0077af0
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala dc44a99
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala c28c9a6
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 17a96f0
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 0442580
>
> Diff: https://reviews.apache.org/r/22215/diff/
>
>
> Testing
> -------
>
> Existing and new unit. Now moving on to function.
>
>
> Thanks,
>
> Jakob Homan
>
>
