> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala, line 205
> > <https://reviews.apache.org/r/22215/diff/3/?file=623716#file623716line205>
> >
> >     getOrElse(..., throw new SamzaException())
> 
> Jakob Homan wrote:
>     I don't understand this comment.
> 
> Chris Riccomini wrote:
>     This is an unprotected get that will throw a useless exception if the SSP
>     isn't in the map. I usually do a getOrElse, and throw a SamzaException
>     with a slightly more useful exception message (e.g. "Missing
>     SystemStreamPartition in Map blah blah. Please configure Foo properly.")

Done.
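A minimal sketch of the getOrElse pattern described above, under simplifying assumptions: `SSP` and the local `SamzaException` class are hypothetical stand-ins (the real code would use `SystemStreamPartition` and `org.apache.samza.SamzaException`), and `offsetFor` is an illustrative helper name. Because `getOrElse` takes its default by name, the `throw` only runs when the key is missing.

```scala
// Stand-in for org.apache.samza.SamzaException so the sketch is self-contained.
class SamzaException(message: String) extends RuntimeException(message)

// Hypothetical, simplified stand-in for SystemStreamPartition.
final case class SSP(system: String, stream: String, partition: Int)

// Map.apply would fail with a bare NoSuchElementException ("key not found").
// getOrElse evaluates its default only when the key is absent, so the throw
// fires only on a miss, and it carries an actionable message.
def offsetFor(offsets: Map[SSP, String], ssp: SSP): String =
  offsets.getOrElse(ssp, throw new SamzaException(
    s"Missing SystemStreamPartition $ssp in the offset map. " +
      "Please check that this stream is configured as an input for the job."))

val offsets = Map(SSP("kafka", "PageViewEvent", 0) -> "42")
val found = offsetFor(offsets, SSP("kafka", "PageViewEvent", 0))
```

The message text is only an example; the point is that the failure names the missing SSP and suggests where to look.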


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala, line 121
> > <https://reviews.apache.org/r/22215/diff/3/?file=623758#file623758line121>
> >
> >     mapValues?
> 
> Jakob Homan wrote:
>     No, we need the full map, as above.
> 
> Chris Riccomini wrote:
>     I'm not sure I understand. This works:
>     
>     val foo = Map("a" -> 1, "b" -> 2, "c" -> 3)
>     
>     foo: scala.collection.immutable.Map[java.lang.String,Int] = Map(a -> 1, b -> 2, c -> 3)
>     
>     foo.mapValues(Integer.valueOf)
>     
>     res2: scala.collection.immutable.Map[java.lang.String,java.lang.Integer] = Map(a -> 1, b -> 2, c -> 3)
>     
>     Can't you replace
>     taskNameToStateLogPartitionMapping.map(kv => kv._1 -> Integer.valueOf(kv._2))
>     with taskNameToStateLogPartitionMapping.mapValues(Integer.valueOf)?

I'm concerned because mapValues doesn't actually do that: instead of building a
new map, it returns a view over the original map that re-applies the function
on every lookup (http://blog.bruchez.name/2013/02/mapmap-vs-mapmapvalues.html),
which I didn't know until I just looked it up.  This *should* be OK with the
code right now, but it seems like a dangerous method to use in case we change
how the map works in the future.
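A small sketch of the difference, assuming Scala 2.x collection semantics (since Scala 2.13, `Map.mapValues` is deprecated in favour of `.view.mapValues`, which is equally lazy). `MapValuesDemo` and `toInteger` are hypothetical; the counter exists only to make each evaluation of the value function visible.

```scala
// Hypothetical demo object; the counter makes every call to the value function visible.
object MapValuesDemo {
  private var conversions = 0

  // Stand-in for Integer.valueOf that also counts how often it runs.
  private def toInteger(i: Int): java.lang.Integer = {
    conversions += 1
    Integer.valueOf(i)
  }

  private val mapping = Map("task-0" -> 0, "task-1" -> 1)

  // mapValues does not build a new map: it wraps `mapping` in a view and
  // re-runs toInteger on every lookup. (Deprecated since Scala 2.13 in favour
  // of mapping.view.mapValues, which is just as lazy.)
  val viaView = mapping.mapValues(toInteger)
  viaView("task-0")
  viaView("task-0")
  val conversionsAfterViewLookups: Int = conversions // 2: same key converted twice

  // map builds a strict Map: each value is converted exactly once, up front,
  // and lookups afterwards do no work.
  val strict = mapping.map { case (k, v) => k -> toInteger(v) }
  strict("task-0")
  strict("task-0")
  val conversionsAfterStrictLookups: Int = conversions // 4: two at build time, none on lookup
}
```

The strict `map` version pays the conversion cost once and yields an independent map, so nothing about the source map can leak through a view later on.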


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala, line 301
> > <https://reviews.apache.org/r/22215/diff/3/?file=623749#file623749line301>
> >
> >     Given that you're reading through the entire checkpoint topic for
> >     partition 0, we should set a smaller segment size and enable log
> >     compaction here. The smaller segment size will allow more of the topic
> >     to be compacted once compaction is enabled. This should drastically
> >     speed up container startup time.
> 
> Jakob Homan wrote:
>     Should that be done here or by the SREs/Ops? Users may wish to keep a
>     larger log for various reasons...
> 
> Chris Riccomini wrote:
>     I think it should be done here, but as a follow-on ticket. I think that we
>     need to automate as much of this as possible (checkpoint log and state log
>     creation). Otherwise, it's just unusable.

There's already a ticket for this (SAMZA-226).  Is it possible to resize the 
partition from the client?
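For reference, a sketch of the topic-level settings Chris describes, as they might be handed to Kafka when the checkpoint topic is created. `cleanup.policy` and `segment.bytes` are standard Kafka topic configs; the helper name `checkpointTopicProperties` and the 25 MB default are illustrative assumptions, not values taken from this patch or from SAMZA-226.

```scala
import java.util.Properties

// Hypothetical helper: topic-level configs for a compacted checkpoint topic.
// "cleanup.policy" and "segment.bytes" are standard Kafka topic configs;
// the 25 MB default is an illustrative assumption, not a tuned value.
def checkpointTopicProperties(segmentBytes: Int = 25 * 1024 * 1024): Properties = {
  val props = new Properties()
  // Keep only the latest message per key instead of the full history.
  props.setProperty("cleanup.policy", "compact")
  // The active segment is never compacted, so smaller segments roll sooner
  // and leave less of the topic uncompacted for a container to read on startup.
  props.setProperty("segment.bytes", segmentBytes.toString)
  props
}

val props = checkpointTopicProperties()
```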


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 249
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line249>
> >
> >     For Kafka, do we have to run a topic partition expansion on the
> >     changelog partition count in order for this to work?
> 
> Jakob Homan wrote:
>     Only if they add new partitions, which is the same behavior as now.
> 
> Chris Riccomini wrote:
>     I see. And that'd be manual, right? Can you open a follow-on ticket to
>     make it automated?

Will open a new ticket.


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 141
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line141>
> >
> >     Can't we just do TaskNamesToSystemStreamPartitions(groups) here?
> 
> Jakob Homan wrote:
>     Not sure I understand.  The sspTaskNamesAsJava is defined by the closure
>     that ends in groups, in order to not spew a bunch of local, intermediate
>     variables into the method and to more easily delineate the work.  That's
>     then what's fed to the TNTSSP...
> 
> Chris Riccomini wrote:
>     Yea, I'm just saying that you return groups, and then immediately wrap it
>     in the TNTSSP. I was suggesting just doing the wrapping on the last return
>     line to remove one line of code.

That seems less clear to me, if I understand correctly.  I'm a fan of Scala's
ability to define blocks like this on the fly to clearly delineate bits of code
that are complex enough to be called out, but not important enough to be
full-fledged methods on the class.
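To make the two styles concrete, a sketch using hypothetical stand-ins: `TaskNamesToSSPs` plays the role of TaskNamesToSystemStreamPartitions, SSPs are plain strings, and the group-by-partition logic is a simplified illustration rather than the code in Util.scala. Both forms produce the same value; they differ only in where the wrapping happens.

```scala
// Hypothetical stand-in for TaskNamesToSystemStreamPartitions; real types differ.
final case class TaskNamesToSSPs(groups: Map[String, Set[String]])

val ssps = Set("PageViewEvent-0", "PageViewEvent-1", "AdClick-0")

// Style kept in the patch: a block computes `groups`, keeping its intermediate
// values out of the enclosing method, and the result is wrapped afterwards.
val groups: Map[String, Set[String]] = {
  // Group SSP names by their partition suffix, e.g. "0" or "1".
  val byPartition = ssps.groupBy(_.split('-').last)
  byPartition.map { case (p, members) => s"Partition $p" -> members }
}
val kept = TaskNamesToSSPs(groups)

// Suggested alternative: do the wrapping on the block's last line instead.
val wrapped = {
  val byPartition = ssps.groupBy(_.split('-').last)
  TaskNamesToSSPs(byPartition.map { case (p, members) => s"Partition $p" -> members })
}
```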


> On July 7, 2014, 8:44 p.m., Chris Riccomini wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 122
> > <https://reviews.apache.org/r/22215/diff/3/?file=623734#file623734line122>
> >
> >     This method is confusingly named. Shouldn't it be something like
> >     assignTaskNamesToContainers?
> >     
> >     Can you move this to TaskNamesToSystemStreamPartitions.apply()? The
> >     pattern I've been moving toward is to have wiring be done as apply()
> >     methods in companion objects. See DefaultChooser as an example. The idea
> >     is that it keeps SamzaContainer.apply from getting any worse, and keeps
> >     the wiring close to the class that the wiring is instantiating.
> 
> Jakob Homan wrote:
>     This isn't used by TNTSSP directly, it's used in command builders.  At
>     this point the TNTSSPs are already created...
> 
> Chris Riccomini wrote:
>     Yea, I am thinking of it as a complex way to instantiate a TNTSSP. The
>     same way that DefaultChooser.apply isn't used by DefaultChooser. It's used
>     to construct the thing.

But this isn't creating a TNTSSP.  It's creating a map of Int -> TNTSSPs.  If
we extracted that type of map into its own class (similar to the TNTSSP), then
it might make sense to put this in that class's apply method.

I've changed the name to assignContainerToSSPTaskNames.
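A sketch of what that refactoring might look like if the Int -> TNTSSP map were extracted into its own class, with the wiring in its companion `apply` as in the DefaultChooser pattern mentioned above. `ContainerAssignments` and `TaskGroup` are hypothetical, and the round-robin policy is an assumption made for illustration; it need not match what assignContainerToSSPTaskNames actually does.

```scala
// Hypothetical stand-in for a TNTSSP: just the task names in one container.
final case class TaskGroup(taskNames: Set[String])

// Hypothetical class for the Int (container ID) -> group map.
final case class ContainerAssignments(byContainer: Map[Int, TaskGroup])

object ContainerAssignments {
  // Wiring lives in the companion's apply; round-robin is an illustrative assumption.
  def apply(taskNames: Seq[String], containerCount: Int): ContainerAssignments = {
    require(containerCount > 0, "containerCount must be positive")
    val byContainer = taskNames.sorted.zipWithIndex
      .groupBy { case (_, i) => i % containerCount }
      .map { case (container, named) => container -> TaskGroup(named.map(_._1).toSet) }
    new ContainerAssignments(byContainer)
  }
}

val assignments = ContainerAssignments(Seq("Partition 0", "Partition 1", "Partition 2"), 2)
```

Callers such as the command builders would then ask the class for its assignments instead of calling a free function in Util.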


- Jakob


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22215/#review47416
-----------------------------------------------------------


On July 9, 2014, 10:09 p.m., Jakob Homan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22215/
> -----------------------------------------------------------
> 
> (Updated July 9, 2014, 10:09 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-123
>     https://issues.apache.org/jira/browse/SAMZA-123
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Move topic partition grouping to the AM and generalize
> 
> 
> Diffs
> -----
> 
>   .gitignore db9d3ec
>   samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java 6fad1fa
>   samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java a6e1ba6
>   samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java 78d56a9
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/container/TaskName.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java cb40092
>   samza-api/src/main/java/org/apache/samza/task/TaskContext.java 7c1b085
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 5735a39
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 9487b58
>   samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 364e489
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala fcafe83
>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 4c2d365
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 4ca340c
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 356adbb
>   samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 99a9841
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 7502124
>   samza-core/src/main/scala/org/apache/samza/container/TaskNamesToSystemStreamPartitions.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala f8865b1
>   samza-core/src/main/scala/org/apache/samza/job/local/LocalJobFactory.scala e20e7c1
>   samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala 3d0a484
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 7214151
>   samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala 4ccd604
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 11c23d0
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala bc54f9e
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 94f6f4c
>   samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 50d9a05
>   samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala fa10231
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 190bdfe
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 1f5e3bb
>   samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 21d8a78
>   samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala 4f7ddcd
>   samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala 70d8c80
>   samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala 12f1e03
>   samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala a67ecdf
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala 15245d4
>   samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala cb6dbdf
>   samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 92ac61e
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 6be9732
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala 72562cf
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java 222c130
>   samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala 0077af0
>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala dc44a99
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b2faebf
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 01a2683
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala eb1ff54
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 520f784
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala f1139f5
> 
> Diff: https://reviews.apache.org/r/22215/diff/
> 
> 
> Testing
> -------
> 
> Existing and new unit tests.  Now moving on to functional testing.
> 
> 
> Thanks,
> 
> Jakob Homan
> 
>
