Dale LaBossiere created QUARKS-166:
--------------------------------------
Summary: Add Gate plumbing
Key: QUARKS-166
URL: https://issues.apache.org/jira/browse/QUARKS-166
Project: Quarks
Issue Type: New Feature
Reporter: Dale LaBossiere
As part of an initial experimental implementation of QUARKS-156 concurrent
analytics / barrier, I had a need for a stream Gate mechanism - a way to
control the release of tuples into an output stream. It's not currently used.
If there's a +1 sentiment for adding this to PlumbingStreams, here's the code:
/**
* Control the flow of tuples to an output stream.
* <P>
* A {@link Semaphore} is used to control the flow of tuples
* through the {@code gate}. The gate acquires a permit from the
* semaphore to pass the tuple through, blocking until a permit is
* acquired (and applying backpressure upstream while blocked).
* Elsewhere, some code calls {@link Semaphore#release(int)}
* to make permits available.
* </P><P>
* If a TopologyProvider is used that can distribute a topology's
* streams to different JVMs, the gate and the code releasing the
* permits must be in the same JVM.
* </P><P>
* Sample use:
* <BR>
* Suppose you want to control processing such that concurrent
* pipelines process each tuple in lock-step.
* I.e., you want all of the pipelines to start processing a tuple
* at the same time and not start a new tuple until the current
* tuple has been fully processed by each of them:
* <pre>{@code
* TStream<Integer> readings = ...;
*
* Semaphore gateControl = new Semaphore(1); // allow the first to pass through
* TStream<Integer> gated = gate(readings, gateControl);
*
* // Create the concurrent pipeline combiner and have it
* // signal that concurrent processing of the tuple has completed.
* // In this sample the combiner just returns the received list of
* // each pipeline result.
* Function<TStream<List<Integer>>,TStream<List<Integer>>> combiner =
*     stream -> stream.map(
*         list -> {
*             gateControl.release();
*             return list;
*         });
* TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
* }</pre>
* </P>
* @param stream the input stream
* @param semaphore gate control
* @return gated stream
*/
public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
    return stream.map(tuple -> {
        try {
            semaphore.acquire();
            return tuple;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted", e);
        }
    });
}
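
For anyone who wants to see the gating behavior without a Quarks runtime, here is a minimal plain-Java sketch of the same idea. It is not part of the proposed patch: the class name GateSketch, the run helper, and the BlockingQueue standing in for the downstream pipelines are all hypothetical, and a Function<T,T> takes the place of TStream.map. The upstream thread blocks in the gate until the consumer, playing the role of the combiner, releases a permit, so at most one tuple is in flight at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical stand-alone sketch; it only mimics the proposed gate() semantics.
class GateSketch {

    // Same body as the proposed gate(), but returned as a plain Function
    // instead of being applied through TStream.map (assumption for this sketch).
    static <T> Function<T, T> gate(Semaphore semaphore) {
        return tuple -> {
            try {
                semaphore.acquire(); // block until a permit is available
                return tuple;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted", e);
            }
        };
    }

    // Pushes the input through the gate on an "upstream" thread while the
    // calling thread acts as the combiner and releases one permit per tuple.
    static List<Integer> run(List<Integer> input) {
        Semaphore gateControl = new Semaphore(1); // allow the first to pass through
        Function<Integer, Integer> gated = gate(gateControl);
        BlockingQueue<Integer> inFlight = new LinkedBlockingQueue<>();

        Thread upstream = new Thread(() -> {
            for (Integer tuple : input) {
                inFlight.add(gated.apply(tuple)); // blocks in the gate when no permit
            }
        });
        upstream.start();

        List<Integer> results = new ArrayList<>();
        try {
            for (int i = 0; i < input.size(); i++) {
                results.add(inFlight.take()); // "process" the current tuple
                gateControl.release();        // signal completion, open the gate
            }
            upstream.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted", e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3))); // prints [1, 2, 3]
    }
}
```

Because the semaphore starts with one permit and the consumer releases exactly one per tuple, the upstream thread can never get more than one tuple ahead, which is the lock-step behavior described in the Javadoc sample.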