Actually, I missed one step. For each bundle (rough code sketch below):
1) If not the owner of the key:
    a) Read all prior state into memory and cache it.
    b) De-register any prior druid sink that owned that key.
    c) Register itself as the owner of that key.
2) Garbage collect anything in state/memory that is now indexed in druid
3) Add the new data part of the bundle to state
4) Output the new data part of the bundle to druid
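
In Beam Java that per-bundle flow might look roughly like the sketch
below. Event, DruidClient, and its connect/owner/indexed/write calls are
hypothetical placeholders invented for illustration; only the state
handling is real Beam API:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch only: Event, DruidClient, and the owner/indexed-check calls are
// hypothetical placeholders, not a real Druid or Beam API.
class DruidSinkFn extends DoFn<KV<Integer, Event>, Void> {

  @StateId("pending")
  private final StateSpec<BagState<Event>> pendingSpec = StateSpecs.bag();

  private transient DruidClient druid;                // hypothetical client
  private transient Map<Integer, List<Event>> cache;  // per-key in-memory copy

  @Setup
  public void setup() {
    druid = DruidClient.connect(); // hypothetical
    cache = new HashMap<>();
  }

  @ProcessElement
  public void process(
      @Element KV<Integer, Event> element,
      @StateId("pending") BagState<Event> pending) {
    int key = element.getKey();

    // 1) If this JVM is not yet the owner of the key:
    if (!cache.containsKey(key)) {
      List<Event> prior = new ArrayList<>();
      pending.read().forEach(prior::add); // a) read prior state into memory
      cache.put(key, prior);              //    ... and cache it
      druid.deregisterOwner(key);         // b) de-register the prior sink
      druid.registerOwner(key);           // c) register ourselves as the owner
    }

    // 2) Garbage collect anything in state/memory that druid has indexed.
    List<Event> unindexed = cache.get(key);
    unindexed.removeIf(druid::isIndexed);
    pending.clear();
    unindexed.forEach(pending::add);

    // 3) Add the new data to state, then 4) output it to druid. If the druid
    // write throws, the bundle fails, the state writes never commit, and the
    // runner will retry the bundle.
    Event event = element.getValue();
    pending.add(event);
    unindexed.add(event);
    druid.write(key, event);
  }
}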

The idea is that if writing to druid fails, nothing gets added to state.
If we do write to druid, then either the data was also added to state, or
the runner failed in some way and will retry the bundle, so we will write
it to druid again and it will eventually show up in state.

The fixed key space is important because if a machine goes down, you want
some other machine to process a bundle containing that key and become the
owner of that key space "soon", otherwise you'll have an extended outage
for queries. You also want a large enough fixed key space, since it
controls the parallelism during writing, so that no single machine becomes
the bottleneck.

The key space doesn't have to be fixed, since you can technically figure
out the flow rate of messages and compute an "optimal" key space size
dynamically in the pipeline. Scaling the key space up is really easy; the
complexity comes from scaling it down, but I wouldn't think about that
until it is needed.
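
For the keying itself, something like this (again a sketch; events is
assumed to be a PCollection<Event>, and 100 is an arbitrary key-space
size):

import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Spread events across a fixed key space of 100 keys; the key count caps
// write parallelism, so pick it large enough that no one worker becomes
// the bottleneck.
PCollection<KV<Integer, Event>> keyed =
    events.apply(
        WithKeys.<Integer, Event>of(e -> Math.floorMod(e.hashCode(), 100))
            .withKeyType(TypeDescriptors.integers()));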


On Thu, Feb 8, 2018 at 5:46 PM, Lukasz Cwik <[email protected]> wrote:

> Based on this description, it seems like druid sinks have to be fault
> tolerant. I was hoping that they didn't need to be, and that as soon as they
> wrote some information to druid you would be able to crash, with druid
> only using the sinks as an optimization for unindexed data.
>
> In your case it seems like you'll want to have a StatefulDoFn which writes
> information out to state and druid at the same time. Future calls to the
> StatefulDoFn would be responsible for garbage collecting information from
> state based upon whether that data has been indexed.
> You could map all the data you want to write on to a fixed key space like
> [0, 100). Each druid sink would cache their writes in memory and into state
> (to handle a machine going down or being migrated) and write to druid.
> Whenever it gets a piece of work in the range [0, 100) it would need to:
> 1) If not the owner of the key:
>     a) De-register any prior druid sink that owned that key
>     b) Register itself as the owner of that key
> 2) Garbage collect anything in state that is now indexed in druid
> 3) Add the new data to state
> 4) Output the data to druid
>
> Not being the owner of a key should be rare. Also, it would be worthwhile
> to read https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
> On Wed, Feb 7, 2018 at 8:44 PM, Charles Allen <[email protected]>
> wrote:
>
>> Thanks for the insightful response.
>>
>> > Can druid handle sinks being created and removed dynamically and at
>> what rate?
>>
>> From the external data access side (the things that need to access Beam
>> runners), announcement is handled by zookeeper, so changes that happen on a
>> timescale larger than the typical zookeeper timeout (maybe on the order of
>> tens of seconds) are expected. There are probably ways to do reverse proxies if
>> things go up and down faster. But any launch/terminate speeds that are
>> comparable to network hiccups or unlucky JVM GC timings are going to be
>> problematic regardless.
>>
>> > What are the expectations around this partial data that has been
>> collected by a sink?
>>
>> That is a core area I'm trying to figure out how to handle. Druid
>> indexing right now works by making "small" batches of events (usually high
>> tens of thousands to low hundreds of thousands of events) and doing
>> incremental indexing to get the data into a read-optimized form, then doing
>> a final big merge of a bunch of these incremental indices. The key thing
>> is this: once data is considered queryable, it needs a chain of custody
>> until loaded up on "historical" nodes (or some other new type of node) in
>> Druid. What this means is that a runner that has queryable state needs to
>> stay up until that state is acknowledged as available by something else. If a
>> runner fails and one (or more) other runner starts to re-accumulate that
>> state, it is fine. What is forbidden is for the runner to have queryable
>> state, then just exit under "normal" conditions without waiting for the
>> state to be accessible somewhere else. This can cause backpressure in the
>> runners whenever handoff is delayed.
>>
>> I *THINK* such a handoff could be handled in a reasonable commit
>> workflow. Eventually something needs to merge a bunch of these
>> incremental things together into larger chunks, and it is not clear
>> whether such a data-optimization pass would work on the same framework
>> or would have to be a different one.
>>
>> Thoughts?
>> Charles Allen
>>
>>
>> On Wed, Feb 7, 2018 at 9:37 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> There are no existing sinks which are accessible from outside of Apache
>>> Beam.
>>>
>>> Apache Beam does work by processing "bundles" (a unit of work). Each of
>>> these are executed and results are committed back into a Runner (such as
>>> Flink/Spark/...). The lifetime of an individual instance of a sink is bound
>>> to the bundle being processed and can be as small as a few milliseconds.
>>> There are some sinks which cache stuff within the JVM (like connections),
>>> but the caching is best effort: if the machine were to go down (crash,
>>> autoscaling reducing the number of workers, ...) and ever came back, the
>>> cached state would be either unimportant or easily recoverable.
>>>
>>> Can druid handle sinks being created and removed dynamically and at what
>>> rate?
>>> What are the expectations around this partial data that has been
>>> collected by a sink?
>>>
>>>
>>> On Wed, Feb 7, 2018 at 7:31 AM, Charles Allen <[email protected]>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I work closely with druid.io
>>>> <http://druid.io/>
>>>> and one of the main pain points for any Druid deployment is handling the
>>>> real-time streaming component. What I, personally, would *like* to have is
>>>> the streaming orchestration and streaming state handled by a runner which
>>>> specializes in such things, and allow Druid to focus on the lightning fast
>>>> ad-hoc query side.
>>>>
>>>> A natural contender for such a setup would be a Beam based solution
>>>> with a Druid segment Sink. The trouble we are having pursuing such a setup
>>>> is two fold:
>>>>
>>>>    1. Many Druid setups run lambda-style pipes to backfill late or
>>>>    wrong data, so the sink needs to call out to the Druid cluster for data
>>>>    version locking and orchestration. Access must be available from the
>>>>    sink to the Druid Overlord and/or Coordinator, or potentially some other
>>>>    task-specific jvm in the cluster.
>>>>    2. Druid segments are queryable while they are being built. This
>>>>    means that the Druid cluster (Broker specifically) must be able to
>>>>    discover and issue *RPC queries* against the Sink on the partial data it
>>>>    has accumulated. This puts an extra dynamic load on the Sink jvm, but in
>>>>    my own experience that extra load is small compared to the load of
>>>>    indexing the incoming data.
>>>>
>>>> The key desire for a stream-native framework is that the Druid
>>>> MiddleManager/Peon setup (the streaming task infrastructure for Druid) has
>>>> problems recovering from failure, upgrading easily, handling late data
>>>> well, and scaling horizontally. The Kafka Indexing
>>>> Extension
>>>> <http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html>
>>>> handles some of these things, but still doesn't quite operate like a
>>>> stream-native solution, and is only available for Kafka.
>>>>
>>>> What is not clear to me is whether such a feature set would be in the
>>>> intended scope of Beam, as opposed to having some other streaming data
>>>> catcher service that Beam simply feeds into (aka, a Tranquility
>>>> <https://github.com/druid-io/tranquility> sink). Having a
>>>> customizable, accessible and stateful sink with good commit semantics seems
>>>> like it should be something Beam could support natively, but the
>>>> "accessible" part is where I need some guidance on whether that is in
>>>> scope for Beam.
>>>>
>>>> Can the dev list provide some insight into whether there are any other
>>>> Sinks that strive to be accessible at run-time from non-Beam components?
>>>>
>>>> Also, is such a use case something that should be part of what Beam
>>>> does, or would it be best kept outside of Beam?
>>>>
>>>> Thank you,
>>>> Charles Allen
>>>>
>>>
>>>
>
