Very helpful. Thanks for sharing, Chris.

On Mon, Sep 29, 2014 at 1:08 PM, Chris Riccomini <[email protected]> wrote:
> Hey Roger,

> Sorry for the late reply. Trying to load balance across multiple obligations.

> > Just to be more explicit about "starting it from scratch". The only way to do this theoretically correctly, I think, would be to have the newly partitioned job start with no state and play back its input topics from the beginning of time.

> Correct. Though, it's worth considering what "beginning of time" means. When you start any Samza job for the first time, you don't generally play all data since the beginning of time. It usually just picks up from time T, and moves forward from there. In this case, I think the time T that you'd want to pick up from would probably depend on the logic of the job. It could be "now", "last checkpoint of the old job", "a week ago", etc.

> > This brings me to another question about deployment. Do you recommend having two separate Kafka clusters? In the "public" cluster, brokers would be deployed on machines by themselves. Then you have another Kafka cluster for Samza in which the brokers are co-located with YARN NodeManagers on each machine. With this approach, Samza topologies would consume from and ultimately publish to topics on the "public" cluster. All of the internal topics like repartitioning, changelog, etc. would be hidden away in the Kafka cluster dedicated to Samza.

> As Jakob said, this is how we've been running over the past couple of years. We use MirrorMaker to pull data back and forth between the two clusters (be careful of cycles, though). Recently, we moved the NMs off of the Kafka broker boxes for the Samza grid. The main reason for this was that the stateful Samza jobs were using page cache as a side effect of interacting with the LevelDB state. This caused a degradation in performance for the Kafka brokers that were running alongside the Samza jobs. Kafka is very page-cache sensitive, since it uses it as an in-memory buffer for the most recent N minutes of messages. By pulling the jobs off onto their own boxes, we were able to solve some performance issues that we were seeing with the Kafka brokers.

> The locality gains from running both jobs and brokers together were never measured. I believe they're probably pretty negligible (and degrade as the cluster size increases). Thoughts on locality are here:

> https://issues.apache.org/jira/browse/SAMZA-335

> Kafka's partitioning model does not lend itself all that well to locality optimization (vs. a block store like HDFS).

> Anyway, food for thought.

> Cheers,
> Chris

> On 9/26/14 11:20 AM, "Roger Hoover" <[email protected]> wrote:

> > Chris,

> > Thanks for the great answers. It's helping me clear up my thinking...

> > On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <[email protected]> wrote:

> >> Hey Roger,

> >> > If the job's input topics are partitioned by key, then you cannot add more partitions without corrupting existing state.

> >> This is correct.

> >> > Does this come up for people in practice?

> >> It does come up occasionally for us. Thus far, we usually just run a Kafka topic-partition expansion (thereby trashing the semantics of the partitioning) and restart the job. Inconsistent output is then emitted for a while. We do this only when we agree that inconsistent output is tolerable.

> > Thanks. This might be reasonable in many cases (not sure yet).
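To make "trashing the semantics of the partitioning" concrete: keyed routing is typically just a hash of the key modulo the partition count, so growing the count silently remaps most keys while the state built under the old mapping stays where it was. A minimal illustration, with a simplified stand-in for a producer's partitioner (real producers each use their own hash):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class PartitionMapping {
        // Simplified stand-in for a producer's default partitioner:
        // hash the raw key bytes and mod by the partition count.
        static int partition(byte[] keyBytes, int numPartitions) {
            return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
        }

        public static void main(String[] args) {
            byte[] key = "member-42".getBytes(StandardCharsets.UTF_8);
            // The same key usually lands on a different partition once the count changes,
            // so state written under the old mapping is no longer co-located with new input.
            System.out.println(partition(key, 8));
            System.out.println(partition(key, 12));
        }
    }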
> >> Another thing we do for this is over-partition our Kafka topics when we're concerned about growth.

> >> Both of these solutions are admittedly hacky. As you said, the ideal solution would be some kind of automatic migration. It seems possible that the AM (job coordinator) might be able to manage this, especially if we had a pre-packaged "repartition job" that it could trigger. I haven't thought about this in detail, though.

> >> > Deploy jobs to repartition inputs and changelog topics into the new topics

> >> The changelog topic seems problematic to me. It seems that the key used in the changelog might not always be directly related to the partitioning of the input topic. For example, if you have a StreamTask that is consuming a single input partition, and keeping a count in the state store of all messages that it sees, how do you repartition this changelog? In the new world, the keys for the single partition that it's consuming could be spread across many different partitions, and the count is pretty much meaningless, since it can't be split up by key.

> >> It almost feels like state has to be totally reset to safely do an input partition expansion under all cases. In a sense, you have to treat the new job as a job that's completely new, and start it from scratch.

> > Ah, you're right. I think there's no way to migrate state in general. If a job is saving any kind of aggregate state, then that's an irreversible operation that was done on the old partition. There's not enough information to "repartition" the results.

> > Just to be more explicit about "starting it from scratch": the only way to do this theoretically correctly, I think, would be to have the newly partitioned job start with no state and play back its input topics from the beginning of time.

> >> > Change job config to point to new topics and restart the job

> >> One problem with this is going to be the case where you don't control the producers for the old input topic. They'd either have to be migrated to produce to the new input topic for your job, or you'd have to permanently run the repartition job to move data from the original topic to the currently expanded topic. Keeping the repartition job is not all that wild of an idea. Most Samza topologies we run have some form of a repartition job that runs permanently at the beginning of their flow.

> > I was thinking about repartitioning as a good design pattern as well. Having your job always repartition the input decouples it from its upstream topic dependencies. This brings me to another question about deployment. Do you recommend having two separate Kafka clusters? In the "public" cluster, brokers would be deployed on machines by themselves. Then you have another Kafka cluster for Samza in which the brokers are co-located with YARN NodeManagers on each machine. With this approach, Samza topologies would consume from and ultimately publish to topics on the "public" cluster. All of the internal topics like repartitioning, changelog, etc. would be hidden away in the Kafka cluster dedicated to Samza.
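The permanent repartition job described above can be close to trivial. A minimal sketch of such a StreamTask, with hypothetical system and stream names (in a real job these would come from config), which forwards each message keyed so that the Kafka producer re-hashes it against the new topic's partition count:

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class RepartitionStreamTask implements StreamTask {
        // Hypothetical output stream: the expanded topic that downstream jobs will read.
        private static final SystemStream OUTPUT =
            new SystemStream("kafka", "page-views-repartitioned");

        @Override
        public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                            TaskCoordinator coordinator) {
            // Forward the message unchanged, reusing its key as the partition key so
            // the producer re-partitions it against the new topic's partition count.
            Object key = envelope.getKey();
            collector.send(new OutgoingMessageEnvelope(OUTPUT, key, key, envelope.getMessage()));
        }
    }

If the job wants to partition by a different field than the original key (the "tangential" case below), it would extract that field from the message and pass it as the partition key instead.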
> >> > All meaningfully-partitioned topics would need to include their keys in the stream

> >> True. Somewhat tangential to this is the case where the key that's been used is not the one your job wishes to partition by. In this case, a repartition job would be required as well.

> >> > This would be problematic as the order of the dictionary keys can change but would still mean the same thing. In order to use JSON as a serde for keys, you'd need to enforce a sort order on dictionaries.

> >> I struggled with this as well. We basically need a forced ordering for the JSON keys in SAMZA-348. Originally, I was thinking of making the key/value messages just a simple string with a delimiter. Something like <type>:<key> for the key and <host>:<source>:<blah> for the value. This approach is also much more compact than JSON. The problem with the latter approach is that it doesn't easily allow for hierarchical key/value pairs.

> > I've been constructing string keys in my jobs so far as you mentioned, but it adds extra boilerplate to the code. It would be nice if there were an automatic way to do it.
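On the "forced ordering" point: if Jackson happened to be the JSON library in use (an assumption here, not something SAMZA-348 prescribes), sorting map entries at serialization time is one way to make logically-equal keys byte-identical, and therefore hash to the same partition:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializationFeature;

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    public class SortedJsonKey {
        private static final ObjectMapper MAPPER = new ObjectMapper()
            // Emit map entries in sorted key order so the same logical key
            // always serializes to the same bytes, regardless of map ordering.
            .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

        public static byte[] toKeyBytes(Map<String, Object> key) throws Exception {
            return MAPPER.writeValueAsBytes(key);
        }

        public static void main(String[] args) throws Exception {
            Map<String, Object> key = new HashMap<>();
            key.put("type", "offset");
            key.put("key", "my-system.my-stream.1000");
            // Prints the same JSON no matter what order the entries were inserted in.
            System.out.println(new String(toKeyBytes(key), StandardCharsets.UTF_8));
        }
    }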
> >> Cheers,
> >> Chris

> >> On 9/24/14 4:55 PM, "Roger Hoover" <[email protected]> wrote:

> >> > Hi all,

> >> > So it seems like one of the first decisions that you have to make when creating a Samza job is how many partitions to have in your input topics. This will dictate how many tasks are created and how many changelog partitions get created. It's great that you can independently change the number of Samza containers that get deployed, but what do you do once you reach the max (# containers == # tasks)?

> >> > If the job's input topics are partitioned by key, then you cannot add more partitions without corrupting existing state. Does this come up for people in practice? How do you handle it?

> >> > Just trying to think it through, it seems like you need a procedure something like this:

> >> > 1) Create new topics to hold the same data but with more partitions (inputs, outputs, and changelog topics)
> >> > 2) Deploy jobs to repartition inputs and changelog topics into the new topics
> >> > 3) When caught up, stop the running job
> >> > 4) Change job config to point to new topics and restart the job (if all topics are new, this can be done while the previous job run is still active, using a new job.id)
> >> > 5) Change downstream jobs to use the new output topic if necessary. Doing this in a safe way might be hard.

> >> > Ideally at some point, this process could be automated. I was wondering whether a generic task could be written for step #2, but I think it would require a couple of constraints:

> >> > 1) All meaningfully-partitioned topics would need to include their keys in the stream. In Kafka, this is optional unless you enable compaction, but for this to work generically, it would have to be mandatory in Samza for any stream for which partitions have meaning (not using random or round-robin partitioning).
> >> > 2) The partition keys should be re-hashable based on their raw byte representation so that the repartition task would not have to know how to deserialize the keys in order to compute their new partition.

> >> > At first glance, this doesn't seem too onerous, but I saw in the Config Stream proposal (SAMZA-348) that keys might be JSON:

> >> > {"type":"offset","key":"my-long-system-name.my-even-longer-stream-name-that-is-really-long.1000"}

> >> > This would be problematic as the order of the dictionary keys can change but would still mean the same thing. In order to use JSON as a serde for keys, you'd need to enforce a sort order on dictionaries.

> >> > I'm curious what others do about this or what your thoughts are. Thanks,

> >> > Roger
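Following up on constraint #2 from the original mail: in Samza terms, a generic repartition task could treat keys as opaque bytes by wiring in a pass-through serde, so it never deserializes a key at all and the producer's partitioner does the re-hashing on the raw bytes. A rough sketch under that assumption (Samza may already ship an equivalent byte serde; registering it through a serde factory in the job config is omitted here):

    import org.apache.samza.serializers.Serde;

    // Pass-through serde: keys (and messages) stay as raw byte arrays end to end,
    // so a generic repartition task can forward them without knowing their format.
    public class RawBytesSerde implements Serde<byte[]> {
        @Override
        public byte[] fromBytes(byte[] bytes) {
            return bytes;
        }

        @Override
        public byte[] toBytes(byte[] object) {
            return object;
        }
    }

The caveat is that placement in the expanded topic is only "correct" if the repartition job's producer hashes those bytes the same way the original producers did, which circles back to the key-ordering problem above.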
