Based on this description, it seems like Druid sinks have to be fault
tolerant. I was hoping that they didn't need to be: that as soon as a sink
wrote some information to Druid it would be free to crash, and that Druid
only used the sinks as an optimization for serving not-yet-indexed data.

In your case it seems like you'll want to have a StatefulDoFn which writes
information out to state and to Druid at the same time. Future calls to the
StatefulDoFn would be responsible for garbage collecting information from
state based upon whether that data has been indexed.
You could map all the data you want to write onto a fixed key space like
[0, 100). Each Druid sink would cache its writes in memory and in state
(to handle a machine going down or being migrated) and write to Druid.
Whenever it gets a piece of work in the range [0, 100) it would need to
(see the rough sketch below):
1) If not the owner of the key:
    a) De-register any prior Druid sink that owned that key
    b) Register itself as the owner of that key
2) Garbage collect anything in state that is now indexed in Druid
3) Add the new data to state
4) Output the data to Druid
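
To make that concrete, here is a very rough sketch of such a StatefulDoFn
(the Event type, the druidClient and its claimKey/isIndexed/write calls are
placeholders I'm making up for illustration, not a real Druid or Beam API):

    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Assumes the input has already been keyed onto the fixed key space,
    // e.g. KV.of(Math.floorMod(event.hashCode(), 100), event).
    class DruidSinkFn extends DoFn<KV<Integer, Event>, Void> {

      // Events written to Druid but not yet acknowledged as indexed.
      @StateId("pending")
      private final StateSpec<BagState<Event>> pendingSpec = StateSpecs.bag();

      @ProcessElement
      public void process(ProcessContext ctx,
                          @StateId("pending") BagState<Event> pending) {
        int key = ctx.element().getKey();
        Event event = ctx.element().getValue();

        // 1) Claim ownership of the key, de-registering any prior sink
        //    (hypothetical call).
        // druidClient.claimKey(key);

        // 2) Garbage collect anything in state that Druid has already indexed.
        //    BagState has no per-element delete, so this means clearing the
        //    bag and re-adding whatever is still unindexed, e.g.:
        // List<Event> unindexed = new ArrayList<>();
        // for (Event e : pending.read()) {
        //   if (!druidClient.isIndexed(e)) { unindexed.add(e); }
        // }
        // pending.clear();
        // unindexed.forEach(pending::add);

        // 3) Add the new data to state so it survives the worker going away.
        pending.add(event);

        // 4) Output the data to Druid (hypothetical call).
        // druidClient.write(event);
      }
    }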

Not being the owner of a key should be rare. Also, it would be worthwhile
to read https://beam.apache.org/blog/2017/02/13/stateful-processing.html

On Wed, Feb 7, 2018 at 8:44 PM, Charles Allen <charles.al...@snap.com>
wrote:

> Thanks for the insightful response.
>
> > Can druid handle sinks being created and removed dynamically and at what
> rate?
>
> From the external data access side (the things that need to access Beam
> runners), announcement is handled by ZooKeeper, so changes that happen on a
> scale larger than the typical ZooKeeper timeout (maybe on the order of tens
> of seconds) are expected. There are probably ways to do reverse proxies if
> things go up and down faster. But any launch/terminate speeds that are
> comparable to network hiccups or unlucky JVM GC timings are going to be
> problematic regardless.
>
> > What are the expectations around this partial data that has been
> collected by a sink?
>
> That is a core area I'm trying to figure out how to handle. Druid indexing
> right now works by making "small" batches of events (usually high tens of
> thousands to low hundreds of thousands of events) and doing incremental
> indexing to get the data into a read-optimized form, then doing a final big
> merge of a bunch of these incremental indices. The key thing is this: once
> data is considered queryable, it needs a chain of custody until loaded up
> on "historical" nodes (or some other new type of node) in Druid. What this
> means is that a runner that has queryable state needs to stay up until
> that state is acknowledged as available by something else. If a runner fails
> and one (or more) other runner starts to re-accumulate that state, it is
> fine. What is forbidden is for the runner to have queryable state, then
> just exit under "normal" conditions without waiting for the state to be
> accessible somewhere else. This can cause backpressure in the runners
> whenever handoff is delayed.
>
> I *THINK* such a handoff should be possible to handle in a reasonable
> commit workflow. Eventually something needs to merge a bunch of these
> incremental things together into larger chunks, and it is not clear whether
> such a data-optimization pass would work on the same framework or would
> have to be a different one.
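>
> To make that constraint concrete, the teardown/commit path would have to
> look roughly like this (coordinatorClient, SegmentId, publishSegmentMetadata
> and the polling interval are all hypothetical names for illustration, not a
> real Druid or Beam API):
>
>     // Don't release queryable state until something else serves it.
>     void finishAndHandOff(SegmentId segmentId) throws InterruptedException {
>       publishSegmentMetadata(segmentId);  // announce the finished segment
>       while (!coordinatorClient.isLoaded(segmentId)) {
>         // Local state must keep being served from this runner; delayed
>         // handoff shows up as backpressure here.
>         Thread.sleep(60_000);
>       }
>       // Only now is it safe to drop local state and exit.
>     }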
>
> Thoughts?
> Charles Allen
>
>
> On Wed, Feb 7, 2018 at 9:37 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> There are no existing sinks which are accessible from outside of Apache
>> Beam.
>>
>> Apache Beam does its work by processing "bundles" (a unit of work). Each of
>> these is executed and its results are committed back into a Runner (such as
>> Flink/Spark/...). The lifetime of an individual instance of a sink is bound
>> to the bundle being processed and can be as small as a few milliseconds.
>> There are some sinks which cache things within the JVM (like connections),
>> but that caching is best effort: if the machine were to go down (crash,
>> autoscaling reducing the number of workers, ...) and ever came back, the
>> cached state would be either unimportant or easily recoverable.
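>>
>> For example, a rough sketch of that kind of best-effort caching (FooClient
>> is a made-up stand-in for any connection-like object, not a real Beam or
>> Druid class):
>>
>>     import org.apache.beam.sdk.transforms.DoFn;
>>
>>     class CachingWriteFn extends DoFn<String, Void> {
>>       // Lives only as long as this DoFn instance on this worker's JVM.
>>       private transient FooClient client;
>>
>>       @Setup
>>       public void setup() {
>>         client = FooClient.connect();  // re-created after any restart
>>       }
>>
>>       @ProcessElement
>>       public void process(ProcessContext ctx) {
>>         client.write(ctx.element());
>>       }
>>
>>       @Teardown
>>       public void teardown() {
>>         // Best effort: not guaranteed to run if the worker disappears.
>>         if (client != null) {
>>           client.close();
>>         }
>>       }
>>     }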
>>
>> Can druid handle sinks being created and removed dynamically and at what
>> rate?
>> What are the expectations around this partial data that has been
>> collected by a sink?
>>
>>
>> On Wed, Feb 7, 2018 at 7:31 AM, Charles Allen <charles.al...@snap.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I work closely with druid.io
>>> <http://druid.io/>
>>> and one of the main pain points for any Druid deployment is handling the
>>> real-time streaming component. What I, personally, would *like* is to have
>>> the streaming orchestration and streaming state handled by a runner which
>>> specializes in such things, and to let Druid focus on the lightning-fast
>>> ad-hoc query side.
>>>
>>> A natural contender for such a setup would be a Beam-based solution with
>>> a Druid segment Sink. The trouble we are having pursuing such a setup is
>>> twofold:
>>>
>>>    1.  Many Druid setups run lambda-style pipes to backfill late or
>>>    wrong data, so the sink needs to call out to the Druid cluster for data
>>>    version locking and orchestration. Access must be available from the sink
>>>    to the Druid Overlord and/or Coordinator, or potentially some other
>>>    task-specific JVM in the cluster.
>>>    2. Druid segments are queryable while they are being built. This
>>>    means that the Druid cluster (Broker specifically) must be able to
>>>    discover and issue *RPC queries* against the Sink on the partial
>>>    data it has accumulated. This puts an extra dynamic load on the Sink JVM,
>>>    but in my own experience that extra load is small compared to the load of
>>>    indexing the incoming data.
>>>
>>> The key desire for a stream-native framework is that the Druid
>>> MiddleManager/Peon setup (the streaming task infrastructure for Druid) has
>>> problems recovering from failures, recovering easily from upgrades, handling
>>> late data well, and scaling horizontally. The Kafka Indexing
>>> Extension
>>> <http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html>
>>> handles some of these things, but still doesn't quite operate like a
>>> stream-native solution, and is only available for Kafka.
>>>
>>> What is not clear to me is whether such a feature set would be in the
>>> intended scope of Beam, as opposed to having some other streaming data
>>> catcher service that Beam simply feeds into (aka a Tranquility
>>> <https://github.com/druid-io/tranquility> sink). Having a customizable,
>>> accessible, and stateful sink with good commit semantics seems like it
>>> should be something Beam could support natively, but the "accessible" part
>>> is where I need some guidance on whether that is in scope for Beam.
>>>
>>> Can the dev list provide some insight into whether there are any other
>>> Sinks that strive to be accessible at run-time from non-Beam components?
>>>
>>> Also, is such a use case something that should be a part of what Beam
>>> does, or would it be better handled outside of Beam?
>>>
>>> Thank you,
>>> Charles Allen
>>>
>>
>>
