> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > I'm not wild about SSP* for class names. Thus far the convention has been
> > SystemStreamPartition*. I'd rather have it all one way, or all the other.
> > If the gripe is that the SystemStreamPartition name is too long, that's
> > being discussed in another ticket (rename to Stream/StreamPartition), and I
> > believe you had said you had a completely alternative naming idea as well.
> > Can we just stick with the existing class name convention, and discuss the
> > naming issue separately?
> >
> > The taskName: String style feels weird to me. First, does it make sense to
> > use a class instead of a string? That was the point of using Partition
> > initially, instead of just an int. Second, it seems to me that taskId is
> > better than taskName now, after seeing it in code. I think either Sriram or
> > Jay was pushing this name as well. What do you think about a TaskId class
> > instead of a Partition class?
> >
> > I'm not a fan of the way we handle the taskName/partition mapping in the
> > checkpoint manager. Can we just put the mapping in the checkpoint itself,
> > rather than having distinct messages for it?
> >
> > All new classes need docs (e.g. SSPTaskNameGrouper, GroupByPartition, etc).
> > It'd also be nice to have some docs on the website as well. Maybe in the
> > container section.

Changed SSP to SystemStreamPartition for all the classes. Changed TaskName to be a first-class object. Changed the state log partition mapping to a separate entry in the checkpoint log.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > build.gradle, line 61
> > <https://reviews.apache.org/r/22215/diff/1/?file=603052#file603052line61>
> >
> > Can we just move SSPGrouperTestBase to samza-core instead? Szczepan
> > says that this method of pulling in test source is not supported, and
> > hacky, so I'd like to limit it as much as possible.
> >
> > The recommended alternative is apparently to create a samza-test
> > submodule that contains all of the test code. Since we already have a
> > samza-test submodule that's really integration tests, we'd have to either
> > move the samza-test stuff to an integration test submodule, or create a
> > second samza-test module (to avoid circular dependencies: samza-core ->
> > samza-test -> samza-core).
> >
> > To me, easiest fix seems to be to just move the one class to samza-core.

done.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > check, line 1
> > <https://reviews.apache.org/r/22215/diff/1/?file=603053#file603053line1>
> >
> > Add license header.
> >
> > Add brief docs describing what this is for.

this was just a test file during development. Removed.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java, line 51
> > <https://reviews.apache.org/r/22215/diff/1/?file=603054#file603054line51>
> >
> > This all seems a bit hacky. It'd be better if we could have this passed
> > in through the constructor, and not have mutable methods.

No longer an issue with new checkpoint manager approach.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java, line 60
> > <https://reviews.apache.org/r/22215/diff/1/?file=603054#file603054line60>
> >
> > Is this code-gen'd? Want to make sure we're not manually writing
> > equals() since it's error-prone.

yes

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java, line 77
> > <https://reviews.apache.org/r/22215/diff/1/?file=603054#file603054line77>
> >
> > Is this code-gen'd? Want to make sure we're not manually writing
> > hashCode() since it's error-prone.

yes

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java, line 25
> > <https://reviews.apache.org/r/22215/diff/1/?file=603055#file603055line25>
> >
> > specified partition -> specified task

Changed in previous javadoc patch.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java, line 55
> > <https://reviews.apache.org/r/22215/diff/1/?file=603055#file603055line55>
> >
> > Why do we need this method if we can get the taskName to partition
> > mapping from checkpoints? Is this just a convenience method to read the
> > mapping across all taskNames?
> >
> > If so, I'd rather keep this out of the interface, and just provide a
> > Util method to handle this.

No longer an issue with new checkpoint manager approach.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java, line 60
> > <https://reviews.apache.org/r/22215/diff/1/?file=603055#file603055line60>
> >
> > Why do we need this method if we can set the taskName to partition
> > mapping for each checkpoint? Is this just a convenience method to write the
> > mapping once for all taskNames, since it's static?
> >
> > If so, I'd rather keep this out of the interface, and just provide a
> > Util method to handle this.

No longer an issue with new checkpoint manager approach.

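For readers following the TaskName and equals()/hashCode() points above, here is a minimal sketch of what a task name as a first-class value object might look like. It is an illustration only: TaskNameSketch, its field, and the demo class are hypothetical names and are not taken from the patch, and the real TaskName.java may differ. The equals()/hashCode() pair is written in the shape an IDE generator typically produces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a first-class task name; not the class from the patch.
final class TaskNameSketch {
    private final String name;

    TaskNameSketch(String name) {
        // Passed in through the constructor and never mutated afterwards.
        this.name = Objects.requireNonNull(name, "name");
    }

    String getName() {
        return name;
    }

    // equals() and hashCode() both depend only on the immutable name,
    // so two instances with the same name act as the same map key.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TaskNameSketch)) return false;
        return name.equals(((TaskNameSketch) o).name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }

    @Override
    public String toString() {
        return name;
    }
}

final class TaskNameSketchDemo {
    public static void main(String[] args) {
        // Assumed use: task name -> state log partition number.
        Map<TaskNameSketch, Integer> stateLogPartitions = new HashMap<>();
        stateLogPartitions.put(new TaskNameSketch("Partition 0"), 0);
        // A distinct but equal instance finds the same entry.
        System.out.println(stateLogPartitions.get(new TaskNameSketch("Partition 0")));
    }
}
```

Wrapping the string in a class gives the compiler a way to tell a task name apart from any other String, which is the same argument made above for Partition over a bare int.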
> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/container/SSPGrouperFactory.java, line 24
> > <https://reviews.apache.org/r/22215/diff/1/?file=603057#file603057line24>
> >
> > Docs are cute at the expense of readability. Can you just be direct
> > here?

fixed

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java, line 33
> > <https://reviews.apache.org/r/22215/diff/1/?file=603058#file603058line33>
> >
> > taskNames?
> >
> > Seems redundant to call it taskNameKeys since taskNames are keys by
> > definition, right?

fixed

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java, line 34
> > <https://reviews.apache.org/r/22215/diff/1/?file=603059#file603059line34>
> >
> > lost the "set" verb in method name here.

fixed

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java, line 54
> > <https://reviews.apache.org/r/22215/diff/1/?file=603059#file603059line54>
> >
> > I'm a little confused here. Isn't the mapping a mapping from taskName
> > to a set of partitions? Why is the type a string here?

This is the mapping of state log partitions, but was badly named. Also, it had been previously converted to JSON before this call. Changed the conversion to happen in the class itself.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala, line 119
> > <https://reviews.apache.org/r/22215/diff/1/?file=603062#file603062line119>
> >
> > I'm assuming this is still to-be-implemented. I think you mentioned
> > this in the JIRA.

correct.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, line 157
> > <https://reviews.apache.org/r/22215/diff/1/?file=603063#file603063line157>
> >
> > Can we just import scala.collection._ to make this a little more
> > succinct?

fixed

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, line 37
> > <https://reviews.apache.org/r/22215/diff/1/?file=603065#file603065line37>
> >
> > I'm confused. These configs are named the same thing. Is this
> > intentional? Seems like maybe SSP_TASK_NAME_GROUPER_FACTORY should be
> > deleted. It's unused.

fixed. was left over from a previous iteration.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala, line 31
> > <https://reviews.apache.org/r/22215/diff/1/?file=603066#file603066line31>
> >
> > Just so I'm clear, this is only used for the state store mapping? If a
> > job doesn't use state stores, this env variable doesn't really get used?

yes, improved name.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 76
> > <https://reviews.apache.org/r/22215/diff/1/?file=603069#file603069line76>
> >
> > Can we break all of this into a separate method in the SamzaContainer
> > object just to try and keep apply() no worse than it was before. The method
> > is already kind of a mess, so I'm trying to not make it worse.

fixed.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 87
> > <https://reviews.apache.org/r/22215/diff/1/?file=603069#file603069line87>
> >
> > This log line is a little dated. There is no longer a partition
> > manager. A more accurate statement would be that the SystemAdmin wasn't
> > able to find any partitions for input streams.

truncated.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 360
> > <https://reviews.apache.org/r/22215/diff/1/?file=603069#file603069line360>
> >
> > Do we need the asJavaCollection stuff? We're importing
> > JavaConversions._ already

fixed.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala, line 430
> > <https://reviews.apache.org/r/22215/diff/1/?file=603069#file603069line430>
> >
> > Do we need the type here? Seems redundant.

fixed

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala, line 67
> > <https://reviews.apache.org/r/22215/diff/1/?file=603070#file603070line67>
> >
> > Any way to make this a little cleaner/more succinct? Totally a nit, but
> > these long scala.* calls just bug me. If you feel strongly about it, I'm ok
> > the way it is, but just voicing my preference.

fixed

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala, line 24
> > <https://reviews.apache.org/r/22215/diff/1/?file=603072#file603072line24>
> >
> > Oh dear. I feel like the better thing to do here is just to have a
> > custom object that doesn't have to deal with all the Scala junk.

I'd like to leave it for now, as it's working. Switching to a straight map would be possible but more verbose.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/TaskNameToSSPs.scala, line 67
> > <https://reviews.apache.org/r/22215/diff/1/?file=603072#file603072line67>
> >
> > This should be plural (TaskNamesToSSPs) since it's a map from multiple
> > taskNames to multiple SSPs.

fixed.

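To make the "map from multiple taskNames to multiple SSPs" concrete, here is a rough sketch of a group-by-partition step that produces such a map. Everything in it is an assumption made for illustration: SspSketch is a simplified stand-in for a (system, stream, partition) triple, GroupByPartitionSketch is a hypothetical helper, and the "Partition <id>" task-name format is invented here. It does not reproduce GroupByPartition.scala or the grouper interface from the patch.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified (system, stream, partition) triple.
final class SspSketch {
    final String system;
    final String stream;
    final int partition;

    SspSketch(String system, String stream, int partition) {
        this.system = Objects.requireNonNull(system, "system");
        this.stream = Objects.requireNonNull(stream, "stream");
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SspSketch)) return false;
        SspSketch other = (SspSketch) o;
        return partition == other.partition
                && system.equals(other.system)
                && stream.equals(other.stream);
    }

    @Override
    public int hashCode() {
        return Objects.hash(system, stream, partition);
    }

    @Override
    public String toString() {
        return system + "." + stream + "#" + partition;
    }
}

final class GroupByPartitionSketch {
    // Groups SSPs that share a partition id under one task name.
    // The "Partition <id>" naming is an assumption for this sketch.
    static Map<String, Set<SspSketch>> group(Iterable<SspSketch> ssps) {
        Map<String, Set<SspSketch>> taskNamesToSsps = new TreeMap<>();
        for (SspSketch ssp : ssps) {
            String taskName = "Partition " + ssp.partition;
            taskNamesToSsps
                    .computeIfAbsent(taskName, k -> new LinkedHashSet<>())
                    .add(ssp);
        }
        return taskNamesToSsps;
    }
}
```

With two input streams of two partitions each, this yields two task names holding two SSPs apiece. A per-SSP grouper would instead emit one task name per triple.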
> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupInNSets.scala, line 44
> > <https://reviews.apache.org/r/22215/diff/1/?file=603075#file603075line44>
> >
> > Can we just use the same pattern we've been using here? Either
> > job.num.sets or task.num.sets or something?

this class has been removed per the discussion on jira and will be proposed for inclusion in a separate JIRA.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala, line 37
> > <https://reviews.apache.org/r/22215/diff/1/?file=603078#file603078line37>
> >
> > I think you've renamed it because it clashes with the new "taskName"
> > nomenclature. I don't think this is a jobName, though, right? Isn't it more
> > of a containerName?

fixed.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala, line 39
> > <https://reviews.apache.org/r/22215/diff/1/?file=603078#file603078line39>
> >
> > I am having trouble following this code. I don't really have any good
> > suggestions, but I think it could maybe use a little more cleanup. Maybe
> > I'm in the minority, though. Anyway... I'm confused. :)

Added more comments.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala, line 58
> > <https://reviews.apache.org/r/22215/diff/1/?file=603079#file603079line58>
> >
> > Clean up.

fixed

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 122
> > <https://reviews.apache.org/r/22215/diff/1/?file=603082#file603082line122>
> >
> > I think I'm confused. We can assign multiple taskNames to the same
> > task? I thought we had a 1:1 between TaskInstance -> taskName -> SSPs?

Yes, Tasks should probably be renamed containers, but it's confusing since YARN increments container numbers on container failure.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 65
> > <https://reviews.apache.org/r/22215/diff/1/?file=603096#file603096line65>
> >
> > Skipping review of this class. I think I need a walkthrough to
> > understand what changes are being made here.

Reworked significantly.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala, line 127
> > <https://reviews.apache.org/r/22215/diff/1/?file=603103#file603103line127>
> >
> > Delete or uncomment. Not sure what you're going for here.

Was left over from merge of the async client stuff from Yan.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala, line 161
> > <https://reviews.apache.org/r/22215/diff/1/?file=603105#file603105line161>
> >
> > Still confused. I thought task:taskName mapping was 1:1. Probably
> > forgetting things. Can you explain this to me again?

Talked offline.

> On June 5, 2014, 2:23 p.m., Chris Riccomini wrote:
> > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala, line 388
> > <https://reviews.apache.org/r/22215/diff/1/?file=603107#file603107line388>
> >
> > Not too excited to delete all of these tests without corresponding tests
> > for taskNames.

There are new tests for all the SSPGroupers that cover this same logic.

- Jakob

-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/22215/#review44817
-----------------------------------------------------------

On July 3, 2014, 12:50 p.m., Jakob Homan wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
>
> (Updated July 3, 2014, 12:50 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
>
>
> Repository: samza
>
>
> Description
> -------
>
> Move topic partition grouping to the AM and generalize
>
>
> Diffs
> -----
>
>   .gitignore db9d3ec
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 6fad1fa
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java a6e1ba6
>   samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 78d56a9
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java cb40092
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 7c1b085
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 356adbb
>   samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
>   samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupByPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/groupers/GroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/ssp/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
>   samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 3d0a484
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 11c23d0
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 94f6f4c
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
>   samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupByPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/ssp/groupers/TestGroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
>   samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala 4f7ddcd
>   samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala a67ecdf
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 15245d4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6be9732
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala dae3c2c
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 0077af0
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala dc44a99
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 520f784
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala f1139f5
>
> Diff: https://reviews.apache.org/r/22215/diff/
>
>
> Testing
> -------
>
> Existing and new unit. Now moving on to function.
>
>
> Thanks,
>
> Jakob Homan
>
>
