Hello Tamas Mate, Daniel Becker, Zoltan Borok-Nagy, Peter Rozsa, Impala Public 
Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/20548

to look at the new patch set (#6).

Change subject: IMPALA-12308: DIRECTED distribution mode for V2 Iceberg tables
......................................................................

IMPALA-12308: DIRECTED distribution mode for V2 Iceberg tables

For Iceberg tables, when joining the data files with the delete files,
both of the current distribution modes (broadcast, partitioned) are
wasteful. Each row read from a delete file contains the name of the
data file it refers to, so if we know where that data file is
scheduled, we can send the delete row directly to that host.

This patch enhances the scheduler to collect information about which
data file is scheduled on which host. Since the scan node for the data
files is on the same host as the Iceberg join node, we can send the
delete file rows directly to that specific host.
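
The routing idea can be illustrated with a minimal Python sketch. It is
not the code of this patch (the real logic lives in the C++ scheduler
and KrpcDataStreamSender); the function names, host names, and file
paths below are hypothetical and chosen only for illustration.

```python
from collections import defaultdict


def build_file_to_hosts(scan_assignments):
    """Collect, per data file, the hosts that scan at least one range of it.

    scan_assignments: iterable of (host, data_file_path) pairs, one pair
    per scheduled scan range (a hypothetical, simplified scheduler output).
    """
    mapping = defaultdict(set)
    for host, path in scan_assignments:
        mapping[path].add(host)
    return dict(mapping)


def route_delete_rows(delete_rows, file_to_hosts):
    """Group delete rows by the hosts that scan the referenced data file.

    delete_rows: iterable of (data_file_path, position) pairs.
    Returns a dict mapping host -> list of delete rows to send there.
    """
    per_host = defaultdict(list)
    for path, pos in delete_rows:
        # A file split into several scan ranges may live on several hosts.
        for host in sorted(file_to_hosts.get(path, ())):
            per_host[host].append((path, pos))
    return dict(per_host)


assignments = [("host1", "a.parquet"), ("host2", "b.parquet"),
               ("host3", "b.parquet")]
rows = [("a.parquet", 3), ("b.parquet", 7)]
routed = route_delete_rows(rows, build_file_to_hosts(assignments))
```

In this sketch the row for a.parquet goes only to host1, while the row
for b.parquet goes to both host2 and host3 because that file is scanned
on two hosts.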

Restriction:
Table sampling cannot use this enhancement because it does not schedule
all the data files in a table. A delete file can reference any data
file, so when the KrpcDataStreamSender distributes the delete file
rows, it might not find the corresponding data file in the filename to
hosts mapping and would not know where to send that particular row.
Hence, V2 read optimization is turned off for table sampling.
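
A small Python sketch of the problem, under the assumption that the
filename to hosts mapping is a plain dictionary. The helper name
find_unroutable_rows and the sample data are hypothetical and are not
part of the patch.

```python
def find_unroutable_rows(delete_rows, file_to_hosts):
    """Return the delete rows whose data file has no scheduled host.

    delete_rows: iterable of (data_file_path, position) pairs.
    file_to_hosts: dict mapping data_file_path -> set of host names.
    """
    return [(path, pos) for path, pos in delete_rows
            if path not in file_to_hosts]


# With sampling, only a.parquet is scheduled, but the delete file still
# references b.parquet.
sampled_mapping = {"a.parquet": {"host1"}}
delete_rows = [("a.parquet", 3), ("b.parquet", 7)]
unroutable = find_unroutable_rows(delete_rows, sampled_mapping)
```

Here the row for b.parquet has no destination, which is the situation
the restriction above avoids.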

Functional testing:
 - Re-ran the full test suite to check for regressions.

Performance testing:
1) Local machine: SELECT COUNT(1) FROM TPCH10_parquet.lineitem
Around 15% of the rows are deleted.
As the table is unpartitioned, there is a small number of relatively
large delete files.
Query runtime decreased by ~80%.

2) Local machine: SELECT COUNT(1) FROM TPCDS10_store_sales
Around 15% of the rows are deleted.
The table is partitioned, which results in more but smaller delete
files.
Query runtime decreased by ~50%.

3) 10-node cluster with data stored on S3.
SELECT COUNT(1) FROM a scaled store_sales table having ~8.6B rows and
~15% are deleted.
Here we had 2 scenarios:
  a) Table is written by Impala: The runtime decreased by ~80%. Each
     delete file row is sent to exactly one host.
  b) Table is written by Hive: The runtime decreased by ~60%. Here
     apparently the data files are bigger and one data file might be
     split across multiple scan ranges. As a result, one delete file
     row might be sent to multiple hosts. The runtime difference
     compared to run a) is the time spent sending out the additional
     delete file rows.
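
The difference between scenarios a) and b) can be made concrete by
counting how many delete rows are sent over the network. The following
Python sketch uses made-up hosts, files, and helper names; it only
illustrates the counting and does not reproduce the measurements above.

```python
def rows_sent_broadcast(delete_rows, hosts):
    """Broadcast sends every delete row to every host."""
    return len(delete_rows) * len(hosts)


def rows_sent_directed(delete_rows, file_to_hosts):
    """Directed mode sends a row only to hosts scanning its data file."""
    return sum(len(file_to_hosts[path]) for path, _pos in delete_rows)


hosts = ["host1", "host2", "host3"]
delete_rows = [("a.parquet", 0), ("a.parquet", 5), ("b.parquet", 2)]

# Scenario a): every data file is read by a single host.
one_host_per_file = {"a.parquet": {"host1"}, "b.parquet": {"host2"}}
# Scenario b): a.parquet is split into scan ranges on two hosts.
split_file = {"a.parquet": {"host1", "host3"}, "b.parquet": {"host2"}}

broadcast_count = rows_sent_broadcast(delete_rows, hosts)
directed_a = rows_sent_directed(delete_rows, one_host_per_file)
directed_b = rows_sent_directed(delete_rows, split_file)
```

With this toy input, broadcast sends 9 rows, scenario a) sends 3, and
scenario b) sends 5, so b) still beats broadcast but pays for the extra
copies of rows that target the split file.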

Change-Id: I212afd7c9e94551a1c50a40ccb0e3c1f7ecdf3d2
---
M be/src/exec/data-sink.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/fragment-state.cc
M be/src/runtime/fragment-state.h
M be/src/runtime/krpc-data-stream-sender.cc
M be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M common/protobuf/admission_control_service.proto
M common/protobuf/control_service.proto
M common/thrift/Partitions.thrift
M common/thrift/PlanNodes.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/planner/DataPartition.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
M fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
M fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
M fe/src/main/java/org/apache/impala/planner/JoinNode.java
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
M testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
M testdata/workloads/functional-query/queries/QueryTest/set.test
M tests/query_test/test_iceberg.py
31 files changed, 518 insertions(+), 283 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/48/20548/6
--
To view, visit http://gerrit.cloudera.org:8080/20548
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I212afd7c9e94551a1c50a40ccb0e3c1f7ecdf3d2
Gerrit-Change-Number: 20548
Gerrit-PatchSet: 6
Gerrit-Owner: Gabor Kaszab <gaborkas...@cloudera.com>
Gerrit-Reviewer: Daniel Becker <daniel.bec...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Peter Rozsa <pro...@cloudera.com>
Gerrit-Reviewer: Tamas Mate <tma...@apache.org>
Gerrit-Reviewer: Zoltan Borok-Nagy <borokna...@cloudera.com>
