[jira] [Commented] (SPARK-39753) Broadcast joins should pushdown join constraints as Filter to the larger relation

2022-08-08 Thread Nick Dimiduk (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-39753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576643#comment-17576643
 ] 

Nick Dimiduk commented on SPARK-39753:
--

Linking to the original issue.

> Broadcast joins should pushdown join constraints as Filter to the larger 
> relation
> -
>
> Key: SPARK-39753
> URL: https://issues.apache.org/jira/browse/SPARK-39753
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.2.1, 3.3.0
>Reporter: Victor Delépine
>Priority: Major
>
> SPARK-19609 was bulk-closed a while ago, but not fixed. I've decided to 
> re-open it here for more visibility, since I believe this bug has a major 
> impact and that fixing it could drastically improve the performance of many 
> pipelines.
> Allow me to paste the initial description again here:
> _For broadcast inner-joins, where the smaller relation is known to be small 
> enough to materialize on a worker, the set of values for all join columns is 
> known and fits in memory. Spark should translate these values into a 
> {{Filter}} pushed down to the datasource. The common join condition of 
> equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} 
> clause. An example of pushing such filters is already present in the form of 
> {{IsNotNull}} filters via [~sameerag]'s work on SPARK-12957 subtasks._
> _This optimization could even work when the smaller relation does not fit 
> entirely in memory. This could be done by partitioning the smaller relation 
> into N pieces, applying this predicate pushdown for each piece, and unioning 
> the results._
>  
> Essentially, when doing a broadcast join, the smaller side can be used to 
> filter down the bigger side before performing the join. As of today, the join 
> will read all partitions of the bigger side, without pruning any partitions.
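To make the quoted proposal concrete, here is a minimal sketch of the same rewrite done by hand on the DataFrame API. The table names, the join column {{a}}, and the in-scope {{spark}} session are hypothetical; this illustrates the intended {{a in ...}} pushdown, not how the optimizer would implement it.

{code:scala}
import org.apache.spark.sql.functions.broadcast

// Hypothetical relations: `small` fits comfortably in memory, `large` does not.
val small = spark.table("dim_small")   // columns: a, payload
val large = spark.table("fact_large")  // columns: a, many others

// What the optimizer could do automatically: collect the distinct join keys of
// the small side and push them to the large side's datasource as an In filter.
val keys = small.select("a").distinct().collect().map(_.get(0))
val prunedLarge = large.filter(large("a").isin(keys: _*))

// The join itself is unchanged; only the large-side scan is narrowed.
val joined = prunedLarge.join(broadcast(small), "a")
{code}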






[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation

2019-10-08 Thread Nick Dimiduk (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947001#comment-16947001
 ] 

Nick Dimiduk commented on SPARK-19609:
--

Hi [~hyukjin.kwon], mind adding a comment as to why this issue was closed? Has 
the functionality been implemented elsewhere? How about a link off to the 
relevant JIRA so I know what fix version to look for? Thanks!

> Broadcast joins should pushdown join constraints as Filter to the larger 
> relation
> -
>
> Key: SPARK-19609
> URL: https://issues.apache.org/jira/browse/SPARK-19609
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>Priority: Major
>  Labels: bulk-closed
>
> For broadcast inner-joins, where the smaller relation is known to be small 
> enough to materialize on a worker, the set of values for all join columns is 
> known and fits in memory. Spark should translate these values into a 
> {{Filter}} pushed down to the datasource. The common join condition of 
> equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause. 
> An example of pushing such filters is already present in the form of 
> {{IsNotNull}} filters via [~sameerag]'s work on SPARK-12957 subtasks.
> This optimization could even work when the smaller relation does not fit 
> entirely in memory. This could be done by partitioning the smaller relation 
> into N pieces, applying this predicate pushdown for each piece, and unioning 
> the results.






[jira] [Commented] (SPARK-27599) DataFrameWriter.partitionBy should be optional when writing to a hive table

2019-05-21 Thread Nick Dimiduk (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845378#comment-16845378
 ] 

Nick Dimiduk commented on SPARK-27599:
--

Sure [~Alexander_Fedosov]. Hive DDL lets you specify a partitioning strategy 
for the physical data layout 
(https://cwiki.apache.org/confluence/display/hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables).
 This is identical to the physical data layout partitioning offered by 
DataFrameWriter 
(https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameWriter.html#partitionBy-java.lang.String...-).
 I observe that when writing from a DataFrameWriter to a hive table that has a 
partitioning specified, Spark will throw an error when the hive table metadata 
partition definition does not agree with the partitionBy specified on the 
DataFrameWriter.

My request here is that instead of only erroring on disagreement, Spark should 
use the partitioning information from the table metadata when no partitionBy 
call has been made. Basically, Spark knows what the destination table needs, so 
don't require that the caller provide it. Furthermore, there are likely other 
aspects of overlapping DDL between the hive table's metadata and methods on 
DataFrameWriter (bucketing comes to mind). When working with a table defined in 
the hive metastore, Spark should defer to that metadata rather than require the 
user to repeat it all in code.
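To illustrate, a minimal sketch of the two code paths, assuming an existing Hive table {{db.events}} partitioned by {{dt}} and a dataframe {{df}} (both hypothetical):

{code:scala}
// Today: the caller must repeat the table's partitioning, and Spark throws an
// error if the columns listed here disagree with the metastore definition.
df.write
  .mode("append")
  .partitionBy("dt")
  .saveAsTable("db.events")

// Requested behavior: omit partitionBy and let Spark take the partition
// columns (and, ideally, bucketing) from the Hive metastore definition.
df.write
  .mode("append")
  .saveAsTable("db.events")
{code}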

> DataFrameWriter.partitionBy should be optional when writing to a hive table
> ---
>
> Key: SPARK-27599
> URL: https://issues.apache.org/jira/browse/SPARK-27599
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Nick Dimiduk
>Priority: Minor
>
> When writing to an existing, partitioned table stored in the Hive metastore, 
> Spark requires the call to {{saveAsTable}} to provide a value for 
> {{partitionBy}}, even though that information is provided by the metastore 
> itself. Indeed, that information is available to Spark, as it will error if 
> the specified {{partitionBy}} does not match that of the table definition in 
> metastore.
> There may be other attributes of the save call that can be retrieved from the 
> metastore...






[jira] [Commented] (SPARK-27597) RuntimeConfig should be serializable

2019-04-30 Thread Nick Dimiduk (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830564#comment-16830564
 ] 

Nick Dimiduk commented on SPARK-27597:
--

{quote}Do you want to access {{SparkSession}} in UDF?{quote}
Nope. In my case, all I want is the configuration.

> RuntimeConfig should be serializable
> 
>
> Key: SPARK-27597
> URL: https://issues.apache.org/jira/browse/SPARK-27597
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Nick Dimiduk
>Priority: Major
>
> When implementing a UDF or similar, it's quite surprising to find that 
> {{SparkSession}} is {{Serializable}} but {{RuntimeConfig}} is not. When 
> modeling UDFs in an object-oriented way, this leads to an ugly NPE at the 
> {{call}} site.
> {noformat}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
> at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170)
> at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170)
> ...{noformat}
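For the record, the workaround I'm using is to read the needed values from {{spark.conf}} on the driver and capture only plain serializable values in the UDF closure. A minimal sketch, with a hypothetical config key:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Read on the driver: a plain String is serializable, whereas a SparkSession
// (and its RuntimeConfig) captured in the closure NPEs on the executor.
val prefix = spark.conf.get("myapp.prefix", "default-")

val addPrefix = udf((value: String) => prefix + value)
spark.range(3).select(addPrefix($"id".cast("string"))).show()
{code}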






[jira] [Created] (SPARK-27599) DataFrameWriter$partitionBy should be optional when writing to a hive table

2019-04-29 Thread Nick Dimiduk (JIRA)
Nick Dimiduk created SPARK-27599:


 Summary: DataFrameWriter$partitionBy should be optional when 
writing to a hive table
 Key: SPARK-27599
 URL: https://issues.apache.org/jira/browse/SPARK-27599
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.1
Reporter: Nick Dimiduk


When writing to an existing, partitioned table stored in the Hive metastore, 
Spark requires the call to {{saveAsTable}} to provide a value for 
{{partitionBy}}, even though that information is provided by the metastore 
itself. Indeed, that information is available to Spark, as it will error if the 
specified {{partitionBy}} does not match that of the table definition in 
metastore.

There may be other attributes of the save call that can be retrieved from the 
metastore...






[jira] [Updated] (SPARK-27599) DataFrameWriter.partitionBy should be optional when writing to a hive table

2019-04-29 Thread Nick Dimiduk (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nick Dimiduk updated SPARK-27599:
-
Summary: DataFrameWriter.partitionBy should be optional when writing to a 
hive table  (was: DataFrameWriter$partitionBy should be optional when writing 
to a hive table)

> DataFrameWriter.partitionBy should be optional when writing to a hive table
> ---
>
> Key: SPARK-27599
> URL: https://issues.apache.org/jira/browse/SPARK-27599
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Nick Dimiduk
>Priority: Minor
>
> When writing to an existing, partitioned table stored in the Hive metastore, 
> Spark requires the call to {{saveAsTable}} to provide a value for 
> {{partitionBy}}, even though that information is provided by the metastore 
> itself. Indeed, that information is available to Spark, as it will error if 
> the specified {{partitionBy}} does not match that of the table definition in 
> metastore.
> There may be other attributes of the save call that can be retrieved from the 
> metastore...






[jira] [Commented] (SPARK-27597) RuntimeConfig should be serializable

2019-04-29 Thread Nick Dimiduk (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16829612#comment-16829612
 ] 

Nick Dimiduk commented on SPARK-27597:
--

It would be nice if there were an API built into the {{FunctionalInterface}} 
that let implementations access the fully populated {{SparkSession}}.

> RuntimeConfig should be serializable
> 
>
> Key: SPARK-27597
> URL: https://issues.apache.org/jira/browse/SPARK-27597
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Nick Dimiduk
>Priority: Major
>
> When implementing a UDF or similar, it's quite surprising to find that 
> {{SparkSession}} is {{Serializable}} but {{RuntimeConfig}} is not. When 
> modeling UDFs in an object-oriented way, this leads to an ugly NPE at the 
> {{call}} site.
> {noformat}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
> at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170)
> at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170)
> ...{noformat}






[jira] [Created] (SPARK-27597) RuntimeConfig should be serializable

2019-04-29 Thread Nick Dimiduk (JIRA)
Nick Dimiduk created SPARK-27597:


 Summary: RuntimeConfig should be serializable
 Key: SPARK-27597
 URL: https://issues.apache.org/jira/browse/SPARK-27597
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.1
Reporter: Nick Dimiduk


When implementing a UDF or similar, it's quite surprising to find that 
{{SparkSession}} is {{Serializable}} but {{RuntimeConfig}} is not. When modeling 
UDFs in an object-oriented way, this leads to an ugly NPE at the {{call}} site.
{noformat}
Caused by: java.lang.NullPointerException
at 
org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170)
at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170)
...{noformat}






[jira] [Commented] (SPARK-11373) Add metrics to the History Server and providers

2017-11-14 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16252776#comment-16252776
 ] 

Nick Dimiduk commented on SPARK-11373:
--

I'm on a wild goose chase and have found my way here. It seems Spark 
has two independent subsystems for recording runtime information: 
history/SparkListener and Metrics. I'm startled to find a whole wealth of 
information exposed during job runtime over http/json via 
{{api/v1/applications}}, yet none of it is available to the Metrics system 
configured with the metrics.properties file. Lovely details like the number of 
input, output, and shuffle records per task are unavailable to my Grafana 
dashboards fed by the Ganglia reporter.

Is it an objective of this ticket to report such information through Metrics? 
Is there a separate ticket tracking such an effort? Is it a "simple" matter of 
implementing a {{SparkListener}} that bridges to Metrics?
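In case it helps the discussion, here is a rough sketch of the kind of bridge I have in mind, assuming Dropwizard (codahale) metrics; the metric names are made up and this is untested:

{code:scala}
import com.codahale.metrics.MetricRegistry
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Counts per-task record metrics into a codahale registry, which a configured
// reporter (Ganglia, Graphite, JMX, ...) can then publish.
class MetricsBridgeListener(registry: MetricRegistry) extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val tm = taskEnd.taskMetrics
    if (tm != null) {
      registry.counter("spark.task.input.recordsRead").inc(tm.inputMetrics.recordsRead)
      registry.counter("spark.task.output.recordsWritten").inc(tm.outputMetrics.recordsWritten)
      registry.counter("spark.task.shuffleRead.recordsRead").inc(tm.shuffleReadMetrics.recordsRead)
      registry.counter("spark.task.shuffleWrite.recordsWritten").inc(tm.shuffleWriteMetrics.recordsWritten)
    }
  }
}

// Registered programmatically, e.g. sc.addSparkListener(new MetricsBridgeListener(registry))
{code}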

> Add metrics to the History Server and providers
> ---
>
> Key: SPARK-11373
> URL: https://issues.apache.org/jira/browse/SPARK-11373
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Steve Loughran
>
> The History server doesn't publish metrics about JVM load or anything from 
> the history provider plugins. This means that performance problems from 
> massive job histories aren't visible to management tools, and nor are any 
> provider-generated metrics such as time to load histories, failed history 
> loads, the number of connectivity failures talking to remote services, etc.
> If the history server set up a metrics registry and offered the option to 
> publish its metrics, then management tools could view this data.
> # the metrics registry would need to be passed down to the instantiated 
> {{ApplicationHistoryProvider}}, in order for it to register its metrics.
> # if the codahale metrics servlet were registered under a path such as 
> {{/metrics}}, the values would be visible as HTML and JSON, without the need 
> for management tools.
> # Integration tests could also retrieve the JSON-formatted data and use it as 
> part of the test suites.






[jira] [Commented] (SPARK-17593) list files on s3 very slow

2017-11-08 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16244887#comment-16244887
 ] 

Nick Dimiduk commented on SPARK-17593:
--

So the fix in Hadoop 2.8 is for any variant of the s3* FileSystem? Or is it 
only for s3a?

bq. as it really needs Spark to move to listFiles(recursive)

Do we still need this change to be shipped in Spark? Thanks.
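For context, the Hadoop call in question replaces the directory-by-directory tree walk with a single recursive listing. A minimal sketch against the FileSystem API (bucket and path are hypothetical):

{code:scala}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// One recursive listing call; on s3a in Hadoop 2.8+ this maps to a flat
// object-store listing rather than a per-directory tree walk.
val fs = FileSystem.get(new URI("s3a://bucket_name/"), new Configuration())
val it = fs.listFiles(new Path("s3a://bucket_name/events_v3/"), true)
var total = 0L
while (it.hasNext) {
  it.next()
  total += 1
}
println(s"total files: $total")
{code}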

> list files on s3 very slow
> --
>
> Key: SPARK-17593
> URL: https://issues.apache.org/jira/browse/SPARK-17593
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.0
> Environment: spark 2.0.0, hadoop 2.7.2 ( hadoop 2.7.3)
>Reporter: Gaurav Shah
>Priority: Minor
>
> lets say we have following partitioned data:
> {code}
> events_v3
> -- event_date=2015-01-01
>  event_hour=0
> -- verb=follow
> part1.parquet.gz 
>  event_hour=1
> -- verb=click
> part1.parquet.gz 
> -- event_date=2015-01-02
>  event_hour=5
> -- verb=follow
> part1.parquet.gz 
>  event_hour=10
> -- verb=click
> part1.parquet.gz 
> {code}
> To read (or write) parquet partitioned data via Spark, it makes a call to 
> `ListingFileCatalog.listLeafFiles`, which recursively tries to list all 
> files and folders.
> In this case, with 300 dates, we would have created 300 jobs, each trying 
> to get the file list from its date directory. This process takes about 10 
> minutes to finish (with 2 executors), whereas a ruby script that lists all 
> files recursively in the same folder takes about 1 minute on the same 
> machine with just 1 thread.
> I am confused as to why listing files takes so much extra time.
> spark code:
> {code:scala}
> val sparkSession = org.apache.spark.sql.SparkSession.builder
> .config("spark.sql.hive.metastorePartitionPruning",true)
> .config("spark.sql.parquet.filterPushdown", true)
> .config("spark.sql.hive.verifyPartitionPath", false)
> .config("spark.sql.hive.convertMetastoreParquet.mergeSchema",false)
> .config("parquet.enable.summary-metadata",false)
> .config("spark.sql.sources.partitionDiscovery.enabled",false)
> .getOrCreate()
> val df = 
> sparkSession.read.option("mergeSchema","false").format("parquet").load("s3n://bucket_name/events_v3")
> df.createOrReplaceTempView("temp_events")
> sparkSession.sql(
>   """
> |select verb,count(*) from temp_events where event_date = 
> "2016-08-05" group by verb
>   """.stripMargin).show()
> {code}
> ruby code:
> {code:ruby}
> gem 'aws-sdk', '~> 2'
> require 'aws-sdk'
> client = Aws::S3::Client.new(:region=>'us-west-1')
> next_continuation_token = nil
> total = 0
> loop do
> a= client.list_objects_v2({
>   bucket: "bucket", # required
>   max_keys: 1000,
>   prefix: "events_v3/",
>   continuation_token: next_continuation_token ,
>   fetch_owner: false,
> })
> puts a.contents.last.key
> total += a.contents.size
> next_continuation_token = a.next_continuation_token
> break unless a.is_truncated
> end
> puts "total"
> puts total
> {code}
> tried looking into following bug:
> https://issues.apache.org/jira/browse/HADOOP-12810
> but hadoop 2.7.3 doesn't solve that problem
> stackoverflow reference:
> http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow






[jira] [Commented] (SPARK-11058) failed spark job reports on YARN as successful

2017-10-25 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219280#comment-16219280
 ] 

Nick Dimiduk commented on SPARK-11058:
--

This is a Spark issue though? Spark could fix the Spark API issue, right? Could 
it not be fixed in a new major/minor release?

> failed spark job reports on YARN as successful
> --
>
> Key: SPARK-11058
> URL: https://issues.apache.org/jira/browse/SPARK-11058
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.0
> Environment: CDH 5.4
>Reporter: Lan Jiang
>Priority: Minor
>
> I have a spark batch job running on CDH5.4 + Spark 1.3.0. Job is submitted in 
> “yarn-client” mode. The job itself failed due to YARN kills several executor 
> containers because the containers exceeded the memory limit posed by YARN. 
> However, when I went to the YARN resource manager site, it displayed the job 
> as successful. I found there was an issue reported in JIRA 
> https://issues.apache.org/jira/browse/SPARK-3627, but it says it was fixed in 
> Spark 1.2. On Spark history server, it shows the job as “Incomplete”. 






[jira] [Commented] (SPARK-11058) failed spark job reports on YARN as successful

2017-10-25 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218994#comment-16218994
 ] 

Nick Dimiduk commented on SPARK-11058:
--

[~srowen], [~vanzin] is this still considered a won't-fix issue? I see the same 
behavior with Spark 2.1.0 on Yarn 2.7.3. A job submitted to the cluster in 
client mode and killed with SIGTERM results in an application in Yarn's job 
history with "YarnApplicationState: FINISHED" and "FinalStatus Reported by AM: 
SUCCEEDED". Surely this is a bug.

> failed spark job reports on YARN as successful
> --
>
> Key: SPARK-11058
> URL: https://issues.apache.org/jira/browse/SPARK-11058
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.0
> Environment: CDH 5.4
>Reporter: Lan Jiang
>Priority: Minor
>
> I have a spark batch job running on CDH5.4 + Spark 1.3.0. Job is submitted in 
> “yarn-client” mode. The job itself failed due to YARN kills several executor 
> containers because the containers exceeded the memory limit posed by YARN. 
> However, when I went to the YARN resource manager site, it displayed the job 
> as successful. I found there was an issue reported in JIRA 
> https://issues.apache.org/jira/browse/SPARK-3627, but it says it was fixed in 
> Spark 1.2. On Spark history server, it shows the job as “Incomplete”. 






[jira] [Commented] (SPARK-17636) Parquet filter push down doesn't handle struct fields

2017-02-20 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875188#comment-15875188
 ] 

Nick Dimiduk commented on SPARK-17636:
--

I've proposed a PR to fix this issue. Using [~maropu]'s suggested testing 
strategy of comparing explain plans with parquet, with the proposed patch 
applied, I see the following:

Before:

{noformat}
scala> df.where($"b._1" === "").explain
== Physical Plan ==
*Filter (b#1._1 = )
+- *FileScan parquet [a#0,b#1] Batched: false, Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/ndimiduk/tmp/spark_19638], PartitionFilters: [], 
PushedFilters: [], ReadSchema: struct>
{noformat}

After:

{noformat}
scala> df.where($"b._1" === "").explain
== Physical Plan ==
*Filter (b._1 = )
+- *FileScan parquet [a#283,b#284] Batched: false, Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/ndimiduk/tmp/spark_19638], PartitionFilters: [], 
PushedFilters: [EqualTo(b._1,)], ReadSchema: 
struct>
{noformat}
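For reference, the plans above were produced from a dataframe shaped roughly like the following; the path and sample data are arbitrary, this is just a minimal reproduction sketch in the shell:

{code:scala}
import spark.implicits._

// A flat string column plus a struct column, round-tripped through parquet so
// the scan goes through the parquet datasource.
Seq(("a1", ("b1", "b2"))).toDF("a", "b").write.parquet("/tmp/spark_19638")
val df = spark.read.parquet("/tmp/spark_19638")

df.where($"b._1" === "").explain()
{code}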

> Parquet filter push down doesn't handle struct fields
> -
>
> Key: SPARK-17636
> URL: https://issues.apache.org/jira/browse/SPARK-17636
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 1.6.2, 1.6.3, 2.0.2
>Reporter: Mitesh
>Priority: Minor
>
> There's a *PushedFilters* for a simple numeric field, but not for a numeric 
> field inside a struct. Not sure if this is a Spark limitation because of 
> Parquet, or only a Spark limitation.
> {noformat}
> scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", 
> "sale_id")
> res5: org.apache.spark.sql.DataFrame = [day_timestamp: 
> struct, sale_id: bigint]
> scala> res5.filter("sale_id > 4").queryExecution.executedPlan
> res9: org.apache.spark.sql.execution.SparkPlan =
> Filter[23814] [args=(sale_id#86324L > 
> 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: 
> s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)]
> scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan
> res10: org.apache.spark.sql.execution.SparkPlan =
> Filter[23815] [args=(day_timestamp#86302.timestamp > 
> 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: 
> s3a://some/parquet/file
> {noformat}






[jira] [Commented] (SPARK-19638) Filter pushdown not working for struct fields

2017-02-20 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875184#comment-15875184
 ] 

Nick Dimiduk commented on SPARK-19638:
--

Okay [~maropu], I'll continue discussion over there.

> Filter pushdown not working for struct fields
> -
>
> Key: SPARK-19638
> URL: https://issues.apache.org/jira/browse/SPARK-19638
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>
> Working with a dataset containing struct fields, and enabling debug logging 
> in the ES connector, I'm seeing the following behavior. The dataframe is 
> created over the ES connector and then the schema is extended with a couple 
> column aliases, such as.
> {noformat}
> df.withColumn("f2", df("foo"))
> {noformat}
> Queries vs those alias columns work as expected for fields that are 
> non-struct members.
> {noformat}
> scala> df.withColumn("f2", df("foo")).where("f2 == '1'").limit(0).show
> 17/02/16 15:06:49 DEBUG DataSource: Pushing down filters 
> [IsNotNull(foo),EqualTo(foo,1)]
> 17/02/16 15:06:49 TRACE DataSource: Transformed filters into DSL 
> [{"exists":{"field":"foo"}},{"match":{"foo":"1"}}]
> {noformat}
> However, try the same with an alias over a struct field, and no filters are 
> pushed down.
> {noformat}
> scala> df.withColumn("bar_baz", df("bar.baz")).where("bar_baz == 
> '1'").limit(1).show
> {noformat}
> In fact, this is the case even when no alias is used at all.
> {noformat}
> scala> df.where("bar.baz == '1'").limit(1).show
> {noformat}
> Basically, pushdown for structs doesn't work at all.
> Maybe this is specific to the ES connector?






[jira] [Commented] (SPARK-17636) Parquet filter push down doesn't handle struct fields

2017-02-20 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875178#comment-15875178
 ] 

Nick Dimiduk commented on SPARK-17636:
--

Per [~maropu]'s comments on SPARK-19638, this issue is not limited to parquet. 
In my case, I'm seeing the same lack of struct filter pushdown while working 
with the ElasticSearch connector.

> Parquet filter push down doesn't handle struct fields
> -
>
> Key: SPARK-17636
> URL: https://issues.apache.org/jira/browse/SPARK-17636
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 1.6.2, 1.6.3, 2.0.2
>Reporter: Mitesh
>Priority: Minor
>
> There's a *PushedFilters* for a simple numeric field, but not for a numeric 
> field inside a struct. Not sure if this is a Spark limitation because of 
> Parquet, or only a Spark limitation.
> {noformat}
> scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", 
> "sale_id")
> res5: org.apache.spark.sql.DataFrame = [day_timestamp: 
> struct, sale_id: bigint]
> scala> res5.filter("sale_id > 4").queryExecution.executedPlan
> res9: org.apache.spark.sql.execution.SparkPlan =
> Filter[23814] [args=(sale_id#86324L > 
> 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: 
> s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)]
> scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan
> res10: org.apache.spark.sql.execution.SparkPlan =
> Filter[23815] [args=(day_timestamp#86302.timestamp > 
> 4)][outPart=UnknownPartitioning(0)][outOrder=List()]
> +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: 
> s3a://some/parquet/file
> {noformat}






[jira] [Commented] (SPARK-19615) Provide Dataset union convenience for divergent schema

2017-02-20 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875137#comment-15875137
 ] 

Nick Dimiduk commented on SPARK-19615:
--

Thanks for taking a look [~hyukjin.kwon]. Those three bugs are indeed issues -- 
in all cases, it seems Spark was not being careful to map column names to the 
appropriate column from each side of the union. My experience with unions on 
1.6.3 and 2.1.0 has been much better. Actually, I still see echoes of 
SPARK-9874 / SPARK-9813 when I extend one side or the other with null columns. 
I can file that as a separate issue if that's of interest to you.

As for what an RDBMS may or may not do, I'm not very aware or concerned. I'm 
thinking more about ease of use for a user. This is why I suggest perhaps a 
different union method that would encapsulate this behavior. Parsed Spark SQL 
can exhibit whatever semantics the community deems appropriate, while still 
giving users of the API access to this convenient functionality. I've 
implemented this logic in my application and it's quite complex. It would be 
very good for Spark to provide this for its users.
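For reference, a much-simplified sketch of the kind of helper I mean, handling only flat schemas (the real version also needs to recurse into structs and reconcile conflicting types):

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Union two DataFrames whose flat schemas diverge: any column missing from one
// side is added as a typed null, then both sides are selected in one order.
def unionByMergedSchema(left: DataFrame, right: DataFrame): DataFrame = {
  val merged = left.schema ++
    right.schema.filterNot(f => left.schema.fieldNames.contains(f.name))
  def align(df: DataFrame): DataFrame = {
    val cols = merged.map { f =>
      if (df.schema.fieldNames.contains(f.name)) col(f.name)
      else lit(null).cast(f.dataType).as(f.name)
    }
    df.select(cols: _*)
  }
  align(left).union(align(right))
}
{code}

Given sources with columns {{a, b}} and {{a, c}}, this yields {{a, b, c}} with typed nulls filling the gaps.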

> Provide Dataset union convenience for divergent schema
> --
>
> Key: SPARK-19615
> URL: https://issues.apache.org/jira/browse/SPARK-19615
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>
> Creating a union DataFrame over two sources that have different schema 
> definitions is surprisingly complex. Provide a version of the union method 
> that will infer a target schema as the result of merging the 
> sources. Automatically extend either side with {{null}} columns for any 
> missing columns that are nullable.






[jira] [Commented] (SPARK-19638) Filter pushdown not working for struct fields

2017-02-17 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872413#comment-15872413
 ] 

Nick Dimiduk commented on SPARK-19638:
--

Debugging. I'm looking at the match expression in 
[{{DataSourceStrategy#translateFilter(Expression)}}|https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L509].
 The predicate comes in as an {{EqualTo(GetStructField, Literal)}}. This doesn't 
match any of the cases. I was expecting it to step into the [{{case 
expressions.EqualTo(a: Attribute, Literal(v, t)) 
=>}}|https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L511]
 on line 511 but execution steps past this point. Upon investigation, 
{{GetStructField}} does not extend {{Attribute}}.

From this point, the {{EqualTo}} condition involving the struct field is 
dropped from the filter set pushed down to the ES connector. Thus I believe 
this is an issue in Spark, not in the connector.

> Filter pushdown not working for struct fields
> -
>
> Key: SPARK-19638
> URL: https://issues.apache.org/jira/browse/SPARK-19638
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>
> Working with a dataset containing struct fields, and enabling debug logging 
> in the ES connector, I'm seeing the following behavior. The dataframe is 
> created over the ES connector and then the schema is extended with a couple 
> column aliases, such as.
> {noformat}
> df.withColumn("f2", df("foo"))
> {noformat}
> Queries vs those alias columns work as expected for fields that are 
> non-struct members.
> {noformat}
> scala> df.withColumn("f2", df("foo")).where("f2 == '1'").limit(0).show
> 17/02/16 15:06:49 DEBUG DataSource: Pushing down filters 
> [IsNotNull(foo),EqualTo(foo,1)]
> 17/02/16 15:06:49 TRACE DataSource: Transformed filters into DSL 
> [{"exists":{"field":"foo"}},{"match":{"foo":"1"}}]
> {noformat}
> However, try the same with an alias over a struct field, and no filters are 
> pushed down.
> {noformat}
> scala> df.withColumn("bar_baz", df("bar.baz")).where("bar_baz == 
> '1'").limit(1).show
> {noformat}
> In fact, this is the case even when no alias is used at all.
> {noformat}
> scala> df.where("bar.baz == '1'").limit(1).show
> {noformat}
> Basically, pushdown for structs doesn't work at all.
> Maybe this is specific to the ES connector?






[jira] [Commented] (SPARK-19638) Filter pushdown not working for struct fields

2017-02-17 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872060#comment-15872060
 ] 

Nick Dimiduk commented on SPARK-19638:
--

[~maropu] I have placed a breakpoint in the ES connector's implementation of 
{{PrunedFilteredScan#buildScan(Array[String], Array[Filter])}}. Here I see no 
filters for the struct columns. Indeed this is precisely where the log messages 
are produced. For this reason, I believe this to be an issue with Catalyst, not 
the connector. Perhaps you can guide me through further debugging? Thanks.

> Filter pushdown not working for struct fields
> -
>
> Key: SPARK-19638
> URL: https://issues.apache.org/jira/browse/SPARK-19638
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>
> Working with a dataset containing struct fields, and enabling debug logging 
> in the ES connector, I'm seeing the following behavior. The dataframe is 
> created over the ES connector and then the schema is extended with a couple 
> column aliases, such as.
> {noformat}
> df.withColumn("f2", df("foo"))
> {noformat}
> Queries vs those alias columns work as expected for fields that are 
> non-struct members.
> {noformat}
> scala> df.withColumn("f2", df("foo")).where("f2 == '1'").limit(0).show
> 17/02/16 15:06:49 DEBUG DataSource: Pushing down filters 
> [IsNotNull(foo),EqualTo(foo,1)]
> 17/02/16 15:06:49 TRACE DataSource: Transformed filters into DSL 
> [{"exists":{"field":"foo"}},{"match":{"foo":"1"}}]
> {noformat}
> However, try the same with an alias over a struct field, and no filters are 
> pushed down.
> {noformat}
> scala> df.withColumn("bar_baz", df("bar.baz")).where("bar_baz == 
> '1'").limit(1).show
> {noformat}
> In fact, this is the case even when no alias is used at all.
> {noformat}
> scala> df.where("bar.baz == '1'").limit(1).show
> {noformat}
> Basically, pushdown for structs doesn't work at all.
> Maybe this is specific to the ES connector?






[jira] [Created] (SPARK-19638) Filter pushdown not working for struct fields

2017-02-16 Thread Nick Dimiduk (JIRA)
Nick Dimiduk created SPARK-19638:


 Summary: Filter pushdown not working for struct fields
 Key: SPARK-19638
 URL: https://issues.apache.org/jira/browse/SPARK-19638
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.0
Reporter: Nick Dimiduk


Working with a dataset containing struct fields, and enabling debug logging in 
the ES connector, I'm seeing the following behavior. The dataframe is created 
over the ES connector and then the schema is extended with a couple column 
aliases, such as.

{noformat}
df.withColumn("f2", df("foo"))
{noformat}

Queries vs those alias columns work as expected for fields that are non-struct 
members.

{noformat}
scala> df.withColumn("f2", df("foo")).where("f2 == '1'").limit(0).show
17/02/16 15:06:49 DEBUG DataSource: Pushing down filters 
[IsNotNull(foo),EqualTo(foo,1)]
17/02/16 15:06:49 TRACE DataSource: Transformed filters into DSL 
[{"exists":{"field":"foo"}},{"match":{"foo":"1"}}]
{noformat}

However, try the same with an alias over a struct field, and no filters are 
pushed down.

{noformat}
scala> df.withColumn("bar_baz", df("bar.baz")).where("bar_baz == 
'1'").limit(1).show
{noformat}

In fact, this is the case even when no alias is used at all.

{noformat}
scala> df.where("bar.baz == '1'").limit(1).show
{noformat}

Basically, pushdown for structs doesn't work at all.

Maybe this is specific to the ES connector?






[jira] [Commented] (SPARK-19614) add type-preserving null function

2017-02-16 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870525#comment-15870525
 ] 

Nick Dimiduk commented on SPARK-19614:
--

{{lit(null).cast(type)}} does exactly what I needed. Thanks fellas.
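For anyone landing here later, a quick example of that resolution, given some dataframe {{df}} (the column name and type are arbitrary):

{code:scala}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

// Adds a null column with a concrete type instead of NullType.
val extended = df.withColumn("c", lit(null).cast(StringType))
extended.printSchema()  // ... |-- c: string (nullable = true)
{code}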

> add type-preserving null function
> -
>
> Key: SPARK-19614
> URL: https://issues.apache.org/jira/browse/SPARK-19614
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>Priority: Trivial
>
> There's currently no easy way to extend the columns of a DataFrame with null 
> columns that also preserves the type. {{lit(null)}} evaluates to 
> {{Literal(null, NullType)}}, despite any subsequent hinting, for instance 
> with {{Column.as(String, Metadata)}}. This comes up when programmatically 
> munging data from disparate sources. A function such as {{null(DataType)}} 
> would be nice.






[jira] [Closed] (SPARK-19614) add type-preserving null function

2017-02-16 Thread Nick Dimiduk (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nick Dimiduk closed SPARK-19614.

Resolution: Invalid

> add type-preserving null function
> -
>
> Key: SPARK-19614
> URL: https://issues.apache.org/jira/browse/SPARK-19614
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>Priority: Trivial
>
> There's currently no easy way to extend the columns of a DataFrame with null 
> columns that also preserves the type. {{lit(null)}} evaluates to 
> {{Literal(null, NullType)}}, despite any subsequent hinting, for instance 
> with {{Column.as(String, Metadata)}}. This comes up when programmatically 
> munging data from disparate sources. A function such as {{null(DataType)}} 
> would be nice.






[jira] [Commented] (SPARK-19615) Provide Dataset union convenience for divergent schema

2017-02-16 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870513#comment-15870513
 ] 

Nick Dimiduk commented on SPARK-19615:
--

IMHO, a union operation should be as generous as possible. This facilitates 
common ETL and data cleansing operations where the sources are sparse-schema 
structures (JSON, HBase, Elastic Search, &c). A couple examples of what I mean.

Given dataframes of type
{noformat}
root
 |-- a: string (nullable = false)
 |-- b: string (nullable = true)
{noformat}
and
{noformat}
root
 |-- a: string (nullable = false)
 |-- c: string (nullable = true)
{noformat}
I would expect the union operation to infer the nullable columns from both 
sides to produce a dataframe of type
{noformat}
root
 |-- a: string (nullable = false)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
{noformat}

This should work on an arbitrarily deep nesting of structs, so

{noformat}
root
 |-- a: string (nullable = false)
 |-- b: struct (nullable = false)
 ||-- b1: string (nullable = true)
 ||-- b2: string (nullable = true)
{noformat}
unioned with
{noformat}
root
 |-- a: string (nullable = false)
 |-- b: struct (nullable = false)
 ||-- b3: string (nullable = true)
 ||-- b4: string (nullable = true)
{noformat}
would result in
{noformat}
root
 |-- a: string (nullable = false)
 |-- b: struct (nullable = false)
 ||-- b1: string (nullable = true)
 ||-- b2: string (nullable = true)
 ||-- b3: string (nullable = true)
 ||-- b4: string (nullable = true)
{noformat}

> Provide Dataset union convenience for divergent schema
> --
>
> Key: SPARK-19615
> URL: https://issues.apache.org/jira/browse/SPARK-19615
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>Priority: Minor
>
> Creating a union DataFrame over two sources that have different schema 
> definitions is surprisingly complex. Provide a version of the union method 
> that will infer a target schema as the result of merging the 
> sources. Automatically extend either side with {{null}} columns for any 
> missing columns that are nullable.






[jira] [Updated] (SPARK-19615) Provide Dataset union convenience for divergent schema

2017-02-16 Thread Nick Dimiduk (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nick Dimiduk updated SPARK-19615:
-
Priority: Major  (was: Minor)

> Provide Dataset union convenience for divergent schema
> --
>
> Key: SPARK-19615
> URL: https://issues.apache.org/jira/browse/SPARK-19615
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>
> Creating a union DataFrame over two sources that have different schema 
> definitions is surprisingly complex. Provide a version of the union method 
> that will infer a target schema as the result of merging the 
> sources. Automatically extend either side with {{null}} columns for any 
> missing columns that are nullable.






[jira] [Created] (SPARK-19615) Provide Dataset union convenience for divergent schema

2017-02-15 Thread Nick Dimiduk (JIRA)
Nick Dimiduk created SPARK-19615:


 Summary: Provide Dataset union convenience for divergent schema
 Key: SPARK-19615
 URL: https://issues.apache.org/jira/browse/SPARK-19615
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.1.0
Reporter: Nick Dimiduk
Priority: Minor


Creating a union DataFrame over two sources that have different schema 
definitions is surprisingly complex. Provide a version of the union method that 
will infer a target schema as the result of merging the sources. 
Automatically extend either side with {{null}} columns for any missing 
columns that are nullable.






[jira] [Created] (SPARK-19614) add type-preserving null function

2017-02-15 Thread Nick Dimiduk (JIRA)
Nick Dimiduk created SPARK-19614:


 Summary: add type-preserving null function
 Key: SPARK-19614
 URL: https://issues.apache.org/jira/browse/SPARK-19614
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.1.0
Reporter: Nick Dimiduk
Priority: Trivial


There's currently no easy way to extend the columns of a DataFrame with null 
columns that also preserves the type. {{lit(null)}} evaluates to 
{{Literal(null, NullType)}}, despite any subsequent hinting, for instance with 
{{Column.as(String, Metadata)}}. This comes up when programmatically munging 
data from disparate sources. A function such as {{null(DataType)}} would be 
nice.






[jira] [Commented] (SPARK-12957) Derive and propagate data constrains in logical plan

2017-02-15 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868366#comment-15868366
 ] 

Nick Dimiduk commented on SPARK-12957:
--

Filed SPARK-19609. IMHO, it would be another subtask on this ticket.

> Derive and propagate data constrains in logical plan 
> -
>
> Key: SPARK-12957
> URL: https://issues.apache.org/jira/browse/SPARK-12957
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Yin Huai
>Assignee: Sameer Agarwal
> Attachments: ConstraintPropagationinSparkSQL.pdf
>
>
> Based on the semantic of a query plan, we can derive data constrains (e.g. if 
> a filter defines {{a > 10}}, we know that the output data of this filter 
> satisfy the constrain of {{a > 10}} and {{a is not null}}). We should build a 
> framework to derive and propagate constrains in the logical plan, which can 
> help us to build more advanced optimizations.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation

2017-02-15 Thread Nick Dimiduk (JIRA)
Nick Dimiduk created SPARK-19609:


 Summary: Broadcast joins should pushdown join constraints as 
Filter to the larger relation
 Key: SPARK-19609
 URL: https://issues.apache.org/jira/browse/SPARK-19609
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.1.0
Reporter: Nick Dimiduk


For broadcast inner-joins, where the smaller relation is known to be small 
enough to materialize on a worker, the set of values for all join columns is 
known and fits in memory. Spark should translate these values into a {{Filter}} 
pushed down to the datasource. The common join condition of equality, i.e. 
{{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause. An example of 
pushing such filters is already present in the form of {{IsNotNull}} filters 
via [~sameerag]'s work on SPARK-12957 subtasks.

This optimization could even work when the smaller relation does not fit 
entirely in memory. This could be done by partitioning the smaller relation 
into N pieces, applying this predicate pushdown for each piece, and unioning 
the results.






[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join

2017-02-14 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866262#comment-15866262
 ] 

Nick Dimiduk commented on SPARK-13219:
--

I would implement this manually by materializing the smaller relation in the 
driver and then transforming those values into a filter applied to the larger 
one. Frankly, I expected this to be the meaning of a broadcast join. I'm 
wondering if I'm doing something to prevent the planner from performing this 
optimization, so maybe the mailing list is a more appropriate place to discuss?

> Pushdown predicate propagation in SparkSQL with join
> 
>
> Key: SPARK-13219
> URL: https://issues.apache.org/jira/browse/SPARK-13219
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.4.1, 1.5.2, 1.6.0
> Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>Reporter: Abhinav Chawade
>
> When 2 or more tables are joined in SparkSQL and there is an equality clause 
> in query on attributes used to perform the join, it is useful to apply that 
> clause on scans for both table. If this is not done, one of the tables 
> results in full scan which can reduce the query dramatically. Consider 
> following example with 2 tables being joined.
> {code}
> CREATE TABLE assets (
> assetid int PRIMARY KEY,
> address text,
> propertyname text
> )
> CREATE TABLE tenants (
> assetid int PRIMARY KEY,
> name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = 
> t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to 
> load native-hadoop library for your platform... using builtin-java classes 
> where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>Filter (CAST(assetid#13, DoubleType) = 1201.0)
> HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, 
> Some(t)), None
>   Exchange (HashPartitioning 200)
>HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), 
> None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table but 
> it becomes cumbersome. It will be helpful if the query planner could improve 
> filter propagation.






[jira] [Commented] (SPARK-12957) Derive and propagate data constrains in logical plan

2017-02-14 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866239#comment-15866239
 ] 

Nick Dimiduk commented on SPARK-12957:
--

[~sameerag] thanks for the comment. From a naive scan of the tickets, I believe 
I am seeing the benefits of SPARK-13871 in that a {{IsNotNull}} constraint is 
applied from the names of the join columns. However, I don't see the boon of 
SPARK-13789, specifically the {{a = 5, a = b}} mentioned in the description. My 
query is a join between a very small relation (100's of rows) and a very large 
one (10's of billions). I've hinted the planner to broadcast the smaller table, 
which it honors. After SPARK-13789, I expected to see the join column values 
pushed down as well. This is not the case.

Any tips on debugging this further? I've set breakpoints in the 
{{RelationProvider}} implementation and see that it's only receiving the 
{{IsNotNull}} filters, nothing further from the planner.

Thanks a lot!

> Derive and propagate data constrains in logical plan 
> -
>
> Key: SPARK-12957
> URL: https://issues.apache.org/jira/browse/SPARK-12957
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Yin Huai
>Assignee: Sameer Agarwal
> Attachments: ConstraintPropagationinSparkSQL.pdf
>
>
> Based on the semantic of a query plan, we can derive data constrains (e.g. if 
> a filter defines {{a > 10}}, we know that the output data of this filter 
> satisfy the constrain of {{a > 10}} and {{a is not null}}). We should build a 
> framework to derive and propagate constrains in the logical plan, which can 
> help us to build more advanced optimizations.






[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join

2017-02-14 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866210#comment-15866210
 ] 

Nick Dimiduk commented on SPARK-13219:
--

bq. The result is that no matter the number of tables and join columns, the 
predicates are augmented such that = and IN on literals are pushed to all the 
joined tables.

[~velvia], [~smilegator], is the above statement an accurate reflection of this 
patch? I've upgraded to 2.1.0 in order to access this optimization. What about 
the case of a broadcast join -- will the values present in the smaller relation 
be pushed down to the scanner of the larger? I'm not seeing that behavior. 
Maybe this optimization is out of scope of this patch? I do see {{IsNotNull}} 
pushed down, applied to the columns involved in the join constraint, but I was 
expecting a bit more.

Thanks again for the patch!

> Pushdown predicate propagation in SparkSQL with join
> 
>
> Key: SPARK-13219
> URL: https://issues.apache.org/jira/browse/SPARK-13219
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.4.1, 1.5.2, 1.6.0
> Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>Reporter: Abhinav Chawade
>
> When 2 or more tables are joined in SparkSQL and there is an equality clause 
> in query on attributes used to perform the join, it is useful to apply that 
> clause on scans for both table. If this is not done, one of the tables 
> results in full scan which can reduce the query dramatically. Consider 
> following example with 2 tables being joined.
> {code}
> CREATE TABLE assets (
> assetid int PRIMARY KEY,
> address text,
> propertyname text
> )
> CREATE TABLE tenants (
> assetid int PRIMARY KEY,
> name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = 
> t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to 
> load native-hadoop library for your platform... using builtin-java classes 
> where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>Filter (CAST(assetid#13, DoubleType) = 1201.0)
> HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, 
> Some(t)), None
>   Exchange (HashPartitioning 200)
>HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), 
> None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table but 
> it becomes cumbersome. It will be helpful if the query planner could improve 
> filter propagation.






[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join

2017-02-13 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15864798#comment-15864798
 ] 

Nick Dimiduk commented on SPARK-13219:
--

Now that all the subtasks on SPARK-12957 are resolved, where does that leave 
this feature? I'm trying to determine if I get this very useful enhancement by 
upgrading to 2.x. Thanks a lot!

> Pushdown predicate propagation in SparkSQL with join
> 
>
> Key: SPARK-13219
> URL: https://issues.apache.org/jira/browse/SPARK-13219
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.4.1, 1.5.2, 1.6.0
> Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>Reporter: Abhinav Chawade
>
> When 2 or more tables are joined in SparkSQL and there is an equality clause 
> in query on attributes used to perform the join, it is useful to apply that 
> clause on scans for both table. If this is not done, one of the tables 
> results in full scan which can reduce the query dramatically. Consider 
> following example with 2 tables being joined.
> {code}
> CREATE TABLE assets (
> assetid int PRIMARY KEY,
> address text,
> propertyname text
> )
> CREATE TABLE tenants (
> assetid int PRIMARY KEY,
> name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = 
> t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to 
> load native-hadoop library for your platform... using builtin-java classes 
> where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>Filter (CAST(assetid#13, DoubleType) = 1201.0)
> HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, 
> Some(t)), None
>   Exchange (HashPartitioning 200)
>HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), 
> None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table but 
> it becomes cumbersome. It will be helpful if the query planner could improve 
> filter propagation.






[jira] [Commented] (SPARK-12957) Derive and propagate data constrains in logical plan

2017-02-13 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15864796#comment-15864796
 ] 

Nick Dimiduk commented on SPARK-12957:
--

I'm trying to understand the current state of SPARK-13219/SPARK-12532, which it 
seems are deferring to this issue. I see all subtasks here have a fix version 
of 2.0; is there further work to be done on this ticket? How does the sum of 
this work relate back to the predicate pushdown join optimization described in 
SPARK-13219? Basically, I'm trying to determine if I get this very useful 
enhancement by upgrading to 2.x. Thanks a lot!

> Derive and propagate data constrains in logical plan 
> -
>
> Key: SPARK-12957
> URL: https://issues.apache.org/jira/browse/SPARK-12957
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Yin Huai
>Assignee: Sameer Agarwal
> Attachments: ConstraintPropagationinSparkSQL.pdf
>
>
> Based on the semantic of a query plan, we can derive data constrains (e.g. if 
> a filter defines {{a > 10}}, we know that the output data of this filter 
> satisfy the constrain of {{a > 10}} and {{a is not null}}). We should build a 
> framework to derive and propagate constrains in the logical plan, which can 
> help us to build more advanced optimizations.


