[ https://issues.apache.org/jira/browse/SAMZA-448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-448:
----------------------------------
    Attachment: SAMZA-448-0.patch

Attaching patch. RB at:

https://reviews.apache.org/r/27649

Changes made:

# Added a createCoordinatorStream method to the SystemAdmin interface.
# Updated KafkaSystemAdmin to create coordinator streams the same way that 
KafkaCheckpointManager creates checkpoint streams.
# Fixed the Kafka flush bug discovered in SAMZA-458. A unit test exposed the 
bug, so it had to be fixed here.
# Added job.coordinator.system config, which defines which coordinator system 
to use.
# Added job.container.count config, and removed yarn.container.count. I've 
maintained backwards compatibility by falling back to yarn.container.count and 
logging a deprecation warning.
# Switched SAMZA\_CONFIG environment variable to 
SAMZA\_COORDINATOR\_SYSTEM\_CONFIG. The new environment variable contains only 
the configs required to instantiate a coordinator stream (systems.<coordinator 
system>.*, job.name, job.id).
# Wrote CoordinatorStreamMessage class, with two inner classes: SetConfig and 
Delete.
# Wrote CoordinatorStreamSystemConsumer and CoordinatorStreamSystemProducer. 
These are helper classes for managing the coordinator stream. They wrap 
SystemConsumer and SystemProducer, respectively.
# Updated JobCoordinator to read messages from the coordinator stream, rather 
than from the SAMZA\_CONFIG environment variable.
# Updated various YARN classes to work with updated JobCoordinator.
# JobRunner puts all configs into the coordinator stream. It also deletes all 
configs that were in the stream, but are no longer set in the latest config.
# Moved rewriteConfig from JobRunner to JobCoordinator.
# Added some mock classes to help with config passing in unit tests.
# A bunch of changes to the unit tests to make everything work.
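
As an illustration of the fallback described in (5), here is a minimal sketch. 
It is not the code from the patch: the class name, the method name, and the 
caller-supplied default are hypothetical, and the warning goes to stderr rather 
than a real logger. Only the two config keys come from the list above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of SAMZA-448-0.patch.
class ContainerCountSketch {
    static final String NEW_KEY = "job.container.count";
    static final String OLD_KEY = "yarn.container.count";

    // Prefer the new key; fall back to the deprecated key and warn when it is used.
    // defaultCount is an assumption: the source does not say what the default is.
    static int containerCount(Map<String, String> config, int defaultCount) {
        String value = config.get(NEW_KEY);
        if (value == null) {
            value = config.get(OLD_KEY);
            if (value != null) {
                System.err.println("Config " + OLD_KEY + " is deprecated; use "
                        + NEW_KEY + " instead.");
            }
        }
        return value == null ? defaultCount : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(OLD_KEY, "4"); // only the deprecated key is set
        System.out.println(containerCount(config, 1));
    }
}
```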

_NOTE: This patch does not modify the CheckpointManager code. This means we'll 
have two streams: coordinator stream and checkpoint stream. I want to separate 
these changes to make the patches more incremental._

For (3), I'll close out SAMZA-458 as a dupe when this is committed.

For (4), Samza will try to guess your coordinator system if there's only one 
system defined.
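
One way the guess in (4) could work is sketched below. This is an assumption 
about the approach, not the patch's implementation: the class and method names 
are hypothetical, and the sketch simply collects the distinct <name> parts of 
systems.<name>.* keys and uses the name only when exactly one exists.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of SAMZA-448-0.patch.
class CoordinatorSystemSketch {
    static final String COORDINATOR_SYSTEM = "job.coordinator.system";
    static final String SYSTEMS_PREFIX = "systems.";

    // Returns the explicitly configured coordinator system, or the only
    // defined system, or empty when the choice would be ambiguous.
    static Optional<String> coordinatorSystem(Map<String, String> config) {
        String explicit = config.get(COORDINATOR_SYSTEM);
        if (explicit != null) {
            return Optional.of(explicit);
        }
        Set<String> names = new TreeSet<>();
        for (String key : config.keySet()) {
            if (key.startsWith(SYSTEMS_PREFIX)) {
                int end = key.indexOf('.', SYSTEMS_PREFIX.length());
                if (end > SYSTEMS_PREFIX.length()) {
                    names.add(key.substring(SYSTEMS_PREFIX.length(), end));
                }
            }
        }
        return names.size() == 1 ? Optional.of(names.iterator().next()) : Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("systems.kafka.samza.factory", "placeholder-factory-class");
        System.out.println(coordinatorSystem(config));
    }
}
```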

For (6), in the design doc, we discussed using a URI. I found it easier to 
keep the current pattern (JSON in an environment variable), but shrink the 
config that is passed to only what the coordinator stream requires. I 
experimented with adding an interface to convert URIs to the required configs, 
but it felt pretty hacky, so I backed off on it for now. Longer term, when we 
have control-job.sh, we'll need to figure this out, but since JobRunner is the 
only thing mutating the coordinator stream right now, I stuck with the current 
pattern. The environment variable should still be extremely small, so the 
"can't support large config" problem we're trying to solve is still fixed.
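
To make the shrinking in (6) concrete, the following sketch filters a full 
config down to the keys listed there (systems.<coordinator system>.*, job.name, 
job.id). The class and method names are hypothetical, and JSON serialization 
into the environment variable is left out; this only shows the selection step.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of SAMZA-448-0.patch.
class CoordinatorConfigSketch {
    // Keep only what is needed to instantiate the coordinator stream.
    static Map<String, String> coordinatorSubset(Map<String, String> fullConfig,
                                                 String coordinatorSystem) {
        String prefix = "systems." + coordinatorSystem + ".";
        Map<String, String> subset = new TreeMap<>();
        for (Map.Entry<String, String> entry : fullConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) || key.equals("job.name") || key.equals("job.id")) {
                subset.put(key, entry.getValue());
            }
        }
        return subset;
    }

    public static void main(String[] args) {
        Map<String, String> full = new HashMap<>();
        full.put("job.name", "example-job");                    // placeholder value
        full.put("job.id", "1");
        full.put("systems.kafka.samza.factory", "placeholder"); // placeholder value
        full.put("task.class", "placeholder");                  // dropped by the filter
        System.out.println(coordinatorSubset(full, "kafka").keySet());
    }
}
```

However large the job config grows, the subset stays proportional to the 
coordinator system's own settings, which is what keeps the environment 
variable small.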

For (7-9), I am not happy with this code. It's hacky and not well thought out. 
The reason I'm posting now, rather than refactoring, is that the patch is 
already really big. Rather than posting one giant patch, I'm trying to do 
things incrementally, so everyone can follow along.

I've also run this code locally with hello-samza, and it works. I also deleted 
and added configs, and verified that the changes were honored by the 
JobCoordinator.
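
The add/delete behavior checked here corresponds to the JobRunner change in the 
list above: write every current config, then delete keys that are in the stream 
but no longer set. A minimal sketch of that planning step follows. It uses 
plain strings in place of the SetConfig and Delete messages, whose actual API 
is not shown in this comment, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of SAMZA-448-0.patch.
class ConfigSyncSketch {
    // Plan the operations needed to make the stream reflect the latest config.
    static List<String> plan(Set<String> keysInStream, Map<String, String> latest) {
        List<String> ops = new ArrayList<>();
        // Write every config in the latest config (sorted for stable output).
        for (Map.Entry<String, String> entry : new TreeMap<>(latest).entrySet()) {
            ops.add("set " + entry.getKey() + "=" + entry.getValue());
        }
        // Delete keys that are in the stream but absent from the latest config.
        for (String key : new TreeSet<>(keysInStream)) {
            if (!latest.containsKey(key)) {
                ops.add("delete " + key);
            }
        }
        return ops;
    }

    public static void main(String[] args) {
        Set<String> inStream = new HashSet<>();
        inStream.add("a");
        inStream.add("b");
        Map<String, String> latest = new HashMap<>();
        latest.put("a", "1");
        latest.put("c", "2");
        System.out.println(plan(inStream, latest));
    }
}
```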

TODOs for this ticket:

* Add docs for job.coordinator.system and job.container.count.
* Add logging where required.
* Write tests where required.

Follow-on JIRAs:

* Eliminate all CheckpointManager code, and replace it with messages in the 
CoordinatorStream.
* Refactor the JobCoordinator, CoordinatorStreamMessage, 
CoordinatorStreamSystemConsumer, and CoordinatorStreamSystemProducer code. I am 
really disappointed with this code right now.

> Pass config from JobRunner to JobCoordinator via ConfigStream
> -------------------------------------------------------------
>
>                 Key: SAMZA-448
>                 URL: https://issues.apache.org/jira/browse/SAMZA-448
>             Project: Samza
>          Issue Type: Sub-task
>          Components: container
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-448-0.patch
>
>
> SAMZA-438 modified the AM/local job factories (job coordinator) to pass 
> configuration to SamzaContainer via an HTTP server. This ticket is to modify 
> the other half of the pipeline: the config passing from the JobRunner to the 
> job coordinator to be done via a ConfigStream, rather than an environment 
> variable.
> The goal of this ticket is to make the change as transparent as possible. 
> {{control-job.sh}} won't be introduced in this ticket. Instead, we'll 
> continue using the ConfigFactories that we have, but pass the resolved config 
> to the coordinator using the ConfigStream rather than the environment 
> variable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
