vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r958320960


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(false, false);
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+    public void buildAndOptimizeTopology(
+        final boolean optimizeTopology, final boolean optimizeSelfJoin) {
 
         mergeDuplicateSourceNodes();
         if (optimizeTopology) {
             LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
             optimizeKTableSourceTopics();
             maybeOptimizeRepartitionOperations();
+            if (optimizeSelfJoin) {

Review Comment:
   Hey @guozhangwang! What do you mean by the "two-pass" mechanism? Do you mean
   that one pass builds the plan and the second optimizes it? What is wrong with
   doing this?
   
   The complications I came across are actually not from this, but from the fact
   that the logical plan is not so logical: it already has a lot of physical plan
   information baked into it. I don't think it is a good idea to optimize the
   plan while building it (in one pass), because some optimizations cannot be
   identified until the full plan has been built. Moreover, optimizing the plan
   while building it means an optimization rule can only consider "local"
   information (up to the point of what the plan currently contains) rather than
   the whole plan holistically, so optimizations that could otherwise be applied
   get excluded simply because the optimizer doesn't have all the information
   yet. It also creates "race" conditions between optimizations: applying one
   optimization might exclude another that would have been more beneficial.
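   
   To make the "build first, optimize afterwards" idea concrete, here is a
   minimal sketch of a second pass that runs rules over the finished plan. The
   `LogicalPlan` and `OptimizationRule` types are hypothetical stand-ins, not the
   actual `InternalStreamsBuilder` classes:
   
   ```java
   import java.util.List;
   
   // Hypothetical stand-in for the fully built logical plan.
   interface LogicalPlan { }
   
   // Hypothetical rule interface: each rule gets to see the *complete* plan.
   interface OptimizationRule {
       LogicalPlan apply(LogicalPlan fullPlan);
   }
   
   final class PlanOptimizer {
       private final List<OptimizationRule> rules;
   
       PlanOptimizer(final List<OptimizationRule> rules) {
           this.rules = rules;
       }
   
       // Pass 1 (elsewhere): the DSL builds the complete logical plan.
       // Pass 2 (here): each rule runs over the finished plan, so it can look at
       // the whole graph instead of only the nodes that have been built so far.
       LogicalPlan optimize(final LogicalPlan fullPlan) {
           LogicalPlan current = fullPlan;
           for (final OptimizationRule rule : rules) {
               current = rule.apply(current);
           }
           return current;
       }
   }
   ```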
   
   Regarding the optimization in this PR, it is not possible to identify whether
   a join is a self-join until we have seen the entire plan. We need to make sure
   that both arguments of the join come from the same source and that no
   transformation is applied to one argument but not the other. I am afraid the
   checks I am doing to decide whether the self-join rewrite is applicable are
   unavoidable, whether we do the optimizations on the fly or afterwards.
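   
   To illustrate the kind of whole-plan check I mean, here is a rough sketch with
   a simplified, hypothetical node type (not the real `GraphNode` API), assuming
   node names are unique in the topology: both join arguments must trace back to
   the same source through identical chains of nodes.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Simplified, hypothetical plan node: a name plus an upstream parent
   // (null for a source node).
   final class PlanNode {
       final String name;
       final PlanNode parent;
   
       PlanNode(final String name, final PlanNode parent) {
           this.name = name;
           this.parent = parent;
       }
   }
   
   final class SelfJoinCheck {
   
       // Collect the node names on the path from a join argument up to its source.
       private static List<String> pathToSource(PlanNode node) {
           final List<String> path = new ArrayList<>();
           while (node != null) {
               path.add(node.name);
               node = node.parent;
           }
           return path;
       }
   
       // The rewrite is only safe if both arguments trace back to the same source
       // through identical paths, i.e. no transformation was applied to one
       // argument but not the other. This can only be decided once the whole
       // plan has been built.
       static boolean isSelfJoinCandidate(final PlanNode left, final PlanNode right) {
           return pathToSource(left).equals(pathToSource(right));
       }
   }
   ```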


