[
https://issues.apache.org/jira/browse/SAMZA-123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13981667#comment-13981667
]
Jakob Homan commented on SAMZA-123:
-----------------------------------
bq. I'd either like to roll the C2P mapping into the Checkpoint object (and
topic), or have a generic topic as Jakob Homan describes. I think having a
generic topic is a big change that needs to be thought through, though (see 6,
below). What else would we put in there, what would it be used for, etc.
Aye, but the CheckpointManager interface is defined thusly:
{noformat}
/**
 * Used as a standard interface for writing out checkpoints for a specified
 * partition.
 */
public interface CheckpointManager {
{noformat}
so if we stuff the task-state-partition-mapping into it, we're redefining the
interface pretty dramatically and it's no longer really a CheckpointManager.
It's also going to be a pretty ugly commingling of relatively orthogonal pieces
of the code (checkpoint manager and state). Hence the suggestion of starting
the job-state log with an easily extendable map interface while slowly
transitioning away from the checkpoint log via KAFKA-1000 and AM-provided
offsets.
bq. What exactly is the proposed logic for this? I think we should think
through the impact of adding/removing/moving SSPs on state for every strategy.
Assuming we have the currently defined cohort -> Set[SSP] mapping (current)
and the last one loaded from the C2P/Job-State/Checkpoint/Whatever log
(previous),
* Calculate the complements between the previous set of cohorts and the current
cohorts, logging about any new or missing cohorts. INFO seems reasonable to
me, but WARN would work too.
* For those cohorts in the intersection of current and previous, calculate the
complements between the current and previous Set[SSP]. Log about any new
or missing SSPs.
Again, this reporting is a new level of visibility and safety that we do not
currently provide, and could only provide by a circuitous route, with the
current configuration. Right now users can add or delete SSPs from their task
inputs (either manually or via something like the RegExTopicGenerator) and get
no warning.
Logging rather than failing is the appropriate response as detailed in the
answer to Sriram's question above (and in the design doc) about how different
jobs respond to such changes. If a job needs heavier-handed enforcement, the
VerifyingSSPGrouper would work great.
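The comparison described in the two bullets above could look roughly like the following. This is a minimal sketch, assuming cohorts and SSPs are both represented as plain strings; the class and method names (CohortMappingDiff, describeChanges) are hypothetical and not part of Samza. It returns the messages instead of logging them, so the caller can choose INFO or WARN.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not a Samza class): reports changes between two cohort -> SSP mappings. */
final class CohortMappingDiff {
    private CohortMappingDiff() {}

    /** Returns the elements of a that are not in b, in sorted order. */
    static Set<String> minus(Set<String> a, Set<String> b) {
        Set<String> result = new TreeSet<>(a);
        result.removeAll(b);
        return result;
    }

    /** Builds one message per new/missing cohort and per gained/lost SSP; nothing fails. */
    static List<String> describeChanges(Map<String, Set<String>> previous,
                                        Map<String, Set<String>> current) {
        List<String> messages = new ArrayList<>();
        // Step 1: cohorts that appear in only one of the two mappings.
        for (String cohort : minus(current.keySet(), previous.keySet())) {
            messages.add("New cohort: " + cohort);
        }
        for (String cohort : minus(previous.keySet(), current.keySet())) {
            messages.add("Missing cohort: " + cohort);
        }
        // Step 2: for cohorts present in both, compare their SSP sets.
        Set<String> shared = new TreeSet<>(current.keySet());
        shared.retainAll(previous.keySet());
        for (String cohort : shared) {
            for (String ssp : minus(current.get(cohort), previous.get(cohort))) {
                messages.add("Cohort " + cohort + " gained SSP: " + ssp);
            }
            for (String ssp : minus(previous.get(cohort), current.get(cohort))) {
                messages.add("Cohort " + cohort + " lost SSP: " + ssp);
            }
        }
        return messages;
    }
}
```

A job that needs stricter enforcement could treat a non-empty result as an error instead of logging it.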
bq. We did this for the MessageChooser, and I think it's the right approach
here as well.
I don't think it was the right choice with MessageChooser, or here. First,
you're only a young Incubator project once, and it's better to experiment and be
as open and flexible as we can before a larger user base forces more
conservative choices. Experiment while young. Second, nobody thus far has said
this feature shouldn't be pluggable, just that it maybe shouldn't be pluggable
yet. By the same token, if it turns out to be fully baked, we can just make
it, uhm, unpluggable.
bq. I like @sriram's naming suggestion:
See above in my reply. I'm not married to the word cohort, but I feel pretty
strongly about not overloading a term that is already in use. Users are already
confused about how partitioning works in Samza; there's no reason to make it
worse. Cohort, as defined by Google, seems like the most applicable,
not-yet-used term I could find:
{noformat}a group of people banded together or treated as a group.{noformat}
bq. What exactly is the use case for GroupIntoNSets? If we decouple both
checkpointing and state from the number of TaskInstances we have, why can't we
just use GroupBySSP, and use the yarn.container.count to control the "N"?
The use case is pretty much as Sriram describes: jobs that don't need any
joining but have more SSPs than one would want to spawn a separate TI for
(for reasons of metrics, reporting, etc.). A 1:1 correspondence between
container count and the sets would be reasonable, but it's a little less
flexible. It also ties the SSPGrouper to a YARN dependency. Not a big
point though.
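To make the GroupIntoNSets idea concrete, here is a minimal sketch under stated assumptions: SSPs are plain strings, the assignment is round-robin over the sorted SSPs (the strategy is an assumption, not something this comment specifies), and N is passed in directly rather than read from yarn.container.count. The class name GroupIntoNSetsSketch and its group method are hypothetical and do not reflect any actual SSPGrouper API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch of a GroupIntoNSets-style grouper (not a Samza class). */
final class GroupIntoNSetsSketch {
    private GroupIntoNSetsSketch() {}

    /** Distributes the SSPs round-robin into n cohorts keyed 0..n-1. */
    static Map<Integer, Set<String>> group(Set<String> ssps, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        Map<Integer, Set<String>> cohorts = new TreeMap<>();
        for (int i = 0; i < n; i++) {
            cohorts.put(i, new TreeSet<>());
        }
        // Sort first so the same input always produces the same grouping.
        List<String> sorted = new ArrayList<>(new TreeSet<>(ssps));
        for (int i = 0; i < sorted.size(); i++) {
            cohorts.get(i % n).add(sorted.get(i));
        }
        return cohorts;
    }
}
```

Because n is an ordinary parameter here, the grouper carries no YARN dependency; a job could still pass the container count as n if it wanted the 1:1 correspondence.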
bq. Per Martin Kleppmann, we shouldn't use the word "state" again:
OK, how about CircumstanceLog? PredicamentLog? FootingFile? :)
> Move topic partition grouping to the AM and generalize
> ------------------------------------------------------
>
> Key: SAMZA-123
> URL: https://issues.apache.org/jira/browse/SAMZA-123
> Project: Samza
> Issue Type: Sub-task
> Components: container
> Affects Versions: 0.6.0
> Reporter: Jakob Homan
> Assignee: Jakob Homan
> Attachments: SAMZA-123-design-doc.md, SAMZA-123-design-doc.pdf
>
>
> Currently the AM sends a set of all the topics and partitions to the
> container, which then groups them by partition and assigns each set to a task
> instance. By moving the grouping to the AM, we can assign arbitrary groups to
> task instances, which will allow more partitioning strategies, as discussed
> in SAMZA-71.