leventov opened a new issue #7087: Online aggregation / Interactive Query 
Session Protocol
URL: https://github.com/apache/incubator-druid/issues/7087
 
 
   This issue discusses two proposals: "Online aggregation" and "Interactive 
Query Session Protocol" because they are very closely related to each other.
   
   ### Online aggregation
   
   Previously the "online aggregation" concept in the context of Druid was 
proposed as ["incremental query 
results"](https://groups.google.com/d/msg/druid-development/X33JTyoTlqg/rGP3P07XDwAJ),
 but to avoid a clash with "incremental" term as used in IncrementalIndexes, 
and because it seems that "online aggregation" term is most often used to refer 
to this concept in scientific papers and elsewhere (possibly beginning from the 
[Online 
aggregation](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.49.8797&rep=rep1&type=pdf)
 paper by JM Hellerstein et al, 1997) it's suggested to stick to "online 
aggregation" from now on.
   
   To facilitate human interactions with analytics and data exploration interfaces backed by Druid, and also to [reduce Druid's perceived error rates](https://medium.com/@leventov/the-problems-with-druid-at-large-scale-and-high-load-part-1-714d475e84c9), online aggregation must be implemented on the level of Druid's Broker nodes. See [The case for interactive data exploration accelerators (IDEAs)](https://dl.acm.org/citation.cfm?id=2939513) by A. Crotty et al., 2016, and related research for some data and ideas related to this topic.
   
   To help readers better understand online aggregation, I first describe 
"offline" aggregation, i. e. what is already implemented in Druid:
   
   #### "Offline" (default) aggregation
   A simple HTTP request-response between a Druid Broker node and a client (possibly proxied through a Router node).
   
   The Broker determines all segments needed to compute results for the query and the historical nodes on which those segments are loaded (except for segments whose results for the current query are found in the cache). The Broker sends requests to all those historicals to execute the query over all those segments. When the historicals send the results back, the Broker aggregates them all and sends the results back to the client (or to a Router, if the request is proxied through it). This logic is implemented mainly in the `CachingClusteredClient` class.
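
   For illustration only, here is a minimal sketch of this scatter-gather flow. The `Historical`, `SegmentResult`, and `queryAllSegments` names are hypothetical stand-ins for the Broker-side machinery; this is not the actual `CachingClusteredClient` code.

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.stream.Collectors;

   // Hypothetical types standing in for the Broker-side scatter-gather logic.
   class OfflineAggregationSketch {
       interface Historical {
           CompletableFuture<SegmentResult> query(String segmentId, String query);
       }

       record SegmentResult(String segmentId, long sum, long rowCount) {}

       /** Fan the query out to every (historical, segment) pair, then merge all results. */
       static SegmentResult queryAllSegments(Map<String, Historical> segmentToHistorical, String query) {
           List<CompletableFuture<SegmentResult>> futures = segmentToHistorical.entrySet().stream()
               .map(e -> e.getValue().query(e.getKey(), query))
               .collect(Collectors.toList());
           // Block until every per-segment result arrives, then fold them into one answer.
           return futures.stream()
               .map(CompletableFuture::join)
               .reduce(new SegmentResult("merged", 0, 0),
                       (a, b) -> new SegmentResult("merged", a.sum() + b.sum(), a.rowCount() + b.rowCount()));
       }
   }
   ```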
   
   #### Online aggregation
   Works on top of an HTTP streaming or WebSocket connection between a Druid Broker node and a client (possibly proxied through a Router node).
   
   The Broker determines all segments needed to compute results for the query and the historicals on which those segments are loaded, but may not send requests to historicals to execute the query over all those segments right away. Instead, if, for example, there are 10 partitions in all intervals covered by the query, the Broker sends requests to historicals to execute the query over 3 segments in each interval (not necessarily the segments with partition numbers 0, 1, and 2; they may be chosen randomly for each interval separately). As soon as results for at least two segments in each interval arrive at the Broker, the Broker sends requests to historicals to execute the query over the next portion of segments in each interval. The Broker aggregates incoming segment query results from historicals in the background. From time to time the Broker sends progressively more complete aggregation results (*partial aggregation results*) back to the client.
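
   A rough sketch of such per-interval batched dispatch is below. All class and method names are hypothetical, and as a simplification the next batch for an interval is released independently of other intervals.

   ```java
   import java.util.*;

   // Hypothetical per-interval scheduler: dispatch a few segments per interval up front,
   // and release the next batch for an interval once enough results have come back.
   class OnlineAggregationScheduler {
       private final Map<String, Deque<String>> pendingSegmentsByInterval = new HashMap<>();
       private final Map<String, Integer> resultsSinceLastBatch = new HashMap<>();
       private final int initialBatchSize = 3;    // segments dispatched per interval up front
       private final int resultsPerNextBatch = 2; // results required before the next batch

       OnlineAggregationScheduler(Map<String, List<String>> segmentsByInterval) {
           segmentsByInterval.forEach((interval, segments) -> {
               List<String> shuffled = new ArrayList<>(segments);
               Collections.shuffle(shuffled); // partitions are chosen randomly per interval
               pendingSegmentsByInterval.put(interval, new ArrayDeque<>(shuffled));
               resultsSinceLastBatch.put(interval, 0);
           });
       }

       /** Segments to query right away: a small batch from every interval. */
       List<String> initialBatch() {
           List<String> batch = new ArrayList<>();
           pendingSegmentsByInterval.forEach((interval, pending) -> {
               for (int i = 0; i < initialBatchSize && !pending.isEmpty(); i++) {
                   batch.add(pending.poll());
               }
           });
           return batch;
       }

       /** Called when a segment result arrives; returns the next segments to dispatch, if any. */
       List<String> onSegmentResult(String interval) {
           int count = resultsSinceLastBatch.merge(interval, 1, Integer::sum);
           if (count < resultsPerNextBatch) {
               return List.of();
           }
           resultsSinceLastBatch.put(interval, 0);
           Deque<String> pending = pendingSegmentsByInterval.get(interval);
           List<String> next = new ArrayList<>();
           for (int i = 0; i < resultsPerNextBatch && !pending.isEmpty(); i++) {
               next.add(pending.poll());
           }
           return next;
       }
   }
   ```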
   
   **Fault tolerance:** if some historical fails to return results for some segment (or times out), the Broker tries to send this request to another historical node on which the segment is loaded. It may also avoid sending more requests to the failed historical node for other segments in the course of this online aggregation session.
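
   A minimal sketch of such failover, assuming a hypothetical `Historical` interface and a replica iterator rather than any existing Druid class:

   ```java
   import java.util.Iterator;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;

   class SegmentFailover {
       interface Historical {
           CompletableFuture<Long> query(String segmentId, String query);
       }

       /**
        * If a historical fails or times out for a segment, retry on the next replica and
        * mark the failed node as suspect so the scheduler can avoid it for the rest of
        * this online aggregation session.
        */
       static CompletableFuture<Long> queryWithFailover(
               String segmentId, String query, Iterator<Historical> replicas, Set<Historical> suspect) {
           Historical node = replicas.next();
           return node.query(segmentId, query)
               .exceptionallyCompose(err -> {
                   suspect.add(node);
                   if (!replicas.hasNext()) {
                       return CompletableFuture.failedFuture(err);
                   }
                   return queryWithFailover(segmentId, query, replicas, suspect);
               });
       }
   }
   ```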
   
   **The reasoning and the principle behind throttling Broker -> historical requests:** a user interacting with the analytical interface may quickly narrow down the query or start executing another, entirely different query, no longer being interested in the results of the current query (e. g. by closing the browser tab, or going to "Home" in the interface). In this case, an attempt to compute the most precise results at the maximum possible speed (bound by the throughput of the Druid cluster) will turn out to be a waste of resources.
   
   In the example above, the Broker sends requests for 3 segments out of 10 in each interval. A possible general principle is not to have more than 1 or 2 outstanding segment query executions on any historical at any time. This is because if historicals use HDDs or network-attached disks, and the segments don't appear in the page cache (reminder: they are all memory-mapped, but there is an idea to [move away from that](https://github.com/apache/incubator-druid/issues/4773)), more segment query executions will likely queue up inside the historical anyway, contending for I/O. That "1 or 2" limit (or more? configurable?) may depend on the recency of the interval, because more recent data is more likely to appear in the page cache.
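
   A sketch of what such a per-historical limit could look like; the limits and the recency-based adjustment are purely illustrative, not a proposal for concrete values:

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical throttle: allow at most N outstanding segment queries per historical,
   // with a higher limit for recent intervals that are more likely to be in the page cache.
   class PerHistoricalThrottle {
       private final Map<String, AtomicInteger> outstandingByHistorical = new ConcurrentHashMap<>();

       private int limitFor(boolean recentInterval) {
           return recentInterval ? 2 : 1; // the "1 or 2" from the text; could be made configurable
       }

       /** Returns true if a segment query for this (historical, interval) may be dispatched now. */
       boolean tryAcquire(String historical, boolean recentInterval) {
           AtomicInteger outstanding =
               outstandingByHistorical.computeIfAbsent(historical, h -> new AtomicInteger());
           int current = outstanding.get();
           if (current >= limitFor(recentInterval)) {
               return false; // over the limit (or lost a race): the dispatcher retries later
           }
           return outstanding.compareAndSet(current, current + 1);
       }

       void release(String historical) {
           outstandingByHistorical.get(historical).decrementAndGet();
       }
   }
   ```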
   
   **Results format:** along with ordinary aggregation results (on each iteration), the Broker sends the client information about what fraction of the total data the current results are aggregated over. It may also send estimated errors / confidence intervals of the partial aggregation, if it is able to compute them for the given aggregation function, and if the user opts to receive such data.
   
   The Broker also sends a flag indicating that the partial aggregation results may be skewed, e. g. if the partial aggregation contains no data from some interval(s) covered by the query (because of availability problems, slowness, errors, etc.), or if there are disproportionate amounts of rows from different intervals and it's not possible to adjust for that given the aggregation function(s) being computed, or if rows are partitioned between segments within intervals based on some key rather than randomly.
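
   To make the fields discussed above concrete, here is a hypothetical shape of a partial-result message sent from the Broker to the client. The field names are illustrative, not an existing or proposed Druid API.

   ```java
   import java.util.List;

   // Hypothetical per-iteration message from the Broker to the client.
   record PartialResultEnvelope(
       List<Object> partialResults,     // ordinary aggregation results so far
       double fractionOfDataAggregated, // e.g. 0.3 means ~30% of the relevant rows are covered
       Double estimatedErrorBound,      // optional: only if computable and requested by the user
       boolean possiblySkewed           // set when intervals are missing/disproportionate, or
                                        // segments are partitioned by key rather than randomly
   ) {}
   ```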
   
   **When (how often) the Broker sends partial results back to the client:** it may be a combination of the following factors (see the sketch after this list):
    - Time: send something (whatever new results the Broker has since the previous sending) every X seconds. This is to ensure interactivity and feed the user with new information in a timely manner. X may increase since the beginning of the query, e. g. the timeouts may be: 0.5 seconds since the beginning of the query, 1s, 2s, 4s, 6s, 8s, 12s, 16s, ... A sending is skipped if there are no new segment results aggregated on the Broker since the previous sending.
    - Readiness thresholds: send partial results to the client when we estimate that 90% (95? 99?) of the rows that should be covered by the query are already aggregated. This needs to fire only within the first 0.5 seconds of query execution (or whatever the first timeout is), to make the analytics interface be perceived as even faster by the user. It probably shouldn't fire sooner than 0.1 seconds after the beginning of the query execution, because below that a user stops noticing the difference.
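
   A sketch combining both factors into a single send-decision policy; the schedule and thresholds are taken from the examples above, but the class itself is purely illustrative:

   ```java
   import java.util.List;

   // Hypothetical send policy: a growing time-based schedule plus an early readiness-threshold send.
   class PartialResultSendPolicy {
       // Time-based schedule from the text: 0.5s, 1s, 2s, 4s, 6s, 8s, 12s, 16s, ...
       private static final List<Long> SCHEDULE_MILLIS =
           List.of(500L, 1000L, 2000L, 4000L, 6000L, 8000L, 12000L, 16000L);

       private int nextScheduledSend = 0;
       private boolean readinessSendDone = false;

       boolean shouldSend(long millisSinceQueryStart, double fractionAggregated, boolean newResultsSinceLastSend) {
           // Early send: ~90% of rows aggregated within the first scheduled timeout,
           // but not sooner than ~100 ms (users don't notice anything faster).
           if (!readinessSendDone
                   && millisSinceQueryStart >= 100
                   && millisSinceQueryStart < SCHEDULE_MILLIS.get(0)
                   && fractionAggregated >= 0.9) {
               readinessSendDone = true;
               return true;
           }
           // Time-based send: skipped if nothing new has been aggregated since the last sending.
           if (nextScheduledSend < SCHEDULE_MILLIS.size()
                   && millisSinceQueryStart >= SCHEDULE_MILLIS.get(nextScheduledSend)) {
               nextScheduledSend++;
               return newResultsSinceLastSend;
           }
           return false;
       }
   }
   ```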
   
   **Aggregation:** for simple Sum and Count aggregators, interpolation of partial results to projected final values is pretty trivial, if historicals send the numbers of rows in segments and (for sum aggregators) the numbers of aggregated rows (i. e. the Count) along with the segment results themselves. For some other aggregators, approximation of final results from partial results is non-trivial and may require using tools such as sketches (FYI @leerho, @edgan8) in the Broker's memory, even though neither the data nor the aggregation requested by the user has anything to do with sketches. This may be further complicated by the requirement to estimate errors / confidence intervals of the partial aggregation results (see above).
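
   For example, a partial Sum can be projected to an estimated final value by scaling it by the ratio of total rows to rows aggregated so far. A minimal sketch (ignoring error estimation):

   ```java
   // Projecting partial Sum/Count results to estimated final values, assuming historicals
   // report both per-segment row counts and the number of rows aggregated so far.
   class PartialSumProjection {
       /** Estimated final sum: scale the partial sum by total rows over rows scanned so far. */
       static double projectSum(double partialSum, long rowsAggregatedSoFar, long totalRowsInAllSegments) {
           if (rowsAggregatedSoFar == 0) {
               return 0.0;
           }
           return partialSum * ((double) totalRowsInAllSegments / rowsAggregatedSoFar);
       }

       public static void main(String[] args) {
           // Example: 2.5M of 10M rows scanned, partial sum 1000 -> projected final sum ~4000.
           System.out.println(projectSum(1_000, 2_500_000, 10_000_000));
       }
   }
   ```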
   
   However, even if during online aggregation no approximation / projection of final results from partial results is done at all, and partial aggregation results are returned to the client as if they covered all the data that needs to be aggregated, this is still much better than offline aggregation and in fact may not be noticeably worse than a sophisticated approximation algorithm from the user's perspective, because users may only be interested in the relative weights of different rows in topN and groupBy results, or a general trend in timeseries results, not the absolute values. Many visualizations don't even show users absolute values and only let them perceive relative weights (pie charts, bar charts).
   
   **JSON vs SQL query:** it would be nice if online aggregation was agnostic to the query format and worked with JSON as well as SQL queries.
   
   ### Interactive Query Session Protocol
   
   The Interactive Query Session Protocol works on top of online aggregation. It doesn't seem to provide much benefit unless online aggregation is implemented.
   
   It may be implemented as an actual protocol on top of a single WebSocket connection, or just be a loose agreement about the expectations between clients and Druid nodes (Brokers, Routers), while each query remains a separate HTTP streaming / WebSocket interaction.
   
   The idea is that human interaction with an analytics interface often appears to be a series of "drilling-down" queries: each following query is the previous query + more dimension filters. As soon as the user starts the next query, they are no longer interested in the previous query being computed further, but they are likely to return to that less selective query soon. There are also "sibling" query transitions on the same level of selectivity: e. g. a user moves from a query with the filter `dim1 = value1` to an otherwise identical query with the filter `dim1 = value2`, or `dim1 IN {value1, value2}`, or `dim1 != value1`.
   
   #### Interactive Query Session Protocol (IQSP)
   
   All queries within the human interaction session should end up on the same Broker. Unless IQSP is implemented as an actual protocol on top of WebSocket, the user (analytics interface) needs to make sure to query the same Broker within the session, or a Router node needs to identify a session heuristically or with the help of some extra headers in requests and route all queries within a session to the same Broker.
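
   A sketch of how a Router could pin a session to a Broker using an extra request header; the header and the hashing scheme are assumptions, not an existing Druid feature:

   ```java
   import java.util.List;

   // Hypothetical Router-side session affinity: hash a session header to pick a Broker,
   // so every query of an interactive session lands on the same Broker.
   class SessionAffinityRouter {
       private final List<String> brokers;

       SessionAffinityRouter(List<String> brokers) {
           this.brokers = brokers;
       }

       /** Picks a stable Broker for the value of an (assumed) session-id request header. */
       String brokerFor(String sessionIdHeader) {
           int index = Math.floorMod(sessionIdHeader.hashCode(), brokers.size());
           return brokers.get(index);
       }
   }
   ```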
   
   As soon as the next query in the session arrives, the Broker stops online aggregation of the previous query, but doesn't drop the partial results or the query execution state until the end of the session. If the user returns to this query later in the session, the Broker will resume its execution from where it stopped.
   
   After certain "sibling" transitions, broker may be able to provide some 
results to the client before querying any historical nodes, by manipulating the 
cached results of previous less selective queries and the previous "sibling" 
query. E. g. `dim1 = value1` -> `dim1 != value1` transition, and `dim1 = 
value1` -> `dim1 IN {value1, value2}`, if `dim1 = value2` already appeared 
earlier in the session.
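
   An illustrative example of deriving such results from cached counts; it assumes the cached partial results all cover the same fraction of the data, which in practice would need to be checked:

   ```java
   import java.util.Map;

   // Deriving sibling-query counts from results cached earlier in the session.
   class SiblingTransitionSketch {
       public static void main(String[] args) {
           long allRows = 1_000;     // cached partial count of the less selective (unfiltered) query
           long dim1EqValue1 = 300;  // cached partial count of `dim1 = value1`
           long dim1EqValue2 = 150;  // cached partial count of `dim1 = value2`

           long dim1NotValue1 = allRows - dim1EqValue1;           // `dim1 != value1`
           long dim1InValue1Value2 = dim1EqValue1 + dim1EqValue2; // `dim1 IN {value1, value2}`

           System.out.println(Map.of(
               "dim1 != value1", dim1NotValue1,
               "dim1 IN {value1, value2}", dim1InValue1Value2));
       }
   }
   ```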
