-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review45235
-----------------------------------------------------------


Nice work! It's a big patch, but it makes sense. Definitely excited to have 
this part of Samza.

The comments below are mostly stylistic/clarity things. Just one big question I 
want to specifically call out:

The checkpoint topic is changing in an incompatible way, which means that 
Samza 0.8.0 jobs won't be able to read Samza 0.7.0 checkpoints, and vice versa. 
What should we do about this?

While I think it's ok to break API compatibility between versions (code is 
fairly easy to change), I'm a bit hesitant about an incompatible data change, 
as it will create a deployment headache for anyone upgrading their job from 
Samza 0.7.0 to 0.8.0. They will have to switch to a new checkpoint topic (since 
a 0.8.0 job will fail to start when trying to read a 0.7.0 checkpoint). We 
would probably need to provide a checkpoint conversion tool, otherwise 
everybody would lose their checkpoints. This incompatibility occurs even if the 
user isn't taking advantage of the new SSP grouping feature.

Should we provide a smoother upgrade path here? I would say that, as a minimum, 
0.8.0 should be able to read 0.7.0-style checkpoints and automatically convert 
them into 0.8.0 format.


samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
<https://reviews.apache.org/r/22215/#comment79960>

    Nit: use "o instanceof Checkpoint" rather than "getClass() != getClass()" 
to allow potential subclassing of Checkpoint?
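
    To illustrate what I mean, here is a rough sketch. OffsetCheckpoint and 
LoggingCheckpoint are hypothetical stand-ins, and the offsets field is an 
assumption rather than the actual contents of Checkpoint.java.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Checkpoint; the field name and type are assumptions.
class OffsetCheckpoint {
  private final Map<String, String> offsets;

  OffsetCheckpoint(Map<String, String> offsets) {
    this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    // instanceof is false for null and true for subclasses, so a subclass
    // instance can still compare equal to a plain OffsetCheckpoint.
    if (!(o instanceof OffsetCheckpoint)) return false;
    return offsets.equals(((OffsetCheckpoint) o).offsets);
  }

  @Override
  public int hashCode() {
    return offsets.hashCode();
  }
}

// Hypothetical subclass that adds no state taking part in equality.
class LoggingCheckpoint extends OffsetCheckpoint {
  LoggingCheckpoint(Map<String, String> offsets) {
    super(offsets);
  }
}
```

    The trade-off: if a subclass ever overrides equals() to compare extra 
fields, instanceof can break symmetry, which is the usual argument for the 
getClass() comparison. So this only makes sense if subclasses are not expected 
to add state.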



samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
<https://reviews.apache.org/r/22215/#comment79961>

    On first read, it wasn't clear to me what partitions this referred to. 
First I thought it was input stream partitions, then I thought it was 
partitions of the checkpoint stream.
    
    Could you update the docs to make more explicit that this is about 
partitions of changelog streams for stores?



samza-api/src/main/java/org/apache/samza/container/SSPGrouper.java
<https://reviews.apache.org/r/22215/#comment79962>

    I don't understand this sentence: "Each taskName has a key that uniquely 
describes what sets may be in it, but does not generally enumerate the elements 
of those sets." Could you clarify please?



samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
<https://reviews.apache.org/r/22215/#comment79963>

    What's the difference between a taskName and a taskName key? My 
understanding was that a taskName is a string that identifies a particular task 
instance within a job (e.g. with the current Samza behavior, the taskName could 
be the partition number).



samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
<https://reviews.apache.org/r/22215/#comment79964>

    Is there a JIRA for adapting CheckpointTool to the new checkpoint format?



samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
<https://reviews.apache.org/r/22215/#comment79965>

    Javadoc needs updating to match the code.



samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment79967>

    Perhaps use different namespaces for checkpoints and the partition mapping, 
to remove the risk of filename collision? e.g. checkpoints go in
    
    new File(root, "%s-task-%s" format (jobName, taskName))
    
    and partition mapping goes in
    
    new File(root, "%s-partition-mapping" format jobName)



samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment79970>

    Nit: the two equals signs in the same line caught me by surprise. I'd find 
this more readable:
    
    override def setTaskNameToPartitionMapping(mapping: util.Map[String, Integer]) {
      taskNameMapping = mapping
    }



samza-core/src/main/scala/org/apache/samza/container/SSPTaskNameGrouper.scala
<https://reviews.apache.org/r/22215/#comment79972>

    Could you add docs for SSPTaskNameGrouper? Its purpose is not obvious to me.



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79974>

    It looks like Util.decodeTaskNameToPartitionMapping converts from Java 
types to Scala types, and then here you convert straight back again. Would it 
be easier to just keep the mapping in Java types throughout?



samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
<https://reviews.apache.org/r/22215/#comment79994>

    Nit: could simplify to: sspTaskNames.getOrElse(taskName, throw ...)



samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala
<https://reviews.apache.org/r/22215/#comment79995>

    Nit: unnecessary curly braces? Also, quite a long line -- perhaps break 
after equals sign?



samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80127>

    Perhaps a radical and/or stupid suggestion, but just to throw it out 
there... if JSON is causing pain, and we're switching to a different 
serialization for checkpoints anyway, we could potentially use something like 
Avro instead of JSON.
    
    With Avro, we could specify a schema for the data we want (a checkpoint, in 
this case), and code-generate Java classes for accessing the data. They ought 
to be more Scala-friendly (fewer maps and less dynamic typing). Avro also gives us a 
good system for schema evolution if we want to change aspects of the 
serialization in future.
    
    Not pushing Avro, just throwing it out there as an idea.



samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80087>

    This serde will not be able to read checkpoints written by a previous 
version of Samza -- is that right? That would set up quite a hurdle for people 
migrating from 0.7.0 to 0.8.0.
    
    I wonder if it would be possible to at least support reading the old 
checkpoint format, and converting it into the new format (assuming that most 
people will probably stick with the default GroupByPartition for now). 
Downgrading back from 0.8.0 to 0.7.0 wouldn't be possible, but at least the 
upgrade path could be smooth.
    
    (We can add the compatibility in a separate JIRA if we want to.)
    
    As we're already changing the checkpoint format, I also wonder if we could 
build some extensibility into it -- e.g. by using JSON objects and ignoring any 
keys that are not recognized. In case we find ourselves needing to add further 
information to checkpoints in future (for example, in order to provide 
exactly-once semantics), we could do that by evolving the format.
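
    A minimal sketch of the "ignore unrecognized keys" idea, assuming the JSON 
has already been decoded into a Map by whatever JSON library is in use. The 
class name and the "offsets" key are made up for illustration and are not the 
actual checkpoint format.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reader; the "offsets" key is an assumption for illustration only.
class TolerantCheckpointReader {
  static Map<String, String> readOffsets(Map<String, Object> decoded) {
    Map<String, String> offsets = new HashMap<>();
    Object raw = decoded.get("offsets");
    if (raw instanceof Map) {
      for (Map.Entry<?, ?> e : ((Map<?, ?>) raw).entrySet()) {
        offsets.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
      }
    }
    // Every other top-level key is deliberately ignored, so a newer writer
    // can add fields without breaking this reader.
    return offsets;
  }
}
```

    With something along these lines, a reader written today would keep working 
if a later version added another top-level field next to the offsets.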



samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80085>

    Left-over debug statement?



samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment79969>

    Would it make sense to have a separate serde for the partition mapping, 
rather than tacking it on at the side of CheckpointSerde?



samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80089>

    Left-over debug statement?



samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80123>

    The exception needs to be the second argument to debug() -- see SAMZA-275.



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80091>

    This Util class would probably benefit from some refactoring. I find util 
classes are always a bit of a "couldn't think of a proper name for this stuff" 
thing, but this one contains lots of important logic that really deserves 
classes of its own.



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80097>

    Could import GroupByPartitionFactory to shorten this.



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80105>

    Is this using the YARN terminology for "task", i.e. container? The Samza 
documentation refers to them as containers, and uses "task" only with the Samza 
meaning of "TaskInstance" (in SAMZA-7 I've tried to use "task instance" more 
consistently instead of just "task").
    
    I would advocate that in Samza we consistently refer to the units of 
parallelism as "containers", not "tasks". In the code directly interfacing with 
YARN we might not be able to avoid YARN's definition of "task", but in the rest 
of our code (including here) I think it would be a lot less confusing if we 
stick to "container".
    
    The same goes for log messages (users reading the logs will assume that if 
we say "task", we mean "task instance").
    
    What do you think?



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80108>

    Fair enough to fully qualify the Java types, but SystemStreamPartition is 
already imported.



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80116>

    This looks similar to CheckpointSerde.partitionMappingToBytes, but not quite the same?



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80093>

    There's some reasonably subtle logic happening here. Would be good to have 
thorough unit test coverage.



samza-core/src/main/scala/org/apache/samza/util/Util.scala
<https://reviews.apache.org/r/22215/#comment80117>

    This could also happen if checkpointing is disabled. Which reminds me: we 
should probably disallow jobs with state stores but no checkpoint topic, since 
that configuration would make a deterministic assignment of taskNames to 
changelog partitions impossible. Not sure where the best place would be for 
such a check.
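
    For what it's worth, the check itself could be quite small. A sketch, 
assuming the job config is available as a plain Map and that a store counts as 
"stateful" for this purpose when it has a stores.<name>.changelog entry. The 
class name is hypothetical, and where to call it from is still the open 
question.

```java
import java.util.Map;

// Hypothetical validator; its name and placement are assumptions.
class StoreCheckpointValidator {
  static void validate(Map<String, String> config) {
    boolean hasChangelogStore = false;
    for (String key : config.keySet()) {
      // Matches keys of the form stores.<store-name>.changelog
      if (key.startsWith("stores.") && key.endsWith(".changelog")) {
        hasChangelogStore = true;
        break;
      }
    }
    String factory = config.get("task.checkpoint.factory");
    boolean hasCheckpointing = factory != null && !factory.trim().isEmpty();
    if (hasChangelogStore && !hasCheckpointing) {
      throw new IllegalStateException(
          "Stores with a changelog require task.checkpoint.factory to be set, "
              + "otherwise taskNames cannot be mapped to changelog partitions deterministically.");
    }
  }
}
```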



samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80118>

    Variable name could be a little more descriptive ;)



samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
<https://reviews.apache.org/r/22215/#comment80120>

    Could do with more tests here (or is there a follow-up jira for that?).



samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
<https://reviews.apache.org/r/22215/#comment80124>

    Forgotten debug statement?



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80121>

    Worth pointing out that it only uses partition 0?



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80125>

    I believe the last argument should be -1 
(kafka.api.Request.OrdinaryConsumerId) 
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/api/RequestOrResponse.scala;h=57f87a48c5e87220e7f377b23d2bbfa0d16350dc;hb=HEAD#l24
but the Kafka team can probably tell us for sure.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80182>

    What is the difference between getEarliestOffset(c, tp) and getOffset(c, 
tp, OffsetRequest.EarliestTime)? They seem to do the same thing.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80183>

    Do you know in which circumstances this could happen? Perhaps 
OffsetOutOfRange can no longer happen when you're consuming the full checkpoint 
topic from beginning to end?



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80184>

    This could potentially be simplified by using SystemStreamPartitionIterator 
(which is used for consuming the entirety of a changelog partition in order to 
restore a state store) instead of the low-level Kafka API.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80185>

    On container/job startup, when readLastCheckpoint is called several times 
in quick succession for different task names, readCheckpoint() will be called 
for each task name, which looks like it could be wasteful. Although 
readCheckpoint() won't re-consume the entire topic (because it maintains the 
offset up to which it has read), it looks like it will still make a request to 
Kafka to get the latest offset every time readCheckpoint() is called.
    
    Perhaps that can be avoided, e.g. by only connecting to Kafka if some 
minimum time has elapsed since we last checked the latest offset?
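
    Roughly what I have in mind, as a sketch only. CachedLatestOffset is a 
hypothetical helper; the first LongSupplier stands in for the real Kafka 
request, and the second is an injectable clock so the behaviour can be tested.

```java
import java.util.function.LongSupplier;

// Hypothetical helper that remembers the latest offset for a minimum interval.
class CachedLatestOffset {
  private final LongSupplier fetchLatestOffset; // stands in for the Kafka request
  private final LongSupplier clockMs;           // e.g. System::currentTimeMillis
  private final long minIntervalMs;
  private boolean fetched = false;
  private long lastFetchMs;
  private long cachedOffset;

  CachedLatestOffset(LongSupplier fetchLatestOffset, LongSupplier clockMs, long minIntervalMs) {
    this.fetchLatestOffset = fetchLatestOffset;
    this.clockMs = clockMs;
    this.minIntervalMs = minIntervalMs;
  }

  synchronized long get() {
    long now = clockMs.getAsLong();
    // Only go back to the broker on first use or once the interval has elapsed.
    if (!fetched || now - lastFetchMs >= minIntervalMs) {
      cachedOffset = fetchLatestOffset.getAsLong();
      lastFetchMs = now;
      fetched = true;
    }
    return cachedOffset;
  }
}
```

    The obvious downside is staleness: a checkpoint written within the interval 
would not be seen until the next refresh. I'd guess that is acceptable during 
startup, but that is an assumption worth checking.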



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80186>

    This is going to break all existing checkpoint topics, since they were most 
likely created with a larger number of partitions. See previous comment on 
upgrade compatibility across Samza versions.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80188>

    A lot of duplicated code between this and readCheckpoint() above. Would be 
good to factor out the common parts.



samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
<https://reviews.apache.org/r/22215/#comment80122>

    The exception needs to be the second argument to debug() -- see SAMZA-275.



samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
<https://reviews.apache.org/r/22215/#comment80189>

    The other properties use hyphens instead of spaces; although a space is 
valid, a hyphen might be better for consistency.


- Martin Kleppmann


On June 9, 2014, 5:33 p.m., Jakob Homan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
> 
> (Updated June 9, 2014, 5:33 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Move topic partition grouping to the AM and generalize
> 
> 
> Diffs
> -----
> 
>   build.gradle 1a1db16 
>   check PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java dcf81bf 
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
> 34f50fd 
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouper.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/container/SSPGrouperFactory.java 
> PRE-CREATION 
>   
> samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 
> 5aa7a8f 
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java 5ec6433 
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 611507e 
>   samza-api/src/test/java/org/apache/samza/container/SSPGrouperTestBase.java 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
> 5735a39 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 9487b58 
>   
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
>  364e489 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83 
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 
> 4c2d365 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c 
>   
> samza-core/src/main/scala/org/apache/samza/container/SSPTaskNameGrouper.scala 
> PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> bfff2a6 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> 99a9841 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  7502124 
>   samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala 
> PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySSP.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupInNSets.scala
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/ssp/taskname/groupers/SimpleSSPTaskNameGrouper.scala
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala 
> f8865b1 
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala 
> e20e7c1 
>   
> samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 
> 3d0a484 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
> 7214151 
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 
> 4ccd604 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1b548fd 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
>  bc54f9e 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
> 552f8c2 
>   
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
>  50d9a05 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> fa10231 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 190bdfe 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 1f5e3bb 
>   
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupByPartition.scala
>  PRE-CREATION 
>   
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupBySSP.scala
>  PRE-CREATION 
>   
> samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupInNSets.scala
>  PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78 
>   
> samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
>  70d8c80 
>   
> samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 
> 12f1e03 
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b8c369b 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
>  0934ebe 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
>  cb6dbdf 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
>  92ac61e 
>   
> samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
>  dae3c2c 
>   
> samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 
> 222c130 
>   
> samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
>  0077af0 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  dc44a99 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
> c28c9a6 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
> 01a2683 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
>  eb1ff54 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
>  17a96f0 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  0442580 
> 
> Diff: https://reviews.apache.org/r/22215/diff/
> 
> 
> Testing
> -------
> 
> Existing and new unit.  Now moving on to function.
> 
> 
> Thanks,
> 
> Jakob Homan
> 
>
