[ https://issues.apache.org/jira/browse/SAMZA-448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Riccomini updated SAMZA-448:
----------------------------------
Attachment: SAMZA-448-0.patch
Attaching patch. RB at:
https://reviews.apache.org/r/27649
Changes made:
# Added a createCoordinatorStream method to the SystemAdmin interface.
# Updated KafkaSystemAdmin to create coordinator streams the same way that
KafkaCheckpointManager creates checkpoint streams.
# Fixed the Kafka flush bug discovered in SAMZA-458. The bug was exposed by a
unit test, and had to be fixed.
# Added job.coordinator.system config, which defines which coordinator system
to use.
# Added job.container.count config, replacing yarn.container.count. I've
maintained backwards compatibility by falling back to yarn.container.count and
spewing a deprecation warning message.
# Switched the SAMZA_CONFIG environment variable to
SAMZA_COORDINATOR_SYSTEM_CONFIG. The new environment variable contains only
the configs required to instantiate a coordinator stream (systems.<coordinator
system>.*, job.name, job.id).
# Wrote CoordinatorStreamMessage class, with two inner classes: SetConfig and
Delete.
# Wrote CoordinatorStreamSystemConsumer and CoordinatorStreamSystemProducer.
These are helper classes for managing the coordinator stream. They wrap
SystemConsumer and SystemProducer, respectively.
# Updated JobCoordinator to read messages from the coordinator stream, rather
than from the SAMZA_CONFIG environment variable.
# Updated various YARN classes to work with updated JobCoordinator.
# JobRunner puts all configs into the coordinator stream. It also deletes all
configs that were in the stream but are no longer set in the latest config.
# Moved rewriteConfig from JobRunner to JobCoordinator.
# Added some mock classes to help with config passing in unit tests.
# Made a bunch of changes to the unit tests to make everything work.
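For (10), the JobRunner's write-then-delete behavior amounts to a diff between the config already in the coordinator stream and the latest resolved config. A minimal sketch, assuming both are available as flat String-to-String maps; ConfigDiffSketch, its Message type, and the "set-config"/"delete" type strings are hypothetical stand-ins for the patch's CoordinatorStreamMessage.SetConfig and Delete classes, and real code would send the messages through a SystemProducer instead of returning a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/**
 * Hypothetical sketch (not Samza's JobRunner): compute the messages that bring the
 * coordinator stream in line with the latest config.
 */
final class ConfigDiffSketch {

  /** Simplified stand-in for a coordinator stream message (hypothetical). */
  static final class Message {
    final String type;  // "set-config" or "delete" (hypothetical type names)
    final String key;
    final String value; // null for deletes

    Message(String type, String key, String value) {
      this.type = type;
      this.key = key;
      this.value = value;
    }

    @Override
    public String toString() {
      return value == null ? type + "(" + key + ")" : type + "(" + key + "=" + value + ")";
    }
  }

  /**
   * @param inStream config currently reflected in the coordinator stream
   * @param latest   config resolved by the ConfigFactory for this run
   */
  static List<Message> diff(Map<String, String> inStream, Map<String, String> latest) {
    List<Message> out = new ArrayList<>();
    // Write every key in the latest config (sorted only to make output deterministic).
    for (String key : new TreeSet<>(latest.keySet())) {
      out.add(new Message("set-config", key, latest.get(key)));
    }
    // Delete keys that were in the stream but are no longer set.
    for (String key : new TreeSet<>(inStream.keySet())) {
      if (!latest.containsKey(key)) {
        out.add(new Message("delete", key, null));
      }
    }
    return out;
  }
}
```

Writing every key, rather than only the changed ones, keeps the diff trivial; it assumes the reader treats a later message for a key as overriding an earlier one.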
_NOTE: This patch does not modify the CheckpointManager code. This means we'll
have two streams: coordinator stream and checkpoint stream. I want to separate
these changes to make the patches more incremental._
For (3), I'll close out SAMZA-458 as a dupe when this is committed.
For (4), Samza will try to guess your coordinator system if only one system is
defined.
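For (4) and the fallback in (5), config resolution can be sketched as two small lookups. This is an illustration under stated assumptions, not Samza's code: a system counts as "defined" when a systems.<name>.samza.factory key is present, java.util.logging stands in for whatever logger the patch uses, and the default of one container when neither key is set is assumed. CoordinatorConfigSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.TreeSet;
import java.util.logging.Logger;

/** Hypothetical sketch of resolving job.coordinator.system and job.container.count. */
final class CoordinatorConfigSketch {
  private static final Logger LOG = Logger.getLogger(CoordinatorConfigSketch.class.getName());

  /** Use job.coordinator.system if set; otherwise guess, but only if exactly one system is defined. */
  static String coordinatorSystem(Map<String, String> config) {
    String explicit = config.get("job.coordinator.system");
    if (explicit != null) {
      return explicit;
    }
    // Assumption: system "X" is defined when config contains systems.X.samza.factory.
    String prefix = "systems.";
    String suffix = ".samza.factory";
    TreeSet<String> systems = new TreeSet<>();
    for (String key : config.keySet()) {
      if (key.startsWith(prefix) && key.endsWith(suffix)
          && key.length() > prefix.length() + suffix.length()) {
        systems.add(key.substring(prefix.length(), key.length() - suffix.length()));
      }
    }
    if (systems.size() == 1) {
      return systems.first();
    }
    throw new IllegalArgumentException(
        "Set job.coordinator.system; cannot guess among systems " + systems);
  }

  /** Prefer job.container.count; fall back to the deprecated yarn.container.count. */
  static int containerCount(Map<String, String> config) {
    String value = config.get("job.container.count");
    if (value == null) {
      value = config.get("yarn.container.count");
      if (value != null) {
        LOG.warning("yarn.container.count is deprecated; use job.container.count instead.");
      }
    }
    return value == null ? 1 : Integer.parseInt(value); // default of 1 is an assumption
  }
}
```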
For (6), in the design doc, we discussed using a URI. I found it easier to
keep the current pattern (JSON + environment variable), but to shrink the
config passed to just the configs required for the coordinator stream. I
experimented with adding an interface to convert URIs to the required configs,
but it felt pretty hacky, so I backed off from it for now. Longer term, when we
have control-job.sh, we'll need to figure this out, but since JobRunner is the
only thing mutating the coordinator stream right now, I stuck with the current
pattern. This should still allow the environment variable to be extremely
small, so the "can't support large config" problem we're trying to solve is
still fixed.
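For (6), the shrunken environment variable can be pictured as a filter over the full config followed by JSON encoding. A minimal sketch, assuming a flat String-to-String config map; CoordinatorEnvSketch is a hypothetical name, and the hand-rolled toJson stands in for a real JSON library (it escapes only quotes, backslashes, and control characters).

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of building the coordinator-only config for the environment variable. */
final class CoordinatorEnvSketch {

  /** Keep only systems.<coordinator system>.*, job.name and job.id. */
  static Map<String, String> coordinatorSubset(Map<String, String> config, String coordinatorSystem) {
    String systemPrefix = "systems." + coordinatorSystem + ".";
    Map<String, String> subset = new TreeMap<>(); // sorted for stable output
    for (Map.Entry<String, String> e : config.entrySet()) {
      String key = e.getKey();
      if (key.startsWith(systemPrefix) || key.equals("job.name") || key.equals("job.id")) {
        subset.put(key, e.getValue());
      }
    }
    return subset;
  }

  /** Naive JSON object encoding of a flat string map (stand-in for a JSON library). */
  static String toJson(Map<String, String> map) {
    StringBuilder sb = new StringBuilder("{");
    boolean first = true;
    for (Map.Entry<String, String> e : map.entrySet()) {
      if (!first) {
        sb.append(',');
      }
      first = false;
      sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
    }
    return sb.append('}').toString();
  }

  private static String quote(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (char ch : s.toCharArray()) {
      switch (ch) {
        case '"': sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        default:
          if (ch < 0x20) {
            sb.append(String.format("\\u%04x", (int) ch)); // control characters
          } else {
            sb.append(ch);
          }
      }
    }
    return sb.append('"').toString();
  }
}
```

The resulting string is the kind of value that would go into SAMZA_COORDINATOR_SYSTEM_CONFIG; because it holds only a handful of keys, its size no longer grows with the full job config.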
For (7-9), I am not happy with this code. It's hacky and not well thought out.
The reason I'm posting now, rather than refactoring, is that the patch is
already really big. Rather than posting one giant patch, I'm trying to do
things incrementally, so everyone can follow along.
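For (7-9), whatever shape the refactored code takes, the JobCoordinator's task is to rebuild config by reading the coordinator stream in order. A minimal sketch, assuming last-write-wins semantics per key and an in-memory list in place of a SystemConsumer; ConfigReplaySketch and its simplified SetConfig and Delete types are hypothetical and are not the patch's CoordinatorStreamMessage classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of rebuilding config from coordinator stream messages. */
final class ConfigReplaySketch {

  /** Marker type for simplified coordinator stream messages (hypothetical). */
  interface Message {}

  static final class SetConfig implements Message {
    final String key;
    final String value;

    SetConfig(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  static final class Delete implements Message {
    final String key;

    Delete(String key) {
      this.key = key;
    }
  }

  /** Replays messages in stream order: a later SetConfig overrides, a Delete removes the key. */
  static Map<String, String> replay(List<? extends Message> messages) {
    Map<String, String> config = new HashMap<>();
    for (Message m : messages) {
      if (m instanceof SetConfig) {
        SetConfig set = (SetConfig) m;
        config.put(set.key, set.value);
      } else if (m instanceof Delete) {
        config.remove(((Delete) m).key);
      } else {
        throw new IllegalArgumentException("Unknown message: " + m);
      }
    }
    return config;
  }
}
```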
I've also run this code locally with hello-samza, and it works. I also added
and deleted configs, and verified that the changes were honored by the
JobCoordinator.
TODOs for this ticket:
* Add docs for job.coordinator.system and job.container.count.
* Add logging where required.
* Write tests where required.
Follow-on JIRAs:
* Eliminate all CheckpointManager code, and replace it with messages in the
CoordinatorStream.
* Refactor
JobCoordinator/CoordinatorStreamMessage/CoordinatorStreamSystemConsumer/CoordinatorStreamSystemProducer
code. I am really disappointed with this code right now.
> Pass config from JobRunner to JobCoordinator via ConfigStream
> -------------------------------------------------------------
>
> Key: SAMZA-448
> URL: https://issues.apache.org/jira/browse/SAMZA-448
> Project: Samza
> Issue Type: Sub-task
> Components: container
> Affects Versions: 0.9.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.9.0
>
> Attachments: SAMZA-448-0.patch
>
>
> SAMZA-438 modified the AM/local job factories (job coordinator) to pass
> configuration to SamzaContainer via an HTTP server. This ticket is to modify
> the other half of the pipeline, so that config is passed from the JobRunner
> to the job coordinator via a ConfigStream rather than an environment
> variable.
> The goal of this ticket is to make the change as transparent as possible.
> {{control-job.sh}} won't be introduced in this ticket. Instead, we'll
> continue using the ConfigFactories that we have, but pass the resolved config
> to the coordinator using the ConfigStream rather than the environment
> variable.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)