Taepper commented on code in PR #46926:
URL: https://github.com/apache/arrow/pull/46926#discussion_r2767523089


##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -74,6 +74,120 @@ class SelectKComparator<SortOrder::Descending> {
   }
 };
 
+struct ExtractCounter {
+  int64_t extract_non_null_count;
+  int64_t extract_nan_count;
+  int64_t extract_null_count;
+};
+
+class HeapSorter {
+ public:
+  using HeapPusherFunction =
+      std::function<void(uint64_t*, uint64_t*, uint64_t*, uint64_t*)>;
+
+  HeapSorter(int64_t k, NullPlacement null_placement, MemoryPool* pool)
+      : k_(k), null_placement_(null_placement), pool_(pool) {}
+
+  Result<std::shared_ptr<ArrayData>> HeapSort(HeapPusherFunction heap_pusher,
+                                              NullPartitionResult p,
+                                              NullPartitionResult q) {
+    ExtractCounter counter = ComputeExtractCounter(p, q);
+    return HeapSortInternal(counter, heap_pusher, p, q);
+  }
+
+  ExtractCounter ComputeExtractCounter(NullPartitionResult p, 
NullPartitionResult q) {
+    int64_t extract_non_null_count = 0;
+    int64_t extract_nan_count = 0;
+    int64_t extract_null_count = 0;
+    int64_t non_null_count = q.non_null_count();
+    int64_t nan_count = q.null_count();
+    int64_t null_count = p.null_count();
+    // non-null nan null
+    if (null_placement_ == NullPlacement::AtEnd) {
+      extract_non_null_count = non_null_count <= k_ ? non_null_count : k_;
+      extract_nan_count = extract_non_null_count >= k_
+                              ? 0
+                              : std::min(nan_count, k_ - 
extract_non_null_count);
+      extract_null_count = extract_non_null_count + extract_nan_count >= k_
+                               ? 0
+                               : (k_ - (extract_non_null_count + 
extract_nan_count));
+    } else {  // null nan non-null
+      extract_null_count = null_count <= k_ ? null_count : k_;
+      extract_nan_count =
+          extract_null_count >= k_ ? 0 : std::min(nan_count, k_ - 
extract_null_count);
+      extract_non_null_count = extract_null_count + extract_nan_count >= k_
+                                   ? 0
+                                   : (k_ - (extract_null_count + 
extract_nan_count));
+    }
+    return {extract_non_null_count, extract_nan_count, extract_null_count};
+  }
+
+  Result<std::shared_ptr<ArrayData>> HeapSortInternal(ExtractCounter counter,
+                                                      HeapPusherFunction 
heap_pusher,
+                                                      NullPartitionResult p,
+                                                      NullPartitionResult q) {
+    int64_t out_size = counter.extract_non_null_count + 
counter.extract_nan_count +
+                       counter.extract_null_count;
+    ARROW_ASSIGN_OR_RAISE(auto take_indices, MakeMutableUInt64Array(out_size, 
pool_));
+    // [extrat_count....extract_nan_count...extract_null_count]
+    if (null_placement_ == NullPlacement::AtEnd) {
+      if (counter.extract_non_null_count) {
+        auto* out_cbegin = take_indices->template 
GetMutableValues<uint64_t>(1) +
+                           counter.extract_non_null_count - 1;
+        auto kth_begin = std::min(q.non_nulls_begin + k_, q.non_nulls_end);
+        heap_pusher(q.non_nulls_begin, kth_begin, q.non_nulls_end, out_cbegin);
+      }
+
+      if (counter.extract_nan_count) {
+        auto* out_cbegin = take_indices->template 
GetMutableValues<uint64_t>(1) +
+                           counter.extract_non_null_count + 
counter.extract_nan_count - 1;
+        auto kth_begin =
+            std::min(q.nulls_begin + k_ - counter.extract_non_null_count, 
q.nulls_end);
+        heap_pusher(q.nulls_begin, kth_begin, q.nulls_end, out_cbegin);

Review Comment:
   I will rework this implementation to something more efficient. While at 
that, we can make the SelectK be stable for NULLs and NaNs. Should this be 
added to the specification (and respective tests be reintroduced and added)? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to