[ 
https://issues.apache.org/jira/browse/SAMZA-603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364196#comment-14364196
 ] 

Chris Riccomini commented on SAMZA-603:
---------------------------------------

For these two:

* Add new input streams after a job has started.
* Stop consuming a stream.

Either a StreamTask *or* a JobCoordinator might want to do this. It would be 
useful to have some way to poll a system for new SystemStreams (or new 
SystemStreamPartitions for an existing SystemStream) that match a regex or a 
similar filter. A StreamTask might also want to add new input streams based on 
control messages, remote store state, etc.
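
As a rough illustration of the regex-based polling idea, here is a minimal 
sketch. StreamDiscovery and its poll() method are hypothetical names, not part 
of Samza, and the list of stream names passed in stands in for whatever a 
system's admin API would return.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Samza: remembers which matching stream
// names have been seen and reports only the new ones on each poll.
class StreamDiscovery {
  private final Pattern pattern;
  private final Set<String> known = new LinkedHashSet<>();

  StreamDiscovery(String regex) {
    this.pattern = Pattern.compile(regex);
  }

  // currentStreams is assumed to be the full list of streams that the
  // system reports at the time of the poll.
  Set<String> poll(Iterable<String> currentStreams) {
    Set<String> added = new LinkedHashSet<>();
    for (String stream : currentStreams) {
      // matches() requires the whole name to match the pattern;
      // known.add() returns false for names seen on an earlier poll.
      if (pattern.matcher(stream).matches() && known.add(stream)) {
        added.add(stream);
      }
    }
    return added;
  }
}
```

Whoever owns the poll loop (a StreamTask or the JobCoordinator) would call 
poll() periodically and register whatever it returns as new inputs.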

Adding/removing a stream could be done by having the JobCoordinator write the 
task.inputs config into the coordinator stream, with the stream in question 
added to or removed from the list.
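
The config rewrite itself might look something like the sketch below. It only 
covers editing the comma-separated task.inputs value in an in-memory map. 
TaskInputsEditor is a hypothetical helper, and actually writing the result to 
the coordinator stream is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Samza: returns a copy of the config
// with one stream added to or removed from task.inputs.
class TaskInputsEditor {
  static final String TASK_INPUTS = "task.inputs";

  // Splits the comma-separated value, ignoring whitespace and empty entries.
  static List<String> parse(Map<String, String> config) {
    List<String> streams = new ArrayList<>();
    for (String s : config.getOrDefault(TASK_INPUTS, "").split(",")) {
      String trimmed = s.trim();
      if (!trimmed.isEmpty()) {
        streams.add(trimmed);
      }
    }
    return streams;
  }

  static Map<String, String> withStreamAdded(Map<String, String> config, String stream) {
    List<String> streams = parse(config);
    if (!streams.contains(stream)) {
      streams.add(stream);
    }
    return withInputs(config, streams);
  }

  static Map<String, String> withStreamRemoved(Map<String, String> config, String stream) {
    List<String> streams = parse(config);
    streams.remove(stream);
    return withInputs(config, streams);
  }

  // Leaves the original map untouched so the caller can diff old vs. new.
  private static Map<String, String> withInputs(Map<String, String> config, List<String> streams) {
    Map<String, String> updated = new LinkedHashMap<>(config);
    updated.put(TASK_INPUTS, String.join(",", streams));
    return updated;
  }
}
```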

Pausing can already be done today by having the MessageChooser decide to stop 
choosing messages for a given SSP. Exposing the MessageChooser via the 
SamzaContainerContext (or TaskContext) would allow StreamTasks to call 
arbitrary methods on the chooser, including "pause".
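
To make the "pause" idea concrete, here is a simplified stand-in. It is not 
the real MessageChooser interface: it only loosely mirrors the update/choose 
shape, uses plain strings instead of SystemStreamPartition and 
IncomingMessageEnvelope, and pause()/resume() are assumed extra methods that a 
StreamTask could call if the chooser were exposed to it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Simplified, hypothetical chooser: skips envelopes from paused partitions.
class PausableChooser {
  // Stand-in for an incoming message envelope.
  static final class Envelope {
    final String partition;
    final String message;

    Envelope(String partition, String message) {
      this.partition = partition;
      this.message = message;
    }
  }

  private final Deque<Envelope> buffered = new ArrayDeque<>();
  private final Set<String> paused = new HashSet<>();

  // Called when a new envelope becomes available for choosing.
  void update(Envelope envelope) {
    buffered.addLast(envelope);
  }

  void pause(String partition) {
    paused.add(partition);
  }

  void resume(String partition) {
    paused.remove(partition);
  }

  // Returns the oldest buffered envelope whose partition is not paused,
  // or null if every buffered envelope belongs to a paused partition.
  Envelope choose() {
    Iterator<Envelope> it = buffered.iterator();
    while (it.hasNext()) {
      Envelope envelope = it.next();
      if (!paused.contains(envelope.partition)) {
        it.remove();
        return envelope;
      }
    }
    return null;
  }
}
```

Envelopes for a paused partition stay buffered rather than being dropped, so 
resuming picks up where the partition left off.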

Overriding offsets can be achieved by writing checkpoint messages to the 
coordinator stream. It could also be achieved by making the SystemConsumer API 
mutable after start() is called, so a StreamTask could tell it to override its 
fetch offsets to some prior point. Another alternative would be to keep the 
SystemConsumer API immutable, and to fully rebuild a *new* SystemConsumer 
whenever an offset is changed for it. This seems a bit safer, since it would 
discard any messages that had already been fetched but might still be sitting 
in BlockingEnvelopeMap.
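
A rough sketch of the rebuild-on-change alternative follows. The Consumer 
interface here is a hypothetical two-method stand-in rather than the real 
SystemConsumer API, and RebuildingConsumerHolder and its factory argument are 
invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical wrapper: never mutates a running consumer. An offset
// override stops the old consumer and builds a fresh one instead.
class RebuildingConsumerHolder {
  // Minimal stand-in for a consumer with a start/stop lifecycle.
  interface Consumer {
    void start();
    void stop();
  }

  private final Function<Map<String, String>, Consumer> factory;
  private final Map<String, String> offsets = new HashMap<>();
  private Consumer current;

  RebuildingConsumerHolder(Function<Map<String, String>, Consumer> factory,
                           Map<String, String> initialOffsets) {
    this.factory = factory;
    this.offsets.putAll(initialOffsets);
    this.current = build();
  }

  // Stopping the old consumer drops whatever it had buffered; the new
  // one starts from the overridden offset.
  void overrideOffset(String partition, String offset) {
    current.stop();
    offsets.put(partition, offset);
    current = build();
  }

  Consumer current() {
    return current;
  }

  private Consumer build() {
    // Each consumer gets its own copy of the offsets, so later overrides
    // cannot change a consumer that already exists.
    Consumer consumer = factory.apply(new HashMap<>(offsets));
    consumer.start();
    return consumer;
  }
}
```

The cost is a full stop/start cycle on every override, which is the price of 
not having to reason about stale buffered messages.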

> Expose more consumer controls to developers
> -------------------------------------------
>
>                 Key: SAMZA-603
>                 URL: https://issues.apache.org/jira/browse/SAMZA-603
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>             Fix For: 0.10.0
>
>
> Several overlapping feature requests have emerged. People want the ability to 
> programmatically:
> # Add new input streams after a job has started.
> # Pause consuming a stream for a period of time.
> # Stop consuming a stream.
> # Override offsets at both job start time, and after a job has been running 
> for a while (rewind/fast forward).
> We don't really expose any of these controls to the StreamTask or 
> JobCoordinator in a pluggable way. This is an umbrella ticket to track this 
> work.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
