Hi,


I’m using Spark 2.4.4 (on EMR) to test the cost-based optimizer (CBO) on a
partitioned external Hive table whose files are stored as Parquet.



I’ve set up the required configuration:

    .config("spark.sql.cbo.enabled", "true")\
    .config("spark.sql.cbo.joinReorder.enabled", "true")\
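Spark looks configuration keys up case-sensitively. A miscased key, such as `spark.sql.cbo.joinreorder.enabled` in place of `spark.sql.cbo.joinReorder.enabled`, is accepted without complaint but never read. The sketch below catches that kind of mistake. It assumes the session's settings are available as a plain dict. `find_miscased_keys` and `CBO_KEYS` are hypothetical names, not Spark APIs.

```python
# Canonical (correctly cased) names of the CBO settings used above, Spark 2.4.
CBO_KEYS = (
    "spark.sql.cbo.enabled",
    "spark.sql.cbo.joinReorder.enabled",
)


def find_miscased_keys(settings, expected=CBO_KEYS):
    """Return {given_key: canonical_key} for every key in `settings` that
    matches an expected key only when case is ignored.

    Such keys are stored by Spark but never consulted, because lookups
    use the exact (case-sensitive) key string.
    """
    by_lower = {key.lower(): key for key in expected}
    miscased = {}
    for key in settings:
        canonical = by_lower.get(key.lower())
        if canonical is not None and key != canonical:
            miscased[key] = canonical
    return miscased
```

In a live session, the dict could come from `{key: value for key, value in spark.sql("SET").collect()}`, because `SET` returns one (key, value) row per explicitly set property.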



After running ANALYZE TABLE to compute statistics for all columns:

spark.sql(f"""
ANALYZE TABLE mytable PARTITION(year,month,date)
COMPUTE STATISTICS FOR COLUMNS {','.join(spark.table('mytable').columns)}
""")
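The f-string above splices raw column names into the SQL, so it breaks on any name that needs quoting. The sketch below backtick-quotes each name instead. Spark SQL escapes a backtick inside a quoted identifier by doubling it. `quote_identifier` and `build_analyze_columns_sql` are hypothetical helpers. The builder leaves out the PARTITION clause. This rests on my reading of the Spark 2.4 parser: when column statistics are requested, it logs a warning and ignores the partition spec.

```python
def quote_identifier(name):
    """Backtick-quote a Spark SQL identifier, doubling embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def build_analyze_columns_sql(table, columns):
    """Build an ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS statement.

    Assumption: no PARTITION clause is emitted, since Spark 2.4 appears to
    ignore it for column statistics and compute table-level column stats.
    """
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ", ".join(quote_identifier(c) for c in columns)
    return f"ANALYZE TABLE {table} COMPUTE STATISTICS FOR COLUMNS {column_list}"
```

Usage would be `spark.sql(build_analyze_columns_sql("mytable", spark.table("mytable").columns))`.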

Then I run DESCRIBE EXTENDED:

spark.sql("DESCRIBE EXTENDED mytable").show(truncate=False, n=400)



but I can only see table-level statistics, not column-level ones:


col_name     data_type                        comment
Statistics   698171162 bytes, 19001020 rows
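In Spark 2.x, the table-level DESCRIBE EXTENDED output does not list column statistics. They are reported one column at a time by `DESCRIBE EXTENDED <table> <column>`. That command returns (info_name, info_value) rows such as min, max, num_nulls and distinct_count, and uses the string "NULL" for any statistic that was not computed. The sketch below turns those rows into a dict. `parse_column_stats` and `has_column_stats` are hypothetical helpers.

```python
# Statistic names that DESCRIBE EXTENDED <table> <column> reports in Spark 2.x.
STAT_NAMES = ("min", "max", "num_nulls", "distinct_count", "avg_col_len", "max_col_len")


def parse_column_stats(rows):
    """Convert (info_name, info_value) pairs into a dict, mapping the
    literal string "NULL" (statistic not available) to None."""
    return {name: (None if value == "NULL" else value) for name, value in rows}


def has_column_stats(stats):
    """True if at least one CBO column statistic is populated."""
    return any(stats.get(name) is not None for name in STAT_NAMES)
```

PySpark `Row` objects are tuples, so `parse_column_stats(spark.sql("DESCRIBE EXTENDED mytable `col`").collect())` should work directly.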



Running a sample query shows no sign that the statistics are used at all:

spark.sql("""
select * from mytable a, mytable b
where a.col = b.col and a.anothercol = 6566
""").explain(True)



== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('a.col = 'b.col) && ('a.anothercol = 3462))
   +- 'Join Inner
      :- 'SubqueryAlias `a`
      :  +- 'UnresolvedRelation `mytable`
      +- 'SubqueryAlias `b`
         +- 'UnresolvedRelation `mytable`



== Analyzed Logical Plan ==
col: bigint, anothercol: bigint, ... 37 more fields]
+- Filter ((col#477L = col#497L) && (anothercol#479L = cast(3462 as bigint)))
   +- Join Inner
      :- SubqueryAlias `a`
      :  +- SubqueryAlias `mydb`.`mytable`
      :     +- Relation[col#477L,anothercol#479L...] parquet
      +- SubqueryAlias `b`
         +- SubqueryAlias `mydb`.`mytable`
            +- Relation[col#497L,anothercol#499L,...] parquet



== Optimized Logical Plan ==
Join Inner, (col#477L = col#497L)
:- Filter ((isnotnull(anothercol#479L) && (anothercol#479L = 3462)) && isnotnull(col#477L))
:  +- Relation[col#477L,anothercol#479L,...] parquet
+- Filter isnotnull(col#497L)
   +- Relation[col#497L,anothercol#499L,...] parquet



== Physical Plan ==
*(5) SortMergeJoin [col#477L], [col#497L], Inner
:- *(2) Sort [col#477L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(col#477L, 1000)
:     +- *(1) Project [col#477L, anothercol#479L, ...]
:        +- *(1) Filter ((isnotnull(anothercol#479L) && (anothercol#479L = 3462)) && isnotnull(col#477L))
:           +- *(1) FileScan parquet mydb.mytable[col#477L,anothercol#479L...] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://bucket/prefix..., PartitionCount: 91, PartitionFilters: [], PushedFilters: [IsNotNull(anothercol), EqualTo(anothercol,3462), IsNotNull(col)], ReadSchema: struct<col:bigint,anothercol:bigint,...
+- *(4) Sort [col#497L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col#497L, 1000)
      +- *(3) Project [col#497L, anothercol#499L, ...]
         +- *(3) Filter isnotnull(col#497L)
            +- *(3) FileScan parquet mydb.mytable[col#497L,anothercol#499L,...] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://bucket/prefix..., PartitionCount: 91, PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:bigint,anothercol:bigint,...
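In Spark 2.4, `explain(True)` prints the plans without statistics. The SQL form `EXPLAIN COST <query>` prints the optimized logical plan with a `Statistics(sizeInBytes=..., rowCount=...)` annotation on each operator. The sketch below extracts the row-count estimates from that text. It assumes that annotation format and that rowCount appears only when an estimate exists. `row_count_estimates` is a hypothetical helper.

```python
import re

# Captures the value of rowCount=... inside a Statistics(...) annotation.
_ROW_COUNT = re.compile(r"rowCount=([^,)\s]+)")


def row_count_estimates(plan_text):
    """Return every rowCount value found in an EXPLAIN COST plan string.

    An empty list suggests that no operator carried a row-count estimate,
    i.e. the optimizer had only size-based statistics to work with.
    """
    return _ROW_COUNT.findall(plan_text)
```

EXPLAIN returns a single row whose only column holds the plan text. The input could therefore be `spark.sql("EXPLAIN COST select * from mytable a, mytable b where a.col = b.col and a.anothercol = 6566").collect()[0][0]`.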



I found the following JIRA tickets, but they don’t make clear whether this
is a bug:

https://issues.apache.org/jira/browse/SPARK-29335
https://issues.apache.org/jira/browse/SPARK-25185

Is this a bug or is there something I’m missing?



Thanks!
