Hello folks,
I have been debugging this one for a bit now, so I wanted to share my findings
and ask for suggestions.
I have a table with PartitionSpec like so:
[
batchId: identity(21)
c_date: identity(22)
]
where c_date is a DATE.
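(For reference, this is roughly how such a spec is declared through the Iceberg
API. Only the two partition field names above come from my actual table; the
schema, field ids, and the type of batchId below are assumptions for
illustration.)

import com.netflix.iceberg.{PartitionSpec, Schema}
import com.netflix.iceberg.types.Types

// Illustrative schema: only the batchId and c_date names match my real table.
val schema = new Schema(
  Types.NestedField.required(1, "batchId", Types.StringType.get()),
  Types.NestedField.required(2, "c_date", Types.DateType.get())
)

// Identity partitioning on both columns, as in the spec shown above.
val spec = PartitionSpec.builderFor(schema)
  .identity("batchId")
  .identity("c_date")
  .build()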
How many files/tasks would a full table scan have to read?
import com.netflix.iceberg.TableScan
import scala.collection.JavaConverters._

// Plan a full scan (no filter) and count the data files and combined scan tasks.
val scan = iceTable.newScan()
var f = 0
scan.planFiles.asScala.foreach(p => f = f + 1)
var t = 0
scan.planTasks.asScala.foreach(p => t = t + 1)
println("files to scan: " + f)
println("tasks: " + t)
files to scan: 29520
tasks: 2826
How many files would a table scan with a partition predicate have to read?
import com.netflix.iceberg.TableScan
import scala.collection.JavaConverters._
import com.netflix.iceberg.expressions._

// Plan the same scan, but with an equality predicate on the DATE partition column.
val exp1 = Expressions.equal("c_date", "2017-11-15")
val scan = iceTable.newScan().filter(exp1)
var f = 0
scan.planFiles.asScala.foreach(p => f = f + 1)
var t = 0
scan.planTasks.asScala.foreach(p => t = t + 1)
println("files to scan: " + f)
println("tasks: " + t)
files to scan: 164
tasks: 15
So iceberg-core and iceberg-api are doing the right thing, correctly applying
my predicate and thus pruning most of the work.
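(Side note, and my understanding of the internals rather than a verified fact:
the plain string literal works here because Iceberg binds it to the column's
type before evaluating it. Roughly like this, which also explains the 17485
that shows up in the second physical plan below, since that is 2017-11-15
expressed as days from the unix epoch:)

import com.netflix.iceberg.expressions.Literal
import com.netflix.iceberg.types.Types

// Convert the string literal to Iceberg's internal DATE representation
// (days from the unix epoch): 2017-11-15 -> 17485.
val dateLit = Literal.of("2017-11-15").to(Types.DateType.get())
println(dateLit.value())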
But what is the Spark Reader behavior?
val datePartitionedIcebergDf = spark.read.format("iceberg").load("...")
datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
spark.sql("""
SELECT …
FROM datePartitionedIcebergDf
WHERE …
AND c_date = '2017-11-15'
GROUP BY …
LIMIT 10
"""
).show
Spark job 0 (3 stages):
  Stage 0: 2826/2826 tasks succeeded
  Stage 1:  200/200 tasks succeeded
  Stage 2:    1/1 tasks succeeded
Inspecting the physical plan, we see that the predicate does not get pushed
down into Iceberg, and that Spark SQL phrases it in a surprising way:
== Physical Plan ==
CollectLimit 21
+- *(3) Project [cast(minute1#227 as string) AS minute1#235, cast(pageviews#228 as string) AS pageviews#236]
   +- *(3) GlobalLimit 10
      +- Exchange SinglePartition
         +- *(2) LocalLimit 10
            +- *(2) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC))#243], functions=[finalmerge_sum(merge sum#242) AS sum(value#239)#230], output=[minute1#227, pageviews#228])
               +- Exchange hashpartitioning(minute(timestamp#88, Some(Etc/UTC))#243, 200)
                  +- *(1) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#243], functions=[partial_sum(value#239) AS sum#242], output=[minute(timestamp#88, Some(Etc/UTC))#243, sum#242])
                     +- *(1) Project [timestamp#88, web#97.webPageDetails.pageViews.value AS value#239]
                        +- *(1) Filter ((cast(c_date#107 as string) = 2017-11-15) && isnotnull(c_date#107))
                           +- *(1) ScanV2 iceberg[timestamp#88, web#97, c_date#107] (Filters: [isnotnull(c_date#107)], Options: [path=adl://…/iceberg/,paths=[]])
If we force the literal to be a DATE, then we get the right partition pruning
behavior:
datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
spark.sql("""
SELECT …
FROM datePartitionedIcebergDf
WHERE c_date = CAST('2017-11-15' AS DATE)
GROUP BY minute1
LIMIT 10
"""
).show
Spark job 5 (3 stages):
  Stage 15:  15/15 tasks succeeded
  Stage 16: 200/200 tasks succeeded
  Stage 17:   1/1 tasks succeeded
And inspecting the physical plan we confirm correct behavior:
== Physical Plan ==
CollectLimit 21
+- *(3) Project [cast(minute1#250 as string) AS minute1#258, cast(pageviews#251 as string) AS pageviews#259]
   +- *(3) GlobalLimit 10
      +- Exchange SinglePartition
         +- *(2) LocalLimit 10
            +- *(2) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC))#266], functions=[finalmerge_sum(merge sum#265) AS sum(value#262)#253], output=[minute1#250, pageviews#251])
               +- Exchange hashpartitioning(minute(timestamp#88, Some(Etc/UTC))#266, 200)
                  +- *(1) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#266], functions=[partial_sum(value#262) AS sum#265], output=[minute(timestamp#88, Some(Etc/UTC))#266, sum#265])
                     +- *(1) Project [timestamp#88, web#97.webPageDetails.pageViews.value AS value#262]
                        +- *(1) Filter ((c_date#107 = 17485) && isnotnull(c_date#107))
                           +- *(1) ScanV2 iceberg[timestamp#88, web#97, c_date#107] (Filters: [(c_date#107 = 17485), isnotnull(c_date#107)], Options: [path=adl://…/iceberg/,paths=[]])
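(For completeness, and assuming the optimizer behaves the way I expect here:
the same typed-literal workaround is available from the DataFrame API, since
the cast of the literal should be constant-folded into a DATE literal and
pushed down the same way.)

import org.apache.spark.sql.functions.{col, lit}

// Make the literal a DATE up front, so the pushed filter is
// c_date = <date literal> rather than a cast on the column.
val pruned = datePartitionedIcebergDf
  .filter(col("c_date") === lit("2017-11-15").cast("date"))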
Discussion:
The original Spark filter is of the form:
CAST(date AS STRING) OP STRING_LITERAL
which Iceberg, today, does not understand, since it has no support for CAST
expressions. As a result, my original query falls back to a full table scan.
The problem is that it is very common for folks to use plain string literals
to express DATEs. At first I thought: why on earth would Spark phrase the
predicate like that, when it is much cheaper to evaluate this:
c_date = CAST('2017-11-15' AS DATE)
But it turns out that phrasing would break pre-established semantics:
https://github.com/apache/spark/pull/17174#discussion_r104401305
So I think we have two options:
1) Upon seeing CAST(date AS STRING) OP STRING_LITERAL, Iceberg can rephrase the
predicate and resolve it in SparkExpressions (rough sketch below). As per the
discussion in the Spark thread above, I think this is a safe conversion as long
as we are strict about matching only predicates that compare a DATE column
against a literal.
or
2) Iceberg should start supporting CAST predicates.
Option (1) is much cheaper and less invasive, but I wonder whether we will see
other instances of similar behavior, or incoming predicates that are just a bit
more complicated and that approach (1) would fail to match, in which case we
should just bite the bullet and go for (2).
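To make option (1) a bit more concrete, here is a rough sketch of the strict
pattern match I have in mind. The helper name and where it would hook in are
hypothetical; the idea is just to recognize the exact shape Spark produces and
hand Iceberg a predicate on the raw DATE column, which it can bind itself (as
Expressions.equal("c_date", "2017-11-15") did earlier in this post):

import com.netflix.iceberg.expressions.{Expression, Expressions}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EqualTo, Literal, Expression => SparkExpr}
import org.apache.spark.sql.types.{DateType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper: match  cast(date_col as string) = string_literal  and
// rewrite it as an Iceberg predicate on the DATE column itself.
def convertDateCastEquality(expr: SparkExpr): Option[Expression] = expr match {
  case EqualTo(Cast(attr: AttributeReference, StringType, _), Literal(value: UTF8String, StringType))
      if attr.dataType == DateType =>
    Some(Expressions.equal(attr.name, value.toString))
  case _ =>
    None  // anything less strict stays unconverted, to keep Spark's semantics intact
}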
Comments/suggestions?
Xabriel J Collazo Mojica | Senior Software Engineer | Adobe |
[email protected]