aromanenko-dev commented on code in PR #25187:
URL: https://github.com/apache/beam/pull/25187#discussion_r1093166881


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java:
##########
@@ -61,25 +61,25 @@ public class PipelineTranslatorBatch extends 
PipelineTranslator {
   // 
https://github.com/seznam/euphoria/blob/master/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java#L106
 
   static {
-    TRANSFORM_TRANSLATORS.put(Impulse.class, new ImpulseTranslatorBatch());
-    TRANSFORM_TRANSLATORS.put(Combine.PerKey.class, new 
CombinePerKeyTranslatorBatch<>());
-    TRANSFORM_TRANSLATORS.put(Combine.Globally.class, new 
CombineGloballyTranslatorBatch<>());
+    TRANSFORM_TRANSLATORS.put(Impulse.class, new ImpulseTranslatorBatch(0));

Review Comment:
   How were this and the complexity factors below estimated?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -146,13 +165,37 @@ public String name() {
     public @Nullable Dataset<WindowedValue<T>> dataset() {
       return dataset;
     }
+
+    private boolean isLeaf() {
+      return dependentTransforms.isEmpty();
+    }
+
+    private int usages() {
+      return dependentTransforms.size();
+    }
+
+    private void resetPlanComplexity() {
+      planComplexity = 1;
+    }
+
+    /** Estimate complexity of query plan by multiplying complexities of all 
dependencies. */
+    private float estimatePlanComplexity() {
+      if (planComplexity > 0) {
+        return planComplexity;
+      }
+      float complexity = 1 + complexityFactor;
+      for (TranslationResult<?> res : dependencies) {

Review Comment:
   nit: res -> result



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -129,12 +138,22 @@ public EvaluationContext translate(
    */
   private static final class TranslationResult<T> implements 
EvaluationContext.NamedDataset<T> {
     private final String name;
+    private final float complexityFactor;

Review Comment:
   Default value?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -80,6 +83,12 @@
 public abstract class PipelineTranslator {
   private static final Logger LOG = 
LoggerFactory.getLogger(PipelineTranslator.class);
 
+  // Threshold to limit query plan complexity to avoid unnecessary planning 
overhead. Currently this
+  // is fairly low, Catalyst won't be able to optimize beyond ParDos anyways. 
Until there's
+  // dedicated support for schema transforms, there's little value of allowing 
more complex plans at
+  // this point.
+  private static final int PLAN_COMPLEXITY_THRESHOLD = 6;

Review Comment:
   Why is it 6? Should it be configurable?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -129,12 +138,22 @@ public EvaluationContext translate(
    */
   private static final class TranslationResult<T> implements 
EvaluationContext.NamedDataset<T> {
     private final String name;
+    private final float complexityFactor;
+    private float planComplexity = 0;
+
     private @MonotonicNonNull Dataset<WindowedValue<T>> dataset = null;
     private @MonotonicNonNull Broadcast<SideInputValues<T>> sideInputBroadcast 
= null;
+
+    // dependent downstream transforms (if empty this is a leaf)
     private final Set<PTransform<?, ?>> dependentTransforms = new HashSet<>();
+    // upstream dependencies (requires inputs)

Review Comment:
   nit: downstream?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -247,20 +292,37 @@ public <T> Dataset<WindowedValue<T>> 
getDataset(PCollection<T> pCollection) {
     public <T> void putDataset(
         PCollection<T> pCollection, Dataset<WindowedValue<T>> dataset, boolean 
cache) {
       TranslationResult<T> result = getResult(pCollection);
-      if (cache && result.dependentTransforms.size() > 1) {
-        LOG.info("Dataset {} will be cached.", result.name);
-        result.dataset = dataset.persist(storageLevel); // use NONE to disable
-      } else {
-        result.dataset = dataset;
-        if (result.dependentTransforms.isEmpty()) {
-          leaves.add(result);
+      result.dataset = dataset;
+
+      if (!cache && isMemoryOnly) {
+        result.resetPlanComplexity(); // cached as RDD in memory which breaks 
linage
+      } else if (cache && result.usages() > 1) {
+        if (isMemoryOnly) {
+          // Cache as RDD in-memory only, this helps to also break linage of 
complex query plans.
+          LOG.info("Dataset {} will be cached in-memory as RDD for reuse.", 
result.name);
+          result.dataset = sparkSession.createDataset(dataset.rdd().persist(), 
dataset.encoder());
+          result.resetPlanComplexity();
+        } else {
+          LOG.info("Dataset {} will be cached for reuse.", result.name);
+          dataset.persist(storageLevel); // use NONE to disable
         }
       }
+
+      if (result.estimatePlanComplexity() > PLAN_COMPLEXITY_THRESHOLD) {
+        // Break linage of dataset to limit planning overhead for complex 
query plans.
+        LOG.debug("Breaking linage of dataset {} to limit complexity of query 
plan.", result.name);

Review Comment:
   nit: `info` log level



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to