xuzikun2003 commented on a change in pull request #29725:
URL: https://github.com/apache/spark/pull/29725#discussion_r525862140



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;

Review comment:
       @maropu, @opensky142857, here are the reasons why we set windowSorterMapMaxSize to 1 and why we reduce the page size of each sorter.
   
   Each UnsafeExternalRowSorter uses its own memory consumer. When the first row is inserted into an UnsafeExternalRowSorter, its memory consumer allocates a whole page to the sorter. In our TPC-DS 100TB perf run, the default page size was 64MB. If only a few rows are inserted into the sorter for a given window, most of that memory is wasted, and the unnecessary allocation also adds significant performance overhead. That is why we do two things in this PR (a rough back-of-envelope sketch follows the list):
   1. Keep the number of window sorters small.
   2. Decrease the page size of each window sorter.
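   To make the waste concrete, here is a rough back-of-envelope sketch. The window and per-window row counts are hypothetical; only the 64MB page size comes from the perf run described above:

```java
// Hypothetical numbers for illustration; only the 64MB page size is from the
// perf run described above.
public class PagePerSorterWasteSketch {
  public static void main(String[] args) {
    long pageSizeBytes = 64L * 1024 * 1024; // default page size in the perf run
    int windowsPerPartition = 1_000;        // hypothetical: many small windows
    int rowsPerWindow = 10;                 // hypothetical: few rows per window

    // One sorter per window => one full page allocated on the first insert.
    long reservedBytes = pageSizeBytes * windowsPerPartition;
    System.out.printf("%d MB reserved for only %d rows%n",
        reservedBytes / (1024 * 1024), (long) windowsPerPartition * rowsPerWindow);
    // => 64000 MB reserved for 10000 rows: nearly every page sits almost empty,
    // and allocating all those pages adds overhead of its own.
  }
}
```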
   
   There are two directions we could take to address this problem.
   
   One direction is to let the window sorters share the same memory consumer, so we do not allocate many big pages that each receive very few rows. But this direction requires a lot of engineering effort to refactor UnsafeExternalSorter.
   
   The second direction is to keep only one window sorter for each physical partition.
   
   Here is why we chose the second direction. In the TPC-DS 100TB run, we did not see the Spark engine being slow at sorting many windows within a physical partition. What we saw was the engine being slow at sorting a single window within a single physical partition (q67 is such a case), with the executor doing many unnecessary comparisons on the window partition key. To address the slowness we observed, we follow the second direction and keep only one window sorter per physical partition. That single window sorter never needs to compare the window partition key, so it runs almost 2x faster (the toy sketch below illustrates the saved comparisons).
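   To illustrate where the savings come from, here is a toy comparator sketch. The Row type is hypothetical, standing in for UnsafeRow; the real code compares rows through generated orderings (see orderingInWindow and orderingAcrossWindows in the diff above):

```java
import java.util.Arrays;
import java.util.Comparator;

// Toy illustration only; Row is a hypothetical stand-in for UnsafeRow.
public class WindowComparatorSketch {
  static final class Row {
    final int partitionKey; // window partition key
    final int orderKey;     // ordering within the window
    Row(int partitionKey, int orderKey) {
      this.partitionKey = partitionKey;
      this.orderKey = orderKey;
    }
  }

  public static void main(String[] args) {
    // Sorting all windows in one pass: every comparison checks the partition
    // key first, and for rows in the same window the order key as well.
    Comparator<Row> acrossWindows = Comparator
        .<Row>comparingInt(r -> r.partitionKey)
        .thenComparingInt(r -> r.orderKey);

    // Sorting one window at a time: the partition key is constant within the
    // window, so only the order key is compared -- roughly half the work,
    // which matches the ~2x speedup described above.
    Comparator<Row> inWindow = Comparator.comparingInt(r -> r.orderKey);

    Row[] window = { new Row(1, 3), new Row(1, 1), new Row(1, 2) };
    Arrays.sort(window, inWindow);
    for (Row r : window) System.out.print(r.orderKey + " "); // prints: 1 2 3
  }
}
```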
   
   Perhaps I can rename these parameters to avoid confusion. What do you think?



