[ https://issues.apache.org/jira/browse/SPARK-40793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
chenminghua updated SPARK-40793:
--------------------------------
    Affects Version/s: 3.4.0

> Row-level Runtime Filtering cannot be enabled when externalTable has no stats
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-40793
>                 URL: https://issues.apache.org/jira/browse/SPARK-40793
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.0, 3.4.0
>         Environment:
> {panel:title=Use tpcds to generate test data}
> test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData -d /data/project/tpcds-kit/tpcds-kit/tools -s 500 -l hdfs://hadoop01:8020/user/apple/tpcds -f parquet
> {panel}
>
> {panel:title=Use spark-sql-perf's TPCDSTables class to create external tables and run TPC-DS q24a as follows}
> import com.databricks.spark.sql.perf.tpcds.TPCDSTables;
> ....................
> String dsdgenDir = "/data/project/tpcds-kit/tpcds-kit/tools";
> String scaleFactor = "500";
> String format = "parquet";
> String rootDir = "hdfs://hadoop01:8020/user/apple/tpcds";
> String databaseName = "tpcds_500g";
> String q24a = ...............
> TPCDSTables tables = new TPCDSTables(sqlContext, dsdgenDir, scaleFactor, false, false);
> tables.createExternalTables(rootDir, format, databaseName, true, true, "");
> // Statistics are deliberately not collected, so the external tables have no stats.
> //tables.analyzeTables(databaseName, true, "");
> Dataset<Row> queryResult = spark.sql(q24a);
> queryResult.show();
> {panel}
>
> {panel:title=Spark SQL generates the following execution plan, which does not use Row-level Runtime Filtering}
> == Physical Plan ==
> AdaptiveSparkPlan (121)
> +- == Final Plan ==
>    * SerializeFromObject (75)
>    +- MapPartitions (74)
>       +- DeserializeToObject (73)
>          +- * Sort (72)
>             +- AQEShuffleRead (71)
>                +- ShuffleQueryStage (70), Statistics(sizeInBytes=119.6 KiB, rowCount=1.52E+3)
>                   +- Exchange (69)
>                      +- * Filter (68)
>                         +- * HashAggregate (67)
>                            +- AQEShuffleRead (66)
>                               +- ShuffleQueryStage (65), Statistics(sizeInBytes=139.7 KiB, rowCount=1.62E+3)
>                                  +- Exchange (64)
>                                     +- * HashAggregate (63)
>                                        +- * HashAggregate (62)
>                                           +- AQEShuffleRead (61)
>                                              +- ShuffleQueryStage (60), Statistics(sizeInBytes=1425.0 KiB, rowCount=9.15E+3)
>                                                 +- Exchange (59)
>                                                    +- * HashAggregate (58)
>                                                       +- * Project (57)
>                                                          +- * SortMergeJoin Inner (56)
>                                                             :- * Sort (48)
>                                                             :  +- AQEShuffleRead (47)
>                                                             :     +- ShuffleQueryStage (46), Statistics(sizeInBytes=60.9 MiB, rowCount=3.54E+5)
>                                                             :        +- Exchange (45)
>                                                             :           +- * Project (44)
>                                                             :              +- * SortMergeJoin Inner (43)
>                                                             :                 :- * Sort (35)
>                                                             :                 :  +- AQEShuffleRead (34)
>                                                             :                 :     +- ShuffleQueryStage (33), Statistics(sizeInBytes=47.7 MiB, rowCount=3.67E+5)
>                                                             :                 :        +- Exchange (32)
>                                                             :                 :           +- * Project (31)
>                                                             :                 :              +- * BroadcastHashJoin Inner BuildRight (30)
>                                                             :                 :                 :- * Project (24)
>                                                             :                 :                 :  +- * BroadcastHashJoin Inner BuildRight (23)
>                                                             :                 :                 :     :- * Project (16)
>                                                             :                 :                 :     :  +- * SortMergeJoin Inner (15)
>                                                             :                 :                 :     :     :- * Sort (7)
>                                                             :                 :                 :     :     :  +- ShuffleQueryStage (6), Statistics(sizeInBytes=60.0 GiB, rowCount=1.34E+9)
>                                                             :                 :                 :     :     :     +- Exchange (5)
>                                                             :                 :                 :     :     :        +- * Project (4)
>                                                             :                 :                 :     :     :           {color:#de350b}+- * Filter (3){color}
>                                                             :                 :                 :     :     :              +- * ColumnarToRow (2)
>                                                             :                 :                 :     :     :                 +- Scan parquet tpcds_500.store_sales (1)
>                                                             :                 :                 :     :     +- * Sort (14)
>                                                             :                 :                 :     :        +- ShuffleQueryStage (13), Statistics(sizeInBytes=3.2 GiB, rowCount=1.44E+8)
>                                                             :                 :                 :     :           +- Exchange (12)
>                                                             :                 :                 :     :              +- * Project (11)
>                                                             :                 :                 :     :                 +- * Filter (10)
>                                                             :                 :                 :     :                    +- * ColumnarToRow (9)
>                                                             :                 :                 :     :                       +- Scan parquet tpcds_500.store_returns (8)
>                                                             :                 :                 :     +- BroadcastQueryStage (22), Statistics(sizeInBytes=1030.3 KiB, rowCount=100)
>                                                             :                 :                 :        +- BroadcastExchange (21)
>                                                             :                 :                 :           +- * Project (20)
>                                                             :                 :                 :              +- * Filter (19)
>                                                             :                 :                 :                 +- * ColumnarToRow (18)
>                                                             :                 :                 :                    +- Scan parquet tpcds_500.store (17)
>                                                             :                 :                 +- BroadcastQueryStage (29), Statistics(sizeInBytes=1280.0 KiB, rowCount=5.97E+3)
>                                                             :                 :                    +- BroadcastExchange (28)
>                                                             :                 :                       +- * Filter (27)
>                                                             :                 :                          +- * ColumnarToRow (26)
>                                                             :                 :                             +- Scan parquet tpcds_500.item (25)
>                                                             :                 +- * Sort (42)
>                                                             :                    +- AQEShuffleRead (41)
>                                                             :                       +- ShuffleQueryStage (40), Statistics(sizeInBytes=438.8 MiB, rowCount=6.76E+6)
>                                                             :                          +- Exchange (39)
>                                                             :                             +- * Filter (38)
>                                                             :                                +- * ColumnarToRow (37)
>                                                             :                                   +- Scan parquet tpcds_500.customer (36)
>                                                             +- * Sort (55)
>                                                                +- AQEShuffleRead (54)
>                                                                   +- ShuffleQueryStage (53), Statistics(sizeInBytes=203.8 MiB, rowCount=3.34E+6)
>                                                                      +- Exchange (52)
>                                                                         +- * Filter (51)
>                                                                            +- * ColumnarToRow (50)
>                                                                               +- Scan parquet tpcds_500.customer_address (49)
>
> {color:#de350b}(3) Filter [codegen id : 1]{color}
> {color:#de350b}Input [6]: [ss_item_sk#62, ss_customer_sk#63, ss_store_sk#67, ss_ticket_number#69L, ss_net_paid#80, ss_sold_date_sk#83]{color}
> {color:#de350b}Condition : (((isnotnull(ss_ticket_number#69L) AND isnotnull(ss_item_sk#62)) AND isnotnull(ss_store_sk#67)) AND isnotnull(ss_customer_sk#63)){color}
> {panel}
>
>            Reporter: chenminghua
>            Priority: Major
>              Labels: patch
>
> When using external tables, Row-level Runtime Filtering cannot be enabled at all unless the table is first analyzed to generate statistics.
> In practice, external tables often have no statistics, yet users still want Row-level Runtime Filtering to improve query performance. Row-level Runtime Filtering cannot be enabled because 'InjectRuntimeFilter' calls the 'satisfyByteSizeRequirement' method to check whether the application-side plan's aggregated scan size meets the size requirement. Without statistics, the application-side plan's aggregated scan size is 0, so the requirement for enabling Row-level Runtime Filtering is never satisfied.
>
> To allow Row-level Runtime Filtering even when an external table has no statistics, this issue proposes adding a RUNTIME_FILTER__ENABLED_WHEN_NO_STATS parameter. When RUNTIME_FILTER__ENABLED_WHEN_NO_STATS is set to true and the external table has no statistics, 'satisfyByteSizeRequirement' returns true, so Row-level Runtime Filtering can be enabled.
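The size check and the proposed switch described in the issue can be modelled with a small, self-contained Java sketch. It is not Spark's `InjectRuntimeFilter` code: the class `RuntimeFilterSizeCheck`, the use of `OptionalLong.empty()` to represent a scan without statistics, the strict `>` comparison, and the 10 GiB threshold are all hypothetical, chosen only for illustration. The `enabledWhenNoStats` field stands in for the proposed RUNTIME_FILTER__ENABLED_WHEN_NO_STATS parameter, and a missing statistic is counted as 0 bytes, as the issue reports.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical model of the application-side size check discussed in SPARK-40793.
// This is NOT Spark's InjectRuntimeFilter implementation; all names are illustrative.
final class RuntimeFilterSizeCheck {

    // Stand-in for the proposed RUNTIME_FILTER__ENABLED_WHEN_NO_STATS parameter.
    private final boolean enabledWhenNoStats;
    // Assumed minimum aggregated scan size (in bytes) on the application side.
    private final long thresholdBytes;

    RuntimeFilterSizeCheck(boolean enabledWhenNoStats, long thresholdBytes) {
        this.enabledWhenNoStats = enabledWhenNoStats;
        this.thresholdBytes = thresholdBytes;
    }

    // Each element is the size of one leaf scan; OptionalLong.empty() means "no statistics".
    boolean satisfyByteSizeRequirement(List<OptionalLong> leafScanSizes) {
        boolean missingStats = leafScanSizes.stream().anyMatch(s -> !s.isPresent());
        if (missingStats && enabledWhenNoStats) {
            // Proposed behaviour: skip the size check when statistics are missing.
            return true;
        }
        // Reported current behaviour: a scan without statistics contributes 0 bytes.
        long aggregated = leafScanSizes.stream().mapToLong(s -> s.orElse(0L)).sum();
        return aggregated > thresholdBytes;
    }

    public static void main(String[] args) {
        long tenGiB = 10L * 1024 * 1024 * 1024; // assumed threshold, for illustration only
        List<OptionalLong> unanalyzed = List.of(OptionalLong.empty());
        List<OptionalLong> analyzed = List.of(OptionalLong.of(20L * 1024 * 1024 * 1024));

        RuntimeFilterSizeCheck current = new RuntimeFilterSizeCheck(false, tenGiB);
        RuntimeFilterSizeCheck proposed = new RuntimeFilterSizeCheck(true, tenGiB);

        System.out.println(current.satisfyByteSizeRequirement(unanalyzed));  // false
        System.out.println(proposed.satisfyByteSizeRequirement(unanalyzed)); // true
        System.out.println(current.satisfyByteSizeRequirement(analyzed));    // true
    }
}
```

With the flag off, the unanalyzed scan contributes 0 bytes and the check fails, matching the behaviour reported above; with the flag on, the check is bypassed. Treating the plan as lacking statistics when any leaf scan lacks them is only one possible reading of the proposal. One trade-off of this design is that, with the flag on, the size threshold no longer screens out small application-side scans.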