[
https://issues.apache.org/jira/browse/SAMZA-348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134685#comment-14134685
]
Sriram Subramanian commented on SAMZA-348:
------------------------------------------
Here are my thoughts so far -
At a high level, I agree that all communication from the AM to the containers
should happen through the AM API. All communication from the containers to the
AM should happen through the stream (at least for now, until we can think of
certain soft state that the containers would like to communicate to the AM
and that need not be persisted). Multiple containers/tasks can write to this
stream, and any race conditions that can occur due to multiple writers are
present irrespective of the underlying storage medium.
1. AM catch up speed of config log
We plan to use the config log as the source of truth and use it in the future
to make dynamic updates to configs. If we want the AM to take actions on the
cluster immediately after config updates, it is important that the AM is
always caught up with the config log. It would be useful to do some rough math
on the size this log can grow to, assuming a worst-case cluster setup. We
would turn on key deduplication in Kafka, which would ensure the log size is
bounded, but having some estimate would be useful.
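
A back-of-the-envelope version of that estimate could look like the sketch
below. Every number in it (config entries, task count, partitions per task,
bytes per message) is a made-up assumption rather than a measurement from a
real job. The only property it relies on is that key deduplication retains at
most one message per distinct key.

```python
def compacted_log_bytes(num_config_keys, num_tasks, partitions_per_task,
                        avg_message_bytes):
    """Upper bound on a fully deduplicated log: one retained message per key."""
    # Assumed key layout: one key per config entry, plus one offset key
    # per (task, input partition) pair.
    distinct_keys = num_config_keys + num_tasks * partitions_per_task
    return distinct_keys * avg_message_bytes


# Hypothetical worst case: 1,000 config entries, 10,000 tasks,
# 4 input partitions per task, ~200 bytes per message.
estimate = compacted_log_bytes(1_000, 10_000, 4, 200)
print(f"{estimate / 1e6:.1f} MB")  # prints "8.2 MB"
```

Between deduplication runs the log can be larger than this bound, since
repeated offset writes for the same key accumulate until they are cleaned up.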
2. AM startup time
This is kind of related to the previous point. On startup, the AM needs to read
the entire config log from head and restore the job configuration and offset
state. This could potentially add more time to the startup. Today, the offsets
are restored individually by the tasks, and if they have multiple partitions,
the restoration can potentially happen faster. Again, this largely depends on
how big the stream can grow.
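
To make the cost concrete, here is a minimal sketch of restoring state by
replaying a keyed log, assuming last-write-wins semantics per key. The record
format and key names are hypothetical, chosen only for illustration. Restore
time grows linearly with the number of records that have to be read.

```python
def restore_state(log):
    """Replay (key, value) records in order; the last value per key wins."""
    state = {}
    for key, value in log:
        if value is None:
            # Assumption: a None value marks the key as deleted.
            state.pop(key, None)
        else:
            state[key] = value
    return state


# Hypothetical log contents mixing job config and per-partition offsets.
log = [
    ("config:task.opts", "-Xmx512m"),
    ("offset:task-0:partition-0", 41),
    ("offset:task-0:partition-0", 42),
    ("config:task.opts", "-Xmx1g"),
]
state = restore_state(log)
```

After the replay, `state` holds only the latest value for each key, which is
what the AM would hand to the containers.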
3. AM failure and containers running
I am assuming that in the future we would like to keep the containers running
even when the AM fails. The containers would continue to write their offsets
into the config log. However, if a container fails while the AM is down, it
would not be able to restart, since it cannot get its offsets from the AM. If
the AM is highly available, we can safely assume that a new AM will be chosen
within a few seconds (AM start + config log restore) and the containers can
proceed.
4. Mixing transactional and non transactional updates
It would be worth mentioning that by writing the offsets and the job
configurations into the same topic, we would potentially mix transactional and
non-transactional messages in that topic. The transactional feature is
required to ensure exactly-once semantics in Samza. The AM would need to use a
transaction-aware consumer to ensure it reads the data in a consistent
state.
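
As a rough illustration of what "transaction-aware" could mean here, the
sketch below buffers transactional records until their transaction commits
and passes non-transactional records straight through. The record format
(`txn_id`, `kind`, `payload`) and the commit/abort markers are assumptions
made up for this example. They are not an existing Kafka or Samza API.

```python
def read_committed(records):
    """Return payloads in the order they become visible to the reader."""
    pending = {}   # txn_id -> payloads buffered until commit or abort
    visible = []
    for txn_id, kind, payload in records:
        if txn_id is None:
            # Non-transactional message: visible immediately.
            visible.append(payload)
        elif kind == "data":
            pending.setdefault(txn_id, []).append(payload)
        elif kind == "commit":
            visible.extend(pending.pop(txn_id, []))
        elif kind == "abort":
            pending.pop(txn_id, None)
    return visible


# Hypothetical interleaving of config writes and transactional offset writes.
records = [
    (None, "data", "config:a=1"),
    ("t1", "data", "offset:p0=10"),
    ("t2", "data", "offset:p1=7"),
    (None, "data", "config:b=2"),
    ("t1", "commit", None),
    ("t2", "abort", None),
]
visible = read_committed(records)
```

The aborted write from `t2` never becomes visible, and the write from `t1`
only appears once its commit marker has been read.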
5. Dynamic config updates
If we allow dynamic config updates, we would need some kind of boundary to
declare when the AM can act on the config changes. For example, you may want
to change the max memory size and the total number of containers, and we would
like the AM to react to these changes only once. One option would be to batch
changes in the AM for some interval before acting on them. Another option
would be to introduce the notion of batch config change messages. We would
simply add a batch config message start header, followed by all the config
changes, and then push a batch config message end header, at which point the
AM knows that it needs to act on the change.
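
The second option can be sketched as follows. The message shapes
(`batch-start`, `set`, `batch-end`) and the config keys are hypothetical and
used only to illustrate the idea. Changes inside a batch are held back and
applied together when the end marker arrives, so the AM acts once per batch.

```python
def apply_config_messages(messages, config):
    """Return one config snapshot per action the AM would take."""
    actions = []
    batch = None  # None means we are not inside a batch
    for msg in messages:
        kind = msg[0]
        if kind == "batch-start":
            batch = {}
        elif kind == "set":
            _, key, value = msg
            if batch is not None:
                batch[key] = value                # hold until batch-end
            else:
                config = {**config, key: value}   # unbatched: act at once
                actions.append(config)
        elif kind == "batch-end" and batch is not None:
            config = {**config, **batch}          # apply the batch in one step
            actions.append(config)
            batch = None
    return actions


# Hypothetical batch: change container memory and container count together.
messages = [
    ("batch-start",),
    ("set", "container.memory.mb", 2048),
    ("set", "container.count", 8),
    ("batch-end",),
]
actions = apply_config_messages(messages, {"container.count": 4})
```

Here the AM sees a single action containing both changes, instead of
reacting once to the memory change and again to the container count.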
> Configure Samza jobs through a stream
> -------------------------------------
>
> Key: SAMZA-348
> URL: https://issues.apache.org/jira/browse/SAMZA-348
> Project: Samza
> Issue Type: Bug
> Affects Versions: 0.7.0
> Reporter: Chris Riccomini
> Labels: project
> Attachments: DESIGN-SAMZA-348-0.md, DESIGN-SAMZA-348-0.pdf
>
>
> Samza's existing config setup is problematic for a number of reasons:
> # It's completely immutable once a job starts. This prevents any dynamic
> reconfiguration and auto-scaling. It is debatable whether we want these
> features or not, but our existing implementation actively prevents them. See
> SAMZA-334 for discussion.
> # We pass existing configuration through environment variables. YARN exports
> environment variables in a shell script, which limits the size to the varargs
> length on the machine. This is usually ~128KB. See SAMZA-333 and SAMZA-337
> for details.
> # User-defined configuration (the Config object) and programmatic
> configuration (checkpoints and TaskName:State mappings (see SAMZA-123)) are
> handled differently. It's debatable whether this makes sense.
> In SAMZA-123, [~jghoman] and I propose implementing a ConfigLog. This log
> would replace both the checkpoint topic and the existing config environment
> variables in SamzaContainer and Samza's YARN AM.
> I'd like to keep this ticket's scope limited to just the implementation of
> the ConfigLog, and not re-designing how Samza's config is used in the code
> (SAMZA-40). We should, however, discuss how this feature would affect dynamic
> reconfiguration/auto-scaling.