[https://issues.apache.org/jira/browse/SAMZA-348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167573#comment-14167573]
Chris Riccomini commented on SAMZA-348:
---------------------------------------
Known remaining issues with the proposed design:
# Message payload format (K:V vs. K:{K:V, ...})
# How does the control-job.sh script use the SystemConsumer/SystemProducer?
# How will this work in a dev environment?
I will address these in order.
*Message payload format*
The current design models each ConfigStream message as a simple key-value
pair. The downside to this approach is that it breaks atomicity for a
StreamTask's checkpoint: multiple messages are required for a single
checkpoint, one per SSP:offset pair.
The two solutions to this are to (1) depend on transactionality, or (2)
support a nested message payload format (K: {K:V, ...}), so that all offset
checkpoints for a single task can be written in a single message, maintaining
atomic commits for all checkpoints within a task (a sketch of both shapes
follows the list below). The latter approach (nested payloads) is how we
currently checkpoint. The downsides to the nested approach are:
# The single offset checkpoint message will be much larger than any individual
offset checkpoint message in approach (1).
# Modifying an offset checkpoint requires the job coordinator to do a
read-modify-write, which is more complicated than the simple put that would be
required for approach (1).
# It muddles the data model a little bit.
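Concretely, the two payload shapes might look something like the following
sketch. The key layout, offset values, and the {{producer}} variable are all
made up for illustration; none of this is in the design doc:
{code:java}
import java.util.HashMap;
import java.util.Map;

// "producer" stands in for whatever SystemProducer-backed writer the
// ConfigStream ends up using; keys and offsets are illustrative.

// Approach (1): one flat K:V message per SSP:offset pair. Without
// transactions, a crash between the two sends leaves a half-written
// checkpoint.
producer.send("checkpoint/task-0/kafka.PageViews.0", "1234");
producer.send("checkpoint/task-0/kafka.PageViews.1", "5678");

// Approach (2): one nested K:{K:V, ...} message per task. All of the task's
// SSP:offset pairs ride in a single message, so the commit is atomic.
Map<String, String> offsets = new HashMap<String, String>();
offsets.put("kafka.PageViews.0", "1234");
offsets.put("kafka.PageViews.1", "5678");
producer.send("checkpoint/task-0", offsets);
{code}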
The problem with (1) is mainly that it depends on transactionality. Without
it, a failure could occur halfway through a task checkpoint, in which case
some input streams would be rewound on restart and others would not. I tend
to agree with Martin's assessment of the problem:
bq. My hunch is that it's not a big problem, since state changelogs are
currently also not atomically tied to the checkpoints either, so the semantics
of container restart are somewhat vague already.
But given that it should be fairly trivial to solve this using nested payloads,
we might as well do so. We can always clean it up later, if transactionality
becomes commonplace.
*How does the control-job.sh script use the SystemConsumer/SystemProducer?*
This is a tricky one. Given that Samza has a SystemConsumer/SystemProducer
API, it seems ideal to have the ConfigStream implementation use these
interfaces for reading/writing config. In the design document, I glossed over
how the job coordinator and control-job.sh script know how to translate a URI
into a Config for the SystemConsumer/SystemProducer. This is a bit of a
chicken-and-egg problem: the control-job.sh script needs to know how to write
to the ConfigStream, but in order to do that, it needs config just to make
the SystemFactory.getConsumer()/getProducer() calls.
Two potential solutions that I can think of are:
# Introduce a SAMZA\_HOME environment variable that points at an installation
containing a conf/samza-site.properties file.
# Add a SystemFactory.getConfig(URI uri) interface.
Introducing a SAMZA\_HOME environment variable seems heavy-handed. It would
have to be set on every node in the YARN cluster (since the job coordinator
could run on any node), as well as on the machine that runs control-job.sh.
This would be hard to operate, may be (Samza) version dependent, and feels
clunky.
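For illustration, option 1 would boil down to something like this in
control-job.sh's entry point. The file path and property key below are
assumptions about how such a layout could look, not an existing convention:
{code:java}
import java.io.FileInputStream;
import java.util.Properties;

// Hypothetical bootstrap for option 1: read just enough config from
// $SAMZA_HOME to construct the SystemConsumer/SystemProducer.
String samzaHome = System.getenv("SAMZA_HOME"); // must be set on every node
Properties props = new Properties();
FileInputStream in =
    new FileInputStream(samzaHome + "/conf/samza-site.properties");
props.load(in);
in.close();
// e.g. systems.config.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
{code}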
Adding a getConfig() API seems mildly hacky. The main problem with this
approach is determining which SystemFactory to use based on the URI. We could
do something as simple as {{Class.forName(uri.getScheme() +
"SystemFactory").newInstance()}}. This is a bit fragile and dangerous, but it
should work, and it maintains pluggability.
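Roughly, the lookup could work like the following sketch. The
scheme-to-class naming convention and the getConfig(URI) method are
proposals, not existing APIs; getProducer() and MetricsRegistryMap are the
existing ones:
{code:java}
import java.net.URI;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;

// Hypothetical: resolve the SystemFactory from the URI's scheme. A real
// implementation would need a package convention or a registry; assume
// kafka:// -> org.apache.samza.system.kafka.KafkaSystemFactory here.
URI uri = new URI("kafka://zk-host:2181/my-job-config");
String scheme = uri.getScheme(); // "kafka"
String className = "org.apache.samza.system." + scheme + "."
    + Character.toUpperCase(scheme.charAt(0)) + scheme.substring(1)
    + "SystemFactory";
SystemFactory factory = (SystemFactory) Class.forName(className).newInstance();
Config config = factory.getConfig(uri); // the proposed new method
SystemProducer producer =
    factory.getProducer("config", config, new MetricsRegistryMap());
{code}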
Does anyone else have any other ideas for this?
*How will this work in a dev environment?*
It's relatively easy to start a Samza job locally using the ThreadJobFactory
or ProcessJobFactory right now: config can be passed in directly, and no
Kafka grid is required. Under the new design proposal, it seems that
developers will be required to have a Kafka grid (or some equivalent system
implementation: HBase, or whatever) just to store their configuration. There
doesn't seem to be much of a way around this, unless the
FileSystemConsumer/FileSystemProducer could be made to work as the backing
system for the ConfigStream (which seems possible at first glance).
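For reference, the current local flow is roughly the following (the job name
and task class are placeholders; ThreadJobFactory and StreamJob are the
existing APIs). The proposed design would replace the in-memory MapConfig
here with a read from the backing system:
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.job.StreamJob;
import org.apache.samza.job.local.ThreadJobFactory;

// Today's local dev flow: config is handed straight to the job factory, and
// no Kafka grid is involved.
Map<String, String> map = new HashMap<String, String>();
map.put("job.name", "my-dev-job");
map.put("task.class", "samza.examples.MyStreamTask");
Config config = new MapConfig(map);
StreamJob job = new ThreadJobFactory().getJob(config);
job.submit();
{code}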
> Configure Samza jobs through a stream
> -------------------------------------
>
> Key: SAMZA-348
> URL: https://issues.apache.org/jira/browse/SAMZA-348
> Project: Samza
> Issue Type: Bug
> Affects Versions: 0.7.0
> Reporter: Chris Riccomini
> Labels: project
> Attachments: DESIGN-SAMZA-348-0.md, DESIGN-SAMZA-348-0.pdf,
> DESIGN-SAMZA-348-1.md, DESIGN-SAMZA-348-1.pdf
>
>
> Samza's existing config setup is problematic for a number of reasons:
> # It's completely immutable once a job starts. This prevents any dynamic
> reconfiguration and auto-scaling. It is debatable whether we want these
> features or not, but our existing implementation actively prevents them. See
> SAMZA-334 for discussion.
> # We pass existing configuration through environment variables. YARN exports
> environment variables in a shell script, which limits the total size to the
> machine's maximum argument/environment length (usually ~128KB). See
> SAMZA-333 and SAMZA-337 for details.
> # User-defined configuration (the Config object) and programmatic
> configuration (checkpoints and TaskName:State mappings (see SAMZA-123)) are
> handled differently. It's debatable whether this makes sense.
> In SAMZA-123, [~jghoman] and I propose implementing a ConfigLog. This log
> would replace both the checkpoint topic and the existing config environment
> variables in SamzaContainer and Samza's YARN AM.
> I'd like to keep this ticket's scope limited to just the implementation of
> the ConfigLog, and not re-designing how Samza's config is used in the code
> (SAMZA-40). We should, however, discuss how this feature would affect dynamic
> reconfiguration/auto-scaling.