I think you are generally right, but there are so many different scenarios
that it might not always be the best option. Consider, for instance, a "fast"
network between a single data source and Spark, lots of data, and an
"expensive" (low-selectivity) expression, as Wenchen suggested.

In such a case it looks to me like you end up "re-scanning" the whole
dataset just to make sure the filter has been applied, whereas having that
information as metadata, or via a communication protocol with the data
source (if supported), would be cheaper.
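For context, as far as I understand the current DataSourceV2 reader API (I
am thinking of Spark 2.4's SupportsPushDownFilters, so take the exact
package and signatures with a grain of salt), the only exchange happens at
planning time: the source keeps the filters it claims to handle and returns
the rest, which Spark will evaluate itself after the scan. A minimal sketch,
with invented class and variable names:

import java.util.Collections
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, SupportsPushDownFilters}
import org.apache.spark.sql.types.StructType

class MyReader extends DataSourceReader with SupportsPushDownFilters {

  private var accepted: Array[Filter] = Array.empty

  // Spark hands us the candidate filters at planning time; whatever we
  // return is treated as "not handled" and evaluated again by Spark after
  // the scan.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (ok, rest) = filters.partition {
      case _: IsNotNull | _: GreaterThan => true  // pretend only these are supported
      case _                             => false
    }
    accepted = ok
    rest
  }

  // What the source claims to have pushed down.
  override def pushedFilters(): Array[Filter] = accepted

  // Stubs, just to keep the sketch self-contained.
  override def readSchema(): StructType = new StructType()
  override def planInputPartitions(): java.util.List[InputPartition[InternalRow]] =
    Collections.emptyList[InputPartition[InternalRow]]()
}

Note that pushedFilters() is only a planning-time promise; nothing confirms
at scan time that the source really honoured it.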

If there is no support at all for such a mechanism, I think the idea could
be worth exploring a bit more. However, supporting it would require some
development effort for each data source (e.g., asking the data source for
the physical plan applied at query time, plus the ability to parse it,
extract the relevant information and act on it), as I am not aware of any
general interface for exchanging such information.
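Just to make the idea a bit more concrete, something along these lines is
what I have in mind. It is purely hypothetical: no such interface exists
today, and all the names are invented.

import org.apache.spark.sql.sources.Filter

// A data source that opts in could report, once the scan has been planned
// (or executed), which of the pushed filters it really enforced.
trait ReportsAppliedFilters {
  def appliedFilters(): Array[Filter]
}

// Spark could then re-apply only the difference, instead of the full set
// (or nothing at all, if it decides to trust the source blindly).
object FilterVerification {
  def residualFilters(pushed: Array[Filter],
                      source: ReportsAppliedFilters): Array[Filter] =
    pushed.diff(source.appliedFilters())
}

Whether the saved re-evaluation outweighs the cost of this extra exchange is
exactly the trade-off Jörn points at, and it probably differs per source and
per query.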



On Sun, 9 Dec 2018 at 15:34, Jörn Franke <jornfra...@gmail.com> wrote:

> It is not about lying or trust. Some or all filters may not be supported
> by a data source, and some might only be applied under certain
> environmental conditions (e.g. enough memory).
>
> It is much more expensive to communicate between Spark and a data source
> which filters have been applied than to just check them as Spark does,
> especially if you have several different data sources at the same time
> (joins etc.).
>
> On 09.12.2018 at 14:30, Wenchen Fan <cloud0...@gmail.com> wrote:
>
> Expressions/functions can be expensive, and I do think Spark should trust
> the data source and not re-apply pushed filters. If the data source lies,
> many things can go wrong...
>
> On Sun, Dec 9, 2018 at 8:17 PM Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Well, even if it has to apply the filter again, with pushdown activated it
>> costs Spark much less to check whether the filter has been applied or not.
>> Applying the filter is negligible; what pushdown really avoids, if the file
>> format implements it, is the IO cost (for reading) as well as the cost of
>> converting from the file format's internal data types to Spark's. Those two
>> things are very expensive, but not the filter check. In the end, there can
>> also be data-source-internal reasons not to apply a filter (many, depending
>> on your scenario, the format, etc.). Instead of "discussing" it between
>> Spark and the data source, it is much less costly for Spark to check that
>> the filters are consistently applied.
>>
>> On 09.12.2018 at 12:39, Alessandro Solimando <alessandro.solima...@gmail.com> wrote:
>>
>> Hello,
>> that's an interesting question, but after Frank's reply I am a bit
>> puzzled.
>>
>> If there is no control over the pushdown status, how can Spark guarantee
>> the correctness of the final query?
>>
>> Consider a filter pushed down to the data source: either Spark has to
>> know whether it has been applied or not, or it has to re-apply the filter
>> anyway (and pay the price for that).
>>
>> Is there any other option I am not considering?
>>
>> Best regards,
>> Alessandro
>>
>> On Sat, 8 Dec 2018 at 12:32, Jörn Franke <jornfra...@gmail.com> wrote:
>>
>>> BTW, even for JSON a pushdown can make sense, to avoid data unnecessarily
>>> ending up in Spark (which would cause unnecessary overhead).
>>> In the DataSource V2 API you need to implement SupportsPushDownFilters.
>>>
>>> > On 08.12.2018 at 10:50, Noritaka Sekiyama <moomind...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > I'm a support engineer, interested in DataSourceV2.
>>> >
>>> > Recently I had some pain troubleshooting whether pushdown was actually
>>> > applied or not.
>>> > I noticed that DataFrame's explain() method shows pushdown even for
>>> > JSON.
>>> > It totally depends on the DataSource side, I believe. However, I would
>>> > like Spark to have some way to confirm whether a specific pushdown is
>>> > actually applied in the DataSource or not.
>>> >
>>> > # Example
>>> > val df = spark.read.json("s3://sample_bucket/people.json")
>>> > df.printSchema()
>>> > df.filter($"age" > 20).explain()
>>> >
>>> > root
>>> >  |-- age: long (nullable = true)
>>> >  |-- name: string (nullable = true)
>>> >
>>> > == Physical Plan ==
>>> > *Project [age#47L, name#48]
>>> > +- *Filter (isnotnull(age#47L) && (age#47L > 20))
>>> >    +- *FileScan json [age#47L,name#48] Batched: false, Format: JSON,
>>> Location: InMemoryFileIndex[s3://sample_bucket/people.json],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(age), GreaterThan(age,20)],
>>> ReadSchema: struct<age:bigint,name:string>
>>> >
>>> > # Comments
>>> > As you can see, PushedFilters is shown even though the input data is JSON.
>>> > Actually this pushdown is not used.
>>> >
>>> > I'm wondering whether this has already been discussed.
>>> > If not, this is a chance to have such a feature in DataSourceV2, because
>>> > it would require some API-level changes.
>>> >
>>> >
>>> > Warm regards,
>>> >
>>> > Noritaka Sekiyama
>>> >
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>
