Here are my thoughts:
1. Most of the explanation about retaining the checkpoint and state by task
id applies only to the first two grouping strategies. The group-by-partition
and group-by-SSP strategies keep the task/partition mapping stable when more
partitions are added. I agree that changing the grouping strategy should be
undefined behavior and that we can try to warn about it. However, there are
cases where the grouping strategy stays the same and yet partitions get
mapped to different tasks. For example, with GroupIntoNSets you could keep
the same strategy but change the number of TIs (to scale as the load profile
changes over time), which would map partitions to different task
instances. Is the checkpoint and state information for those tasks still
valid? We could warn by storing each task id along with the partitions it
was previously mapped to, but that quickly gets confusing. I would like to
understand how we would explain the grouping strategy, state, and
checkpoints so that they behave consistently across different grouping
strategies. If we cannot do that, we should not make this feature
extensible and should instead provide fixed strategies.
2. My vote for the naming would be taskName if it is a string, and taskId if
we can map all use cases to ids. Also, getStorePartitionDir needs to be
renamed to something like:

  import java.io.File

  def getStoreTaskDir(storeBaseDir: File, storeName: String, taskName: String) = {
    new File(storeBaseDir, storeName + File.separator + taskName)
  }

3. I am confused about how this grouping strategy can be extended by users
of the framework. Ideally, one should be able to implement a grouping
strategy and specify it as a value in the framework config, e.g.
"grouping.strategy=org.apache.mypackage.MyGroupingStrategy". However, your
third strategy requires an additional config to be added to the
framework. Is the expectation that anyone who extends the grouping strategy
needs to add their own configs to the framework and wire them in?
4. It would also be useful to discuss how these grouping strategies map to
a SQL-like way of expressing a Samza topology. Are there any other
grouping strategies that might require much more change in the framework
than just implementing this API?
5. Finally, unless we are very sure this will work out well, we should
not make this a public API. The framework can provide three strategies, and
those would be the only available strategies for now. Making it extensible
might force us to break a lot of client logic if we find our assumptions
were wrong.
On Thu, Apr 24, 2014 at 9:39 PM, Jakob Homan (JIRA) wrote:
>
> [https://issues.apache.org/jira/browse/SAMZA-123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13980682#comment-13980682]
>
> Jakob Homan commented on SAMZA-123:
> -----------------------------------
>
> bq. 1. The first is to use the terminology task id rather than cohort (in
> the code, not the write-up). This is a minor issue but task id is more
> intuitive to me.
> Do you just mean a terminology change? My concern is that task id is
> (regrettably) easy to confuse with container ID. Part of the appeal of
> cohort is that it is not in any way an already-used term (as compared to
> partition, set, key, group, descriptor, tag, etc.), and so it is more
> likely that users will learn the nuances of what we're trying to do rather
> than make (likely) incorrect assumptions.
> bq. interface SSPGrouper
> bq. { public Map<Integer, Set<SystemStreamPartition>> group(Map<Integer,
> Set<SystemStreamPartition>> current, Set<SystemStreamPartition> ssps); }
> bq. Is this better or worse? Not sure.
> This is worse because it makes the resulting mapping much more opaque for
> anything other than group-by-partition, particularly some of the more
> flexible options available through this new approach, such as the
> per-data-center grouping [described here|
> https://issues.apache.org/jira/browse/SAMZA-123?focusedCommentId=13864997&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13864997].
> For use cases such as those, which need a string (or other non-integer)
> cohort, an integer key would mean that all the metrics, logging, etc.
> would be keyed by an arbitrary number rather than the meaningful key we
> can use now.
> Also, for the group-by-SSP implementation, we could naturally map the
> state log partitions directly to the input-stream partitions, but I'm not
> sure how much benefit that would bring.
>
> After KAFKA-1000 and moving the checkpoint offset retrieval from the
> stream tasks to the AM (with the offsets passed to the tasks directly),
> there's really no need for a checkpoint log as such, since the SSPGrouper
> determines the actual per-TI SSP set membership. However, the other use
> cases you describe point to a potential need for a per-job state log (in
> contrast to the per-task state logs), in which information keyed by
> cohort could be maintained. Rather than call it the C2P log (or
> whatever), how about we just call it the job state log, and store a map
> {cohort => map {whatever we need, starting with the state log partition
> mapping}}? This would allow easy future expansion and avoid setting a
> precedent of adding an extra Kafka log for each new piece of useful
> information.
>
> > Move topic partition grouping to the AM and generalize
> > ------------------------------------------------------
> >
> > Key: SAMZA-123
> > URL: https://issues.apache.org/jira/browse/SAMZA-123
> > Project: Samza
> > Issue Type: Sub-task
> > Components: container
> > Affects Versions: 0.6.0
> > Reporter: Jakob Homan
> > Assignee: Jakob Homan
> > Attachments: SAMZA-123-design-doc.md, SAMZA-123-design-doc.pdf
> >
> >
> > Currently the AM sends a set of all the topics and partitions to the
> container, which then groups them by partition and assigns each set to a
> task instance. By moving the grouping to the AM, we can assign arbitrary
> groups to task instances, which will allow more partitioning strategies, as
> discussed in SAMZA-71.
>
>
>
>
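The cohort argument in the quoted comment (string keys such as a data-center name instead of opaque integers) can be illustrated with a small Java sketch. Everything here is hypothetical: Ssp is a stand-in for SystemStreamPartition, the stream-to-data-center lookup and the "dcX-partition-N" key format are assumptions, and the method omits the current-mapping parameter of the quoted SSPGrouper interface.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of a cohort-keyed grouper for per-data-center grouping.
public class CohortGrouperSketch {

    // Minimal stand-in for Samza's SystemStreamPartition (illustrative only).
    static final class Ssp {
        final String system;
        final String stream;
        final int partition;

        Ssp(String system, String stream, int partition) {
            this.system = system;
            this.stream = stream;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Ssp)) return false;
            Ssp other = (Ssp) o;
            return system.equals(other.system) && stream.equals(other.stream)
                    && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(system, stream, partition);
        }

        @Override
        public String toString() {
            return system + "." + stream + "." + partition;
        }
    }

    // Groups SSPs by (data center, partition). streamToDc is an ASSUMED lookup
    // from stream name to data center; the cohort key is a readable string
    // that metrics and logs could use directly.
    static Map<String, Set<Ssp>> groupByDataCenter(Set<Ssp> ssps, Map<String, String> streamToDc) {
        Map<String, Set<Ssp>> cohorts = new TreeMap<>();
        for (Ssp ssp : ssps) {
            String dc = streamToDc.getOrDefault(ssp.stream, "unknown-dc");
            cohorts.computeIfAbsent(dc + "-partition-" + ssp.partition, k -> new HashSet<>()).add(ssp);
        }
        return cohorts;
    }

    public static void main(String[] args) {
        Map<String, String> streamToDc = new HashMap<>();
        streamToDc.put("clicks_dc1", "dc1");
        streamToDc.put("views_dc1", "dc1");
        streamToDc.put("clicks_dc2", "dc2");

        Set<Ssp> ssps = new HashSet<>();
        ssps.add(new Ssp("kafka", "clicks_dc1", 0));
        ssps.add(new Ssp("kafka", "views_dc1", 0));
        ssps.add(new Ssp("kafka", "clicks_dc2", 0));
        ssps.add(new Ssp("kafka", "clicks_dc1", 1));

        // Cohort keys are readable strings rather than opaque integers.
        System.out.println(groupByDataCenter(ssps, streamToDc).keySet());
    }
}
```

With an Integer-keyed interface the same three cohorts would surface in metrics and logs as, say, 0, 1 and 2, and the data-center meaning would have to be recovered from somewhere else.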