xuzikun2003 commented on a change in pull request #29725:
URL: https://github.com/apache/spark/pull/29725#discussion_r525862140



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;

Review comment:
       @maropu, @opensky142857, here are the reasons why we set
windowSorterMapMaxSize to 1 and why we reduce the page size of each sorter.
   
   Each UnsafeExternalRowSorter uses a different memory consumer. Whenever you
insert the first row into an UnsafeExternalRowSorter, the memory consumer of
that sorter allocates a whole page to it. In our perf run of TPCDS100TB, the
default page size is 64MB. If we insert only a few rows into a sorter
corresponding to a window, a lot of memory is wasted, and the unnecessary
allocations also bring significant overhead. That is why we do two things in
this PR:
   1. Keep the number of window sorters small.
   2. Decrease the page size of each window sorter.
   
   To address this problem, there are actually two directions we could take.
   
   One direction is to let these window sorters share the same memory
consumer, so we would not allocate many big pages into which only a few rows
are inserted.
   
   The second direction is to keep only one window sorter for each physical
partition.
   
   Here is why we choose the second direction. When we run TPCDS100TB, we do
not see the Spark engine being slow at sorting many windows within a physical
partition. What we see is the Spark engine being slow at sorting a single
window in a single physical partition (q67 is such a case), with the executor
performing a lot of unnecessary comparisons on the window partition key. To
address the slowness we observe, we follow the second direction and keep only
one window sorter for each physical partition. This single window sorter does
not need to compare the window partition key.
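   
   To make the waste concrete, here is a hypothetical back-of-the-envelope
sketch. Only the 64MB default page size comes from our run; the window count
below is an assumption for illustration, not a measured number:
   
       object PerWindowPageWaste {
         def main(args: Array[String]): Unit = {
           // Default page size we observed in the TPCDS100TB perf run.
           val pageSizeBytes = 64L * 1024 * 1024
           // Hypothetical number of windows in one physical partition.
           val numWindows = 1000
           // With one sorter per window, each sorter's memory consumer
           // allocates a full page as soon as its first row is inserted.
           val reservedGiB = pageSizeBytes * numWindows / (1024.0 * 1024 * 1024)
           println(f"~$reservedGiB%.1f GiB reserved even if each window holds only a few rows")
         }
       }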
   
   Perhaps I can rename these parameters to avoid confusion. What do you
think?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {

Review comment:
       Sure, will add.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -124,7 +125,18 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
      if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
         child
       } else {
-        SortExec(requiredOrdering, global = false, child = child)
+        operator match {
+          case WindowExec(_, partitionSpec, orderSpec, _)
+            if (!partitionSpec.isEmpty && !orderSpec.isEmpty) =>

Review comment:
       Thanks, will fix it.
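   
   For reference, a self-contained sketch of the style fix I assume is being
requested (Scala idiom prefers nonEmpty over !isEmpty, and the guard needs no
parentheses); the types here are stand-ins so the snippet compiles on its own,
not the PR's actual classes:
   
       object GuardStyleSketch {
         // WindowSpecSketch stands in for the real WindowExec(_, partitionSpec, orderSpec, _).
         case class WindowSpecSketch(partitionSpec: Seq[String], orderSpec: Seq[String])
   
         def needsWindowSort(w: WindowSpecSketch): Boolean = w match {
           case WindowSpecSketch(partitionSpec, orderSpec)
               if partitionSpec.nonEmpty && orderSpec.nonEmpty => true
           case _ => false
         }
       }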

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import org.apache.spark.sql.execution.metric.SQLMetrics

Review comment:
       Both WindowSortExec and SortExec perform sorting during query
execution, so I want to keep the suffix Exec to indicate that this is a
physical execution operator.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -55,53 +189,64 @@ case class SortExec(
   override def requiredChildDistribution: Seq[Distribution] =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  private val enableRadixSort = sqlContext.conf.enableRadixSort
+  val enableRadixSort = sqlContext.conf.enableRadixSort
+
+  lazy val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
+  lazy val ordering = RowOrdering.create(sortOrder, output)
+  lazy val sortPrefixExpr = SortPrefix(boundSortExpression)
+
+  // The comparator for comparing prefix
+  lazy val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
+
+  // The generator for prefix
+  lazy val prefixComputer = createPrefixComputer(sortPrefixExpr)
+
+  lazy val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
+    SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
+
+  lazy val pageSize = SparkEnv.get.memoryManager.pageSizeBytes

Review comment:
       Sure, I will add it.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {
+    rowSorter = UnsafeExternalRowSorter.create(
+      schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
+
+    if (testSpillFrequency > 0) {
+      rowSorter.setTestSpillFrequency(testSpillFrequency)
+    }
+    rowSorter.asInstanceOf[UnsafeExternalRowSorter]
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    doProduce(ctx, classOf[UnsafeExternalRowSorter].getName)
+  }
+}
+
+/**
+ * Performs (external) sorting for multiple windows.
+ *
+ * @param partitionSpec a sequence of expressions that defines a partition key
+ * @param sortOrderInWindow a sequence of sort orders for sorting rows inside a window
+ * @param sortOrderAcrossWindows a sequence of sort orders for sorting rows across
+ *                               different windows on a Spark physical partition.
+ *                               This sequence of sort orders is obtained from a partition
+ *                               key plus a sequence of sort orders inside a window
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class WindowSortExec(
+    partitionSpec: Seq[Expression],
+    sortOrderInWindow: Seq[SortOrder],
+    sortOrderAcrossWindows: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan,
+    testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrderAcrossWindows,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowWindowSorter = {
+    val partitionSpecGrouping = UnsafeProjection.create(partitionSpec, output)
+
+    // The schema of partition key
+    val partitionKeySchema: Seq[Attribute] = output.filter(x => {
+      x.references.subsetOf(AttributeSet(partitionSpec))
+    })
+
+    // Generate the ordering of partition key
+    val orderingOfPartitionKey = RowOrdering.create(
+      sortOrderAcrossWindows diff sortOrderInWindow,
+      partitionKeySchema)
+
+    // No prefix comparator
+    val nullPrefixComparator = new PrefixComparator {
+      override def compare(prefix1: Long, prefix2: Long): Int = 0
+    }
+
+    if (sortOrderInWindow == null || sortOrderInWindow.size == 0) {

Review comment:
       We don't run WindowSortExec when sortOrderInWindow == null or
sortOrderInWindow.size == 0. We run the original SortExec if there is no need
to sort within each group.
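   
   To illustrate what the nullPrefixComparator quoted in the diff above does:
returning 0 for every pair makes the prefix comparison a no-op, so the sorter
always falls through to the full record comparator. A minimal runnable demo
(only the comparator comes from the diff; the harness around it is
illustrative):
   
       import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
   
       object NullPrefixComparatorDemo {
         // Every prefix comparison is a tie, forcing a full record comparison.
         val nullPrefixComparator: PrefixComparator = new PrefixComparator {
           override def compare(prefix1: Long, prefix2: Long): Int = 0
         }
   
         def main(args: Array[String]): Unit = {
           assert(nullPrefixComparator.compare(1L, 99L) == 0)
           assert(nullPrefixComparator.compare(-5L, -5L) == 0)
         }
       }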

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {
+    rowSorter = UnsafeExternalRowSorter.create(
+      schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
+
+    if (testSpillFrequency > 0) {
+      rowSorter.setTestSpillFrequency(testSpillFrequency)
+    }
+    rowSorter.asInstanceOf[UnsafeExternalRowSorter]
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    doProduce(ctx, classOf[UnsafeExternalRowSorter].getName)
+  }
+}
+
+/**
+ * Performs (external) sorting for multiple windows.
+ *
+ * @param partitionSpec a sequence of expressions that defines a partition key
+ * @param sortOrderInWindow a sequence of sort orders for sorting rows inside a window
+ * @param sortOrderAcrossWindows a sequence of sort orders for sorting rows across
+ *                               different windows on a Spark physical partition.
+ *                               This sequence of sort orders is obtained from a partition
+ *                               key plus a sequence of sort orders inside a window
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class WindowSortExec(
+    partitionSpec: Seq[Expression],
+    sortOrderInWindow: Seq[SortOrder],
+    sortOrderAcrossWindows: Seq[SortOrder],
+    global: Boolean,

Review comment:
       You are right, we can remove this parameter.
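   
   For concreteness, a hedged sketch of what the signature might look like
with the global parameter dropped (stand-in element types so the snippet
compiles on its own; the real fields are Seq[Expression] and Seq[SortOrder]):
   
       case class WindowSortExecSketch(
           partitionSpec: Seq[String],          // stands in for Seq[Expression]
           sortOrderInWindow: Seq[String],      // stands in for Seq[SortOrder]
           sortOrderAcrossWindows: Seq[String], // stands in for Seq[SortOrder]
           testSpillFrequency: Int = 0)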



