[ https://issues.apache.org/jira/browse/KAFKA-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-6034:
---------------------------------
    Description: 
Today our DSL-to-topology translation is done operator-by-operator, and hence 
the resulting topology is sub-optimal. Some known issues:

1. The repartition topics for aggregations and joins may be duplicated, i.e. 
they contain exactly the same data as other topics.
2. State stores' changelog topics may duplicate other repartition / to / 
through topics.

We should improve our DSL translation with a global optimization goal of 
minimizing the number of internal topics as well as state stores, i.e. 
"logical plan optimization".



-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

h2. Addendum: Possible Optimization Techniques from DSMS

Below are some optimization techniques that we can leverage from the DSMS 
(data stream management systems) literature 
(http://hirzels.com/martin/papers/debs13-tutorial.pdf, 
http://db.cs.berkeley.edu/papers/sigmod00-eddy.pdf):

1. Operator Separation / Fission: break an operator into consecutive 
sub-operators, or replicate an operator in parallel in the topology. Both 
are meant to better leverage parallel processing power.

Example: stream.map("mapper") => stream.map("mapper1").map("mapper2"), where 
applying "mapper1" and then "mapper2" is equivalent to applying "mapper".

* In Streams, consider the scenario where sub-topologies have different 
parallelism requirements, such that mapper1 may need N workers while mapper2 
may need M workers where M >> N; a minimal DSL sketch follows below.
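
A minimal sketch of fission in the Streams DSL (the topic names and the two 
mapper stages are hypothetical):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class FissionExample {
    // Hypothetical stages: "mapper" has been split so that the cheap and
    // the expensive part could, in principle, be scaled independently.
    static String cheapStage(String v)     { return v.trim(); }
    static String expensiveStage(String v) { return v.toUpperCase(); }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input");

        // Original fused form:
        //   stream.mapValues(v -> expensiveStage(cheapStage(v)));

        // After fission into two consecutive sub-operators:
        stream.mapValues(FissionExample::cheapStage)      // "mapper1"
              .mapValues(FissionExample::expensiveStage)  // "mapper2"
              .to("output");
    }
}
{code}

Note that today Streams fuses consecutive stateless operators into the same 
task, so the split only pays off if the optimizer can place the two stages 
into separately scalable sub-topologies.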

2. Operator Fusion / Scheduling: we can either pass an incoming tuple through 
the whole topology before processing the next tuple (i.e. Depth-First, or 
Item Model, or FIFO scheduling), or block until we have completed processing 
all the queued tuples at one operator and collect all of its intermediate 
outputs before moving on to the next operator (i.e. Breadth-First, or Block 
Model, or Greedy scheduling; note that this requires some buffering).
 
Example: suppose we have a topology with three consecutive operators O1 -> O2 
-> O3, where each operator takes one tuple and generates 2 outputs, and only 
one tuple is queued for O1 from the source. With Depth-First we will execute 
in the order O1, O2, O3, O3, O2, O3, O3; with Breadth-First we will execute 
O1, O2, O2, O3, O3, O3, O3. We can also do something in between these two 
mechanisms (i.e. dynamic scheduling), and if new tuples arrive for O1 during 
processing, we may decide to come back to O1 first (i.e. give it a higher 
priority). A toy simulation of the two orderings is sketched below.
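
The following stand-alone simulation reproduces the two execution orders 
above (purely illustrative; Streams does not expose such a scheduler, and 
the operator chain is modeled with plain queues):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Toy simulation of the two scheduling models for a chain O1 -> O2 -> O3
// where each operator turns one input tuple into two outputs.
public class SchedulingDemo {

    // Depth-first (Item Model): always advance the most recent output.
    static void depthFirst(int numOps) {
        Deque<Integer> stack = new ArrayDeque<>();
        stack.push(1);                          // one tuple queued for O1
        while (!stack.isEmpty()) {
            int op = stack.pop();
            System.out.print("O" + op + " ");
            if (op < numOps) {                  // each op emits two outputs
                stack.push(op + 1);
                stack.push(op + 1);
            }
        }
        System.out.println();
    }

    // Breadth-first (Block Model): drain one operator's queue completely
    // before moving on; the FIFO queue is the required buffering.
    static void breadthFirst(int numOps) {
        Deque<Integer> queue = new ArrayDeque<>();
        queue.add(1);
        while (!queue.isEmpty()) {
            int op = queue.poll();
            System.out.print("O" + op + " ");
            if (op < numOps) {
                queue.add(op + 1);
                queue.add(op + 1);
            }
        }
        System.out.println();
    }

    public static void main(String[] args) {
        depthFirst(3);    // prints: O1 O2 O3 O3 O2 O3 O3
        breadthFirst(3);  // prints: O1 O2 O2 O3 O3 O3 O3
    }
}
{code}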
 
The main motivation is to reduce the memory footprint of the operators' 
queues, and, when multiple threads are available, to balance load. A related 
optimization technique that interacts with scheduling is "load shedding": 
when the processing capacity cannot keep up with the incoming rate, decide 
whether and how much to drop records at the head of the topology while 
scheduling the currently queued records.

* In Streams, since we use Kafka as a persistent "buffer" and control the 
queued size via the consumer polling frequency, we always use Depth-First 
processing to reduce the memory footprint of intermediate results, so this 
optimization category does not apply. However, consider the case where one 
operation of the sub-topology is IO-heavy or needs to access remote data: 
instead of applying some async processing policy, we could break that 
operation into its own sub-topology with threads that can suspend / resume 
around those remote calls.

3. Non-Stateful Operator Reordering (or "Selection Pushing Forward"): we want 
to push operators with low selectivity and low cost to the front of the 
topology. For example, if we have a topology O1 -> O2, where O1's selectivity 
is 1.2 and its cost (i.e. time to process one tuple) is 2, while O2's 
selectivity is 0.1 and its cost is 1, we would like to re-order it to O2 -> O1.

This is one of the most common pieces of low-hanging fruit in stream 
optimizations. Example: if you have a Source -> Map("mapper") -> 
Filter("predicate") topology, it is almost always better to reorder it to 
Source -> Filter("predicate1") -> Map("mapper1"), provided you can translate 
the original mapper and predicate into the new predicate and mapper.

* In Streams, since the runtime is schema-agnostic, it is tricky to 
automatically derive the matching predicate1 / mapper1.
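
A minimal sketch of such a reordering in the Streams DSL (topic names and 
the concrete mapper / predicate are made up; the rewrite is only safe here 
because "2 * v > 10" can be translated back to "v > 5" on the pre-map values):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class ReorderingExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Integer> source = builder.stream("numbers");

        // Before: the mapper runs on every record, then most are dropped.
        //   source.mapValues(v -> v * 2).filter((k, v) -> v > 10);

        // After: the translated predicate drops records early, so the
        // mapper only runs on the survivors.
        source.filter((k, v) -> v > 5)    // "predicate1"
              .mapValues(v -> v * 2)      // "mapper1"
              .to("big-numbers");
    }
}
{code}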
 
4. Redundancy Elimination: for example, if we have a topology Branch -> [O1 
-> O2, O1 -> O3], we may want to re-write it as O1 -> Branch -> [O2, O3] so 
that O1 executes only once.
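
A minimal sketch (all names are illustrative, and the rewrite assumes the 
branch predicates can be expressed over O1's output; this uses the classic 
KStream#branch varargs API):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class RedundancyEliminationExample {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("events");

        // Before: both branches would start with the same O1, i.e.
        // mapValues(String::trim) duplicated inside each branch.

        // After: O1 is hoisted in front of the branch and runs only once.
        KStream<String, String>[] branches = input
                .mapValues(String::trim)                  // O1, hoisted
                .branch((k, v) -> v.startsWith("a"),      // branch 1
                        (k, v) -> true);                  // branch 2

        branches[0].to("o2-output");   // O2 stands in for more processing
        branches[1].to("o3-output");   // O3 likewise
    }
}
{code}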
 
5. Multi-Join Operator Reordering: with multi-joins like 
"stream1.join(stream2).join(stream3)", we can try to re-order the joins based 
on join selectivity and cost, and also decide whether or not to materialize 
intermediate results, e.g. whether to store the results of 
"stream1.join(stream2)" in a separate state store or to always re-compute 
that join. A similar optimization is called "join operator sharing": for 
example, if we have one sub-topology with 
"stream1.join(stream2).join(stream3)" and another sub-topology with 
"stream1.join(stream2)", we may want to avoid materializing the results of 
"stream1.join(stream2)" in both topologies, sharing a single copy instead.

* This may be related to KIP-150.
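
A minimal sketch of the sharing idea with KTables (topic names, value types, 
and the store name are made up, and the Materialized overload assumes a 
newer DSL version):

{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class JoinSharingExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> t1 = builder.table("topic1");
        KTable<String, String> t2 = builder.table("topic2");
        KTable<String, String> t3 = builder.table("topic3");

        // Materialize t1 JOIN t2 once, in an explicitly named store...
        KTable<String, String> t1t2 = t1.join(t2,
                (v1, v2) -> v1 + "|" + v2,
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
                        "t1-join-t2"));

        // ...then share it between the two consumers of that result,
        // instead of re-computing (and re-storing) the join twice.
        t1t2.join(t3, (v12, v3) -> v12 + "|" + v3)
            .toStream().to("triples");
        t1t2.toStream().to("pairs");
    }
}
{code}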




> Streams DSL to Processor Topology Translation Improvements
> ----------------------------------------------------------
>
>                 Key: KAFKA-6034
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6034
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: optimization
>


