[ https://issues.apache.org/jira/browse/SAMZA-348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134685#comment-14134685 ]

Sriram Subramanian commented on SAMZA-348:
------------------------------------------

Here are my thoughts so far:

At a high level, I agree that all communication from the AM to the containers 
should happen through the AM API, and all communication from the containers to 
the AM should happen through the stream (at least for now, until we identify 
soft state that the containers want to communicate to the AM without persisting 
it). Multiple containers/tasks can write to this stream, and any race 
conditions caused by multiple writers exist regardless of the underlying 
storage medium.

1. AM catch-up speed on the config log
We plan to use the config log as the source of truth and, in the future, to 
make dynamic config updates through it. If we want the AM to act on the cluster 
immediately after a config update, the AM must always be caught up with the 
config log. It would be useful to do some rough math on how large this log 
could grow in a worst-case cluster setup. We would turn on key deduplication 
(log compaction) in Kafka, which keeps the log size bounded by the number of 
distinct keys, but having an estimate would still be useful.
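
To make that rough math concrete, here is a minimal sketch, assuming that 
compaction leaves at most one record per distinct key and that checkpoints are 
written as a fixed number of keys per task. The function name and every input 
figure are hypothetical placeholders, not measured values, and the 
not-yet-compacted head of the log adds to this bound until the cleaner runs.

```python
def compacted_log_bytes(num_config_keys: int,
                        num_tasks: int,
                        checkpoint_keys_per_task: int,
                        avg_record_bytes: int) -> int:
    """Upper bound on the compacted size of a config log.

    Assumption (hypothetical model): after compaction at most one record
    survives per distinct key, and the distinct keys are the job config
    entries plus a fixed number of checkpoint keys per task.
    """
    # One surviving record per job config entry.
    config_records = num_config_keys
    # Assumed checkpoint layout: checkpoint_keys_per_task keys for each task
    # (use 1 if all of a task's offsets are stored under a single key).
    checkpoint_records = num_tasks * checkpoint_keys_per_task
    distinct_keys = config_records + checkpoint_records
    # avg_record_bytes should include key, value, and per-record overhead.
    return distinct_keys * avg_record_bytes
```

For example, 1,000 config keys plus 1,000 tasks with 8 checkpoint keys each 
gives 9,000 distinct keys; at roughly 300 bytes per record that is about 2.7 MB.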

2. AM startup time
This is related to the previous point. On startup, the AM needs to read the 
entire config log from the beginning to restore the job configuration and 
offset state, which could add to startup time. Today, each task restores its 
own offsets, so if there are multiple partitions the restoration can proceed 
in parallel and potentially finish faster. Again, this largely depends on how 
big the stream can grow.
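
Restoring state from the log amounts to replaying it from the beginning with 
last-write-wins per key, which is why startup cost grows with the number of 
records read. A minimal sketch, assuming records are (key, value) pairs and a 
None value marks a deleted key; the function and the key names in the usage 
example are hypothetical.

```python
from typing import Dict, Iterable, Optional, Tuple

# (key, value); a value of None is treated as a deletion (tombstone).
Record = Tuple[str, Optional[str]]


def restore_state(log: Iterable[Record]) -> Dict[str, str]:
    """Replay the log from the beginning; the last write for each key wins."""
    state: Dict[str, str] = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # tombstone: drop the key if present
        else:
            state[key] = value    # later records overwrite earlier ones
    return state
```

Usage: replaying `[("checkpoint.t0", "42"), ("checkpoint.t0", "57")]` yields 
`{"checkpoint.t0": "57"}`; every record is still read, even those that were 
later overwritten.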

3. AM failure while containers keep running
I am assuming that in the future we would like the containers to keep running 
even when the AM fails. The containers would continue to write their offsets 
into the config log. However, if a container fails while the AM is down, it 
would not be able to restart, since it cannot get its offsets from the AM. If 
the AM is highly available, we can safely assume that a new AM will be up 
within a few seconds (AM start + config log restore) and the containers can 
proceed.
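
On the container side, riding out a short AM failover could look like the retry 
loop below. This is a hypothetical sketch: `fetch` stands in for whatever call 
the container makes to the AM API and is assumed to return None while no AM is 
reachable; the timeout and backoff values are placeholders, and `sleep`/`clock` 
are injectable only to make the loop testable.

```python
import time
from typing import Callable, Dict, Optional


def fetch_offsets_with_retry(
        fetch: Callable[[], Optional[Dict[str, str]]],
        timeout_s: float = 30.0,
        initial_backoff_s: float = 0.5,
        max_backoff_s: float = 5.0,
        sleep: Callable[[float], None] = time.sleep,
        clock: Callable[[], float] = time.monotonic) -> Dict[str, str]:
    """Poll the (hypothetical) AM offset call until it answers or time runs out."""
    deadline = clock() + timeout_s
    backoff = initial_backoff_s
    while True:
        offsets = fetch()
        if offsets is not None:
            return offsets  # a (new) AM answered with this container's offsets
        if clock() + backoff > deadline:
            raise TimeoutError("no AM became available before the deadline")
        sleep(backoff)
        # Exponential backoff, capped so we notice a new AM reasonably quickly.
        backoff = min(backoff * 2, max_backoff_s)
```

The timeout should comfortably exceed the expected AM start plus config log 
restore time; otherwise a container gives up just before the new AM is ready.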

4. Mixing transactional and non-transactional updates
It is worth mentioning that by writing both the offsets and the job 
configuration into the same topic, we would potentially mix transactional and 
non-transactional messages in one topic. The transactional feature is required 
to ensure exactly-once semantics in Samza, so the AM would need to use a 
transaction-aware consumer to ensure it reads the data in a consistent state.
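
A transaction-aware consumer could be sketched as follows: non-transactional 
messages pass straight through, while transactional messages are buffered per 
transaction and released only when a commit marker arrives (or dropped on 
abort). The `Message` type and the COMMIT/ABORT control markers are assumptions 
for illustration, not an existing Kafka consumer API.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Optional


@dataclass
class Message:
    key: str = ""
    value: Optional[str] = None
    txn_id: Optional[str] = None   # None => non-transactional write
    control: Optional[str] = None  # hypothetical "COMMIT" / "ABORT" marker


def read_consistent(log: Iterable[Message]) -> Iterator[Message]:
    """Yield only non-transactional and committed transactional messages."""
    pending: Dict[str, List[Message]] = {}
    for msg in log:
        if msg.txn_id is None:
            yield msg  # e.g. a plain config write
        elif msg.control == "COMMIT":
            # Release everything buffered for this transaction, in order.
            yield from pending.pop(msg.txn_id, [])
        elif msg.control == "ABORT":
            pending.pop(msg.txn_id, None)  # discard the aborted writes
        else:
            pending.setdefault(msg.txn_id, []).append(msg)
```

One consequence of this design is that committed messages surface at the 
position of their commit marker, and messages from a transaction that has not 
completed yet stay buffered, so the AM never acts on a half-written checkpoint.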

5. Dynamic config updates
If we allow dynamic config updates, we would need some kind of boundary to 
declare when the AM can act on config changes. For example, you may want to 
change both the max memory size and the total number of containers, and we 
would like the AM to react to these changes only once. One option would be to 
batch changes in the AM for some interval before acting on them. Another option 
would be to introduce the notion of batch config change messages: we would 
write a batch-start header message, followed by all the config changes, and 
then a batch-end header message to tell the AM that it should act on the 
change.
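
The batch-header option could be handled on the AM side roughly as below: 
changes between the start and end headers are staged and applied together, so 
the AM acts once per batch, while a change outside any batch is acted on 
immediately. The header keys and the function are hypothetical; the config 
keys in the usage example are only illustrative.

```python
from typing import Dict, Iterable, List, Optional, Tuple

BATCH_START = "__batch_start__"  # hypothetical header key
BATCH_END = "__batch_end__"      # hypothetical header key


def apply_config_changes(config: Dict[str, str],
                         log: Iterable[Tuple[str, str]]) -> List[Dict[str, str]]:
    """Return the config snapshots the AM would act on, one per action."""
    actions: List[Dict[str, str]] = []
    current = dict(config)  # do not mutate the caller's config
    staged: Optional[Dict[str, str]] = None
    for key, value in log:
        if key == BATCH_START:
            staged = {}  # start collecting a batch
        elif key == BATCH_END:
            if staged is not None:
                current.update(staged)         # apply the whole batch at once
                actions.append(dict(current))  # ...and act on it once
            staged = None
        elif staged is not None:
            staged[key] = value  # inside a batch: stage, do not act yet
        else:
            current[key] = value  # outside a batch: act immediately
            actions.append(dict(current))
    return actions
```

Usage: with a batch that changes both `yarn.container.memory.mb` and 
`yarn.container.count`, the AM gets a single snapshot containing both new 
values instead of two separate resize actions. A batch with no end header is 
never applied.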

> Configure Samza jobs through a stream
> -------------------------------------
>
>                 Key: SAMZA-348
>                 URL: https://issues.apache.org/jira/browse/SAMZA-348
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.7.0
>            Reporter: Chris Riccomini
>              Labels: project
>         Attachments: DESIGN-SAMZA-348-0.md, DESIGN-SAMZA-348-0.pdf
>
>
> Samza's existing config setup is problematic for a number of reasons:
> # It's completely immutable once a job starts. This prevents any dynamic 
> reconfiguration and auto-scaling. It is debatable whether we want these 
> features or not, but our existing implementation actively prevents it. See 
> SAMZA-334 for discussion.
> # We pass existing configuration through environment variables. YARN exports 
> environment variables in a shell script, which limits the size to the varargs 
> length on the machine. This is usually ~128KB. See SAMZA-333 and SAMZA-337 
> for details.
> # User-defined configuration (the Config object) and programmatic 
> configuration (checkpoints and TaskName:State mappings (see SAMZA-123)) are 
> handled differently. It's debatable whether this makes sense.
> In SAMZA-123, [~jghoman] and I propose implementing a ConfigLog. This log 
> would replace both the checkpoint topic and the existing config environment 
> variables in SamzaContainer and Samza's YARN AM.
> I'd like to keep this ticket's scope limited to just the implementation of 
> the ConfigLog, and not re-designing how Samza's config is used in the code 
> (SAMZA-40). We should, however, discuss how this feature would affect dynamic 
> reconfiguration/auto-scaling.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)