debajyoti-truefoundry opened a new issue, #16620:
URL: https://github.com/apache/datafusion/issues/16620

   ### Describe the bug
   
   The query filter selects `492435` rows. Because there may be duplicates, I 
need to deduplicate on a single column, then order by timestamp and retrieve 
the top 10 rows. In this case, all `492435` rows were unique; there were **no** 
duplicates.
   
   I executed these queries in an identical environment (AWS EC2 M7A). The 
binary is compiled with the following settings and with native CPU features 
enabled.
   ```toml
   [profile.release]
   opt-level = 3
   codegen-units = 1
   ```
   
   1. `distinct on ("SpanId")`. Approximately **2 seconds** of wall clock 
execution time.
   ```
   SortPreservingMergeExec: [Timestamp@1 ASC NULLS LAST], fetch=10, 
metrics=[output_rows=10, elapsed_compute=56.441µs]
     SortExec: TopK(fetch=10), expr=[Timestamp@1 ASC NULLS LAST], 
preserve_partitioning=[true], metrics=[output_rows=80, 
elapsed_compute=12.32639ms, row_replacements=568]
       ProjectionExec: 
expr=[first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanId)@1 as SpanId, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Timestamp)@2 as Timestamp, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.TraceId)@3 as TraceId, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanName)@4 as SpanName, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ScopeName)@5 as ScopeName, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ServiceName)@6 as ServiceName, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ParentSpanId)@7 as 
ParentSpanId, first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Duration)@8 as 
Duration, first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Tags)@9 as Tags, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanKind)@10 as SpanKind, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.MlRepoId)@11 as MlRepoId, 
first_value(TracingProjectId)@12 as TracingProjectId, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.StatusCode)@13 as StatusCode, 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.TsBucketStart)@14 as TsBucketStart, 
((first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ServiceName)@6 = 
user-service OR first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ServiceName)@6 
= auth-service) AND 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanName)@4 LIKE %login% AND 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Duration)@8 >= 1000000 AND 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Duration)@8 <= 5000000 AND 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.StatusCode)@13 >= 200 AND 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.StatusCode)@13 <= 299) IS NOT 
DISTINCT FROM true as matched], metrics=[output_rows=492435, 
elapsed_compute=1.681826ms]
         AggregateExec: mode=FinalPartitioned, gby=[SpanId@0 as SpanId], 
aggr=[first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanId), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Timestamp), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.TraceId), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanName), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ScopeName), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ServiceName), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ParentSpanId), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Duration), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Tags), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanKind), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.MlRepoId), 
first_value(TracingProjectId), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.StatusCode), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.TsBucketStart)], 
metrics=[output_rows=492435, elapsed_compute=3.882494179s, spill_count=0, 
spilled_bytes=0, spilled_rows=0, peak_mem_used=1414466147]
           CoalesceBatchesExec: target_batch_size=20000, 
metrics=[output_rows=492435, elapsed_compute=916.376µs]
             RepartitionExec: partitioning=Hash([SpanId@0], 8), 
input_partitions=8, metrics=[fetch_time=3.424462188s, 
repartition_time=75.357473ms, send_time=327.861036ms]
               AggregateExec: mode=Partial, gby=[SpanId@3 as SpanId], 
aggr=[first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanId), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Timestamp), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.TraceId), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanName), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ScopeName), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ServiceName), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.ParentSpanId), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Duration), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.Tags), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.SpanKind), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.MlRepoId), 
first_value(TracingProjectId), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.StatusCode), 
first_value(tbl_6baa3c6fd52949c18602a6b38294e8d9.TsBucketStart)], 
metrics=[output_rows=492435, elapsed_compute=2.742439769s, spill_count=0, 
spilled_bytes=0, spilled_rows=0, skipped_aggregation_rows=191476, 
peak_mem_used=890092194]
                 CoalesceBatchesExec: target_batch_size=20000, 
metrics=[output_rows=492435, elapsed_compute=67.421µs]
                   FilterExec: TraceId@2 = fb09d0c9b49136bb161464b3e32c5083 AND 
ParentSpanId@4 = 662b7388122dfb79 AND Timestamp@0 >= 1744702200000000 AND 
TsBucketStart@1 >= 1745712000, metrics=[output_rows=492435, 
elapsed_compute=3.831311ms]
                     DeltaScan, metrics=[files_pruned=96, files_scanned=205]
                       DataSourceExec: file_groups={8 groups: [[..., ...]}, 
projection=[Timestamp, TsBucketStart, TraceId, SpanId, ParentSpanId, 
ServiceName, SpanName, SpanKind, ScopeName, Duration, StatusCode, Tags, 
MlRepoId, TracingProjectId], file_type=parquet, predicate=TraceId@2 = 
fb09d0c9b49136bb161464b3e32c5083 AND ParentSpanId@4 = 662b7388122dfb79 AND 
Timestamp@0 >= 1744702200000000 AND TsBucketStart@1 >= 1745712000, 
pruning_predicate=TraceId_null_count@2 != row_count@3 AND TraceId_min@0 <= 
fb09d0c9b49136bb161464b3e32c5083 AND fb09d0c9b49136bb161464b3e32c5083 <= 
TraceId_max@1 AND ParentSpanId_null_count@6 != row_count@3 AND 
ParentSpanId_min@4 <= 662b7388122dfb79 AND 662b7388122dfb79 <= 
ParentSpanId_max@5 AND Timestamp_null_count@8 != row_count@3 AND 
Timestamp_max@7 >= 1744702200000000 AND TsBucketStart_null_count@10 != 
row_count@3 AND TsBucketStart_max@9 >= 1745712000, 
required_guarantees=[ParentSpanId in (662b7388122dfb79), TraceId in 
(fb09d0c9b49136bb161464b3e32c5083)], metrics=[output_rows=492435, 
elapsed_compute=8ns, bytes_scanned=369897090, 
file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, 
page_index_rows_matched=5863737, page_index_rows_pruned=0, 
predicate_evaluation_errors=0, pushdown_rows_matched=12712344, 
pushdown_rows_pruned=5371302, row_groups_matched_bloom_filter=10, 
row_groups_matched_statistics=205, row_groups_pruned_bloom_filter=195, 
row_groups_pruned_statistics=0, bloom_filter_eval_time=11.551132ms, 
metadata_load_time=76.928924ms, page_index_eval_time=310.055µs, 
row_pushdown_eval_time=28.421911ms, statistics_eval_time=4.607552ms, 
time_elapsed_opening=24.111618ms, time_elapsed_processing=622.896471ms, 
time_elapsed_scanning_total=3.532807376s, 
time_elapsed_scanning_until_data=417.093675ms]
   ```
   
   2. Distinct on all columns. Approximately **500 ms** of wall clock execution 
time. This works for my use case because whenever `SpanId` repeats, all the 
other selected columns have identical values.
   ```
   SortPreservingMergeExec: [Timestamp@1 ASC NULLS LAST], fetch=10, 
metrics=[output_rows=10, elapsed_compute=501.954µs]
     SortExec: TopK(fetch=10), expr=[Timestamp@1 ASC NULLS LAST], 
preserve_partitioning=[true], metrics=[output_rows=80, 
elapsed_compute=12.752748ms, row_replacements=536]
       ProjectionExec: expr=[SpanId@0 as SpanId, Timestamp@1 as Timestamp, 
TraceId@2 as TraceId, SpanName@3 as SpanName, ScopeName@4 as ScopeName, 
ServiceName@5 as ServiceName, ParentSpanId@6 as ParentSpanId, Duration@7 as 
Duration, Tags@8 as Tags, SpanKind@9 as SpanKind, MlRepoId@10 as MlRepoId, 
TracingProjectId@11 as TracingProjectId, StatusCode@12 as StatusCode, 
TsBucketStart@13 as TsBucketStart, ((ServiceName@5 = user-service OR 
ServiceName@5 = auth-service) AND SpanName@3 LIKE %login% AND Duration@7 >= 
1000000 AND Duration@7 <= 5000000 AND StatusCode@12 >= 200 AND StatusCode@12 <= 
299) IS NOT DISTINCT FROM true as matched], metrics=[output_rows=492435, 
elapsed_compute=1.631387ms]
         AggregateExec: mode=FinalPartitioned, gby=[SpanId@0 as SpanId, 
Timestamp@1 as Timestamp, TraceId@2 as TraceId, SpanName@3 as SpanName, 
ScopeName@4 as ScopeName, ServiceName@5 as ServiceName, ParentSpanId@6 as 
ParentSpanId, Duration@7 as Duration, Tags@8 as Tags, SpanKind@9 as SpanKind, 
MlRepoId@10 as MlRepoId, TracingProjectId@11 as TracingProjectId, StatusCode@12 
as StatusCode, TsBucketStart@13 as TsBucketStart], aggr=[], 
ordering_mode=PartiallySorted([2, 6]), metrics=[output_rows=492435, 
elapsed_compute=369.067634ms, spill_count=0, spilled_bytes=0, spilled_rows=0, 
peak_mem_used=237437570]
           CoalesceBatchesExec: target_batch_size=20000, 
metrics=[output_rows=492435, elapsed_compute=385.467µs]
             RepartitionExec: partitioning=Hash([SpanId@0, Timestamp@1, 
TraceId@2, SpanName@3, ScopeName@4, ServiceName@5, ParentSpanId@6, Duration@7, 
Tags@8, SpanKind@9, MlRepoId@10, TracingProjectId@11, StatusCode@12, 
TsBucketStart@13], 8), input_partitions=8, metrics=[fetch_time=1.06138683s, 
repartition_time=71.350859ms, send_time=24.35597ms]
               AggregateExec: mode=Partial, gby=[SpanId@0 as SpanId, 
Timestamp@1 as Timestamp, TraceId@2 as TraceId, SpanName@3 as SpanName, 
ScopeName@4 as ScopeName, ServiceName@5 as ServiceName, ParentSpanId@6 as 
ParentSpanId, Duration@7 as Duration, Tags@8 as Tags, SpanKind@9 as SpanKind, 
MlRepoId@10 as MlRepoId, TracingProjectId@11 as TracingProjectId, StatusCode@12 
as StatusCode, TsBucketStart@13 as TsBucketStart], aggr=[], 
ordering_mode=PartiallySorted([2, 6]), metrics=[output_rows=492435, 
elapsed_compute=409.064966ms, spill_count=0, spilled_bytes=0, spilled_rows=0, 
peak_mem_used=206203683]
                 ProjectionExec: expr=[SpanId@3 as SpanId, Timestamp@0 as 
Timestamp, TraceId@2 as TraceId, SpanName@6 as SpanName, ScopeName@8 as 
ScopeName, ServiceName@5 as ServiceName, ParentSpanId@4 as ParentSpanId, 
Duration@9 as Duration, Tags@11 as Tags, SpanKind@7 as SpanKind, MlRepoId@12 as 
MlRepoId, CAST(TracingProjectId@13 AS Utf8) as TracingProjectId, StatusCode@10 
as StatusCode, TsBucketStart@1 as TsBucketStart], metrics=[output_rows=492435, 
elapsed_compute=4.455516ms]
                   CoalesceBatchesExec: target_batch_size=20000, 
metrics=[output_rows=492435, elapsed_compute=57µs]
                     FilterExec: TraceId@2 = fb09d0c9b49136bb161464b3e32c5083 
AND ParentSpanId@4 = 662b7388122dfb79 AND Timestamp@0 >= 1744702200000000 AND 
TsBucketStart@1 >= 1745712000, metrics=[output_rows=492435, 
elapsed_compute=3.792731ms]
                       DeltaScan, metrics=[files_pruned=96, files_scanned=205]
                         DataSourceExec: file_groups={8 groups: [[ ...], ...]}, 
projection=[Timestamp, TsBucketStart, TraceId, SpanId, ParentSpanId, 
ServiceName, SpanName, SpanKind, ScopeName, Duration, StatusCode, Tags, 
MlRepoId, TracingProjectId], file_type=parquet, predicate=TraceId@2 = 
fb09d0c9b49136bb161464b3e32c5083 AND ParentSpanId@4 = 662b7388122dfb79 AND 
Timestamp@0 >= 1744702200000000 AND TsBucketStart@1 >= 1745712000, 
pruning_predicate=TraceId_null_count@2 != row_count@3 AND TraceId_min@0 <= 
fb09d0c9b49136bb161464b3e32c5083 AND fb09d0c9b49136bb161464b3e32c5083 <= 
TraceId_max@1 AND ParentSpanId_null_count@6 != row_count@3 AND 
ParentSpanId_min@4 <= 662b7388122dfb79 AND 662b7388122dfb79 <= 
ParentSpanId_max@5 AND Timestamp_null_count@8 != row_count@3 AND 
Timestamp_max@7 >= 1744702200000000 AND TsBucketStart_null_count@10 != 
row_count@3 AND TsBucketStart_max@9 >= 1745712000, 
required_guarantees=[ParentSpanId in (662b7388122dfb79), TraceId in 
(fb09d0c9b49136bb161464b3e32c5083)],
  metrics=[output_rows=492435, elapsed_compute=8ns, bytes_scanned=369885282, 
file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, 
page_index_rows_matched=5863737, page_index_rows_pruned=0, 
predicate_evaluation_errors=0, pushdown_rows_matched=12712344, 
pushdown_rows_pruned=5371302, row_groups_matched_bloom_filter=10, 
row_groups_matched_statistics=205, row_groups_pruned_bloom_filter=195, 
row_groups_pruned_statistics=0, bloom_filter_eval_time=11.322584ms, 
metadata_load_time=83.355586ms, page_index_eval_time=336.416µs, 
row_pushdown_eval_time=28.880893ms, statistics_eval_time=4.498338ms, 
time_elapsed_opening=30.859506ms, time_elapsed_processing=629.285022ms, 
time_elapsed_scanning_total=785.298422ms, 
time_elapsed_scanning_until_data=417.603661ms]
   ```
   
   3. No distinct. Approximately **300 ms** of wall clock execution time.
   ```
   SortPreservingMergeExec: [Timestamp@1 ASC NULLS LAST], fetch=10, 
metrics=[output_rows=10, elapsed_compute=21.35µs]
     SortExec: TopK(fetch=10), expr=[Timestamp@1 ASC NULLS LAST], 
preserve_partitioning=[true], metrics=[output_rows=30, 
elapsed_compute=11.579758ms, row_replacements=161]
       ProjectionExec: expr=[SpanId@3 as SpanId, Timestamp@0 as Timestamp, 
TraceId@2 as TraceId, SpanName@6 as SpanName, ScopeName@8 as ScopeName, 
ServiceName@5 as ServiceName, ParentSpanId@4 as ParentSpanId, Duration@9 as 
Duration, Tags@11 as Tags, SpanKind@7 as SpanKind, MlRepoId@12 as MlRepoId, 
CAST(TracingProjectId@13 AS Utf8) as TracingProjectId, StatusCode@10 as 
StatusCode, TsBucketStart@1 as TsBucketStart, ((ServiceName@5 = user-service OR 
ServiceName@5 = auth-service) AND SpanName@6 LIKE %login% AND Duration@9 >= 
1000000 AND Duration@9 <= 5000000 AND StatusCode@10 >= 200 AND StatusCode@10 <= 
299) IS NOT DISTINCT FROM true as matched], metrics=[output_rows=492435, 
elapsed_compute=5.709997ms]
         CoalesceBatchesExec: target_batch_size=20000, 
metrics=[output_rows=492435, elapsed_compute=46.641µs]
           FilterExec: TraceId@2 = fb09d0c9b49136bb161464b3e32c5083 AND 
ParentSpanId@4 = 662b7388122dfb79 AND Timestamp@0 >= 1744702200000000 AND 
TsBucketStart@1 >= 1745712000, metrics=[output_rows=492435, 
elapsed_compute=3.539565ms]
             DeltaScan, metrics=[files_pruned=96, files_scanned=205]
               DataSourceExec: file_groups={8 groups: [[...], ...]}, 
projection=[Timestamp, TsBucketStart, TraceId, SpanId, ParentSpanId, 
ServiceName, SpanName, SpanKind, ScopeName, Duration, StatusCode, Tags, 
MlRepoId, TracingProjectId], file_type=parquet, predicate=TraceId@2 = 
fb09d0c9b49136bb161464b3e32c5083 AND ParentSpanId@4 = 662b7388122dfb79 AND 
Timestamp@0 >= 1744702200000000 AND TsBucketStart@1 >= 1745712000, 
pruning_predicate=TraceId_null_count@2 != row_count@3 AND TraceId_min@0 <= 
fb09d0c9b49136bb161464b3e32c5083 AND fb09d0c9b49136bb161464b3e32c5083 <= 
TraceId_max@1 AND ParentSpanId_null_count@6 != row_count@3 AND 
ParentSpanId_min@4 <= 662b7388122dfb79 AND 662b7388122dfb79 <= 
ParentSpanId_max@5 AND Timestamp_null_count@8 != row_count@3 AND 
Timestamp_max@7 >= 1744702200000000 AND TsBucketStart_null_count@10 != 
row_count@3 AND TsBucketStart_max@9 >= 1745712000, 
required_guarantees=[ParentSpanId in (662b7388122dfb79), TraceId in 
(fb09d0c9b49136bb161464b3e32c5083)], metrics=[output_rows=492435, 
elapsed_compute=8ns, bytes_scanned=369891545, 
file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, 
page_index_rows_matched=5863737, page_index_rows_pruned=0, 
predicate_evaluation_errors=0, pushdown_rows_matched=12712344, 
pushdown_rows_pruned=5371302, row_groups_matched_bloom_filter=10, 
row_groups_matched_statistics=205, row_groups_pruned_bloom_filter=195, 
row_groups_pruned_statistics=0, bloom_filter_eval_time=11.370524ms, 
metadata_load_time=61.64128ms, page_index_eval_time=332.784µs, 
row_pushdown_eval_time=28.634729ms, statistics_eval_time=4.530266ms, 
time_elapsed_opening=7.874004ms, time_elapsed_processing=610.976289ms, 
time_elapsed_scanning_total=628.786969ms, 
time_elapsed_scanning_until_data=427.639613ms]
   ```
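
To make the relationship between variants 1 and 2 concrete, here is a minimal pure-Python sketch of the two deduplication semantics. It is not how DataFusion executes them (the plans above show `first_value` aggregates grouped by `SpanId` for variant 1 and a group-by over every column for variant 2). The sample rows and the helper names `distinct_on`, `distinct_all`, and `top_k_by` are hypothetical and exist only for illustration.

```python
from typing import Dict, Hashable, List

Row = Dict[str, Hashable]

# Hypothetical sample data; not taken from the table in this issue.
SAMPLE_ROWS: List[Row] = [
    {"SpanId": "a1", "Timestamp": 30, "SpanName": "login"},
    {"SpanId": "b2", "Timestamp": 10, "SpanName": "logout"},
    {"SpanId": "a1", "Timestamp": 30, "SpanName": "login"},  # exact duplicate
    {"SpanId": "c3", "Timestamp": 20, "SpanName": "login"},
]


def distinct_on(rows: List[Row], key: str) -> List[Row]:
    """Keep the first row seen for each value of `key` (variant 1)."""
    first_seen: Dict[Hashable, Row] = {}
    for row in rows:
        first_seen.setdefault(row[key], row)
    return list(first_seen.values())


def distinct_all(rows: List[Row]) -> List[Row]:
    """Keep one row per distinct combination of all columns (variant 2)."""
    first_seen: Dict[tuple, Row] = {}
    for row in rows:
        first_seen.setdefault(tuple(sorted(row.items())), row)
    return list(first_seen.values())


def top_k_by(rows: List[Row], column: str, k: int) -> List[Row]:
    """Order by `column` ascending and keep the first k rows."""
    return sorted(rows, key=lambda r: r[column])[:k]


if __name__ == "__main__":
    via_key = top_k_by(distinct_on(SAMPLE_ROWS, "SpanId"), "Timestamp", 2)
    via_all = top_k_by(distinct_all(SAMPLE_ROWS), "Timestamp", 2)
    print(via_key == via_all)
```

The two variants agree only while `SpanId` determines every other selected column. If a row with an existing `SpanId` but different values appears, `distinct_all` keeps both rows while `distinct_on` keeps one, so the substitution is safe only under that assumption.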
   
   I would like to understand what I can do to make `distinct on (columns)` 
faster.
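
To show where the time goes, the short sketch below compares the `AggregateExec` metrics copied from plans 1 and 2 above. The dictionary layout and the helper name `total_compute_s` are assumptions made for illustration. As far as I understand, `elapsed_compute` is accumulated across the 8 partitions, which would explain why it exceeds the wall clock time, so treat the ratios as indicative only.

```python
# AggregateExec metrics copied from the plans above (seconds and bytes).
AGG_METRICS = {
    "distinct_on_span_id": {
        "partial_compute_s": 2.742439769,
        "final_compute_s": 3.882494179,
        "partial_peak_mem": 890092194,
        "final_peak_mem": 1414466147,
    },
    "distinct_all_columns": {
        "partial_compute_s": 0.409064966,
        "final_compute_s": 0.369067634,
        "partial_peak_mem": 206203683,
        "final_peak_mem": 237437570,
    },
}


def total_compute_s(metrics: dict) -> float:
    """Sum the partial and final aggregation compute time."""
    return metrics["partial_compute_s"] + metrics["final_compute_s"]


on_key = AGG_METRICS["distinct_on_span_id"]
on_all = AGG_METRICS["distinct_all_columns"]

compute_ratio = total_compute_s(on_key) / total_compute_s(on_all)
final_mem_ratio = on_key["final_peak_mem"] / on_all["final_peak_mem"]

if __name__ == "__main__":
    print(f"aggregate compute ratio: {compute_ratio:.1f}x")
    print(f"final peak memory ratio: {final_mem_ratio:.1f}x")
```

Both plans read the same `492435` rows and their scan-side metrics are nearly identical, so the gap appears to come from the aggregation step (the `first_value` aggregates) rather than from the scan or the filter.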
   
   ### To Reproduce
   
   _No response_
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

