[jira] [Commented] (SPARK-39753) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-39753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576643#comment-17576643 ] Nick Dimiduk commented on SPARK-39753: -- Linking to the original issue. > Broadcast joins should pushdown join constraints as Filter to the larger > relation > - > > Key: SPARK-39753 > URL: https://issues.apache.org/jira/browse/SPARK-39753 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0, 3.2.1, 3.3.0 >Reporter: Victor Delépine >Priority: Major > > SPARK-19609 was bulk-closed a while ago, but not fixed. I've decided to > re-open it here for more visibility, since I believe this bug has a major > impact and that fixing it could drastically improve the performance of many > pipelines. > Allow me to paste the initial description again here: > _For broadcast inner-joins, where the smaller relation is known to be small > enough to materialize on a worker, the set of values for all join columns is > known and fits in memory. Spark should translate these values into a > {{Filter}} pushed down to the datasource. The common join condition of > equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} > clause. An example of pushing such filters is already present in the form of > {{IsNotNull}} filters via [~sameerag]'s work on SPARK-12957 subtasks._ > _This optimization could even work when the smaller relation does not fit > entirely in memory. This could be done by partitioning the smaller relation > into N pieces, applying this predicate pushdown for each piece, and unioning > the results._ > > Essentially, when doing a Broadcast join, the smaller side can be used to > filter down the bigger side before performing the join. As of today, the join > will read all partitions of the bigger side, without pruning partitions -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
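A minimal sketch of the manual form of this optimization, under the assumption of hypothetical parquet paths and a join column named {{a}}: collect the join-key values of the small relation on the driver and push them into the larger relation as an {{isin}} filter before the broadcast join.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col}

object ManualJoinPushdown {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("manual-join-pushdown").getOrCreate()

    // Hypothetical inputs: `small` fits on the driver, `large` is the big datasource.
    val small = spark.read.parquet("/tmp/small")
    val large = spark.read.parquet("/tmp/large")

    // Materialize the distinct join-key values of the small side on the driver.
    val keys: Array[Any] = small.select(col("a")).distinct().collect().map(_.get(0))

    // Push the keys down as an `a IN (...)` filter, which the datasource can use for
    // predicate pushdown / partition pruning, then do the broadcast join as before.
    val pruned = large.filter(col("a").isin(keys: _*))
    pruned.join(broadcast(small), Seq("a")).explain(true)
  }
}
{code}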
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947001#comment-16947001 ] Nick Dimiduk commented on SPARK-19609: -- Hi [~hyukjin.kwon], mind adding a comment as to why this issue was closed? Has the functionality been implemented elsewhere? How about a link off to the relevant JIRA so I know what fix version to look for? Thanks! > Broadcast joins should pushdown join constraints as Filter to the larger > relation > - > > Key: SPARK-19609 > URL: https://issues.apache.org/jira/browse/SPARK-19609 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Nick Dimiduk >Priority: Major > Labels: bulk-closed > > For broadcast inner-joins, where the smaller relation is known to be small > enough to materialize on a worker, the set of values for all join columns is > known and fits in memory. Spark should translate these values into a > {{Filter}} pushed down to the datasource. The common join condition of > equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause. > An example of pushing such filters is already present in the form of > {{IsNotNull}} filters via [~sameerag]'s work on SPARK-12957 subtasks. > This optimization could even work when the smaller relation does not fit > entirely in memory. This could be done by partitioning the smaller relation > into N pieces, applying this predicate pushdown for each piece, and unioning > the results. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27599) DataFrameWriter.partitionBy should be optional when writing to a hive table
[ https://issues.apache.org/jira/browse/SPARK-27599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845378#comment-16845378 ] Nick Dimiduk commented on SPARK-27599: -- Sure [~Alexander_Fedosov]. Hive DDL lets you specify a partitioning strategy for the physical data layout (https://cwiki.apache.org/confluence/display/hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables). This is identical to the physical data layout partitioning offered by {{DataFrameWriter.partitionBy}} (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameWriter.html#partitionBy-java.lang.String...-). I observe that when writing from a DataFrameWriter to a hive table that has partitioning specified, Spark will throw an error when the hive table metadata's partition definition does not agree with the partitionBy specified on the DataFrameWriter. My request here is that instead of only raising an error on disagreement, Spark should use the partitioning information from the table metadata when no partitionBy call has been made. Basically, Spark knows what the destination table needs, so don't require that the caller provide it. Furthermore, there are likely other aspects of overlapping DDL between the hive table's metadata and methods on DataFrameWriter (bucketing comes to mind). When working with a table defined in the hive metastore, Spark should defer to that metadata rather than require the user to repeat it all in code. > DataFrameWriter.partitionBy should be optional when writing to a hive table > --- > > Key: SPARK-27599 > URL: https://issues.apache.org/jira/browse/SPARK-27599 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.1 >Reporter: Nick Dimiduk >Priority: Minor > > When writing to an existing, partitioned table stored in the Hive metastore, > Spark requires the call to {{saveAsTable}} to provide a value for > {{partitionedBy}}, even though that information is provided by the metastore > itself. Indeed, that information is available to Spark, as it will error if > the specified {{partitionBy}} does not match that of the table definition in > metastore. > There may be other attributes of the save call that can be retrieved from the > metastore... -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
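A hedged sketch of the usability gap being described, with hypothetical table, column, and path names; the second write is where the request would let {{partitionBy}} be inferred from the metastore instead of being repeated by the caller.

{code:scala}
import org.apache.spark.sql.{SaveMode, SparkSession}

object PartitionByAndMetastore {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partitionBy-and-metastore")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical input with an `event_date` column.
    val df = spark.read.parquet("/tmp/events")

    // Initial write: records the table, including its partitioning, in the metastore.
    df.write.partitionBy("event_date").saveAsTable("events")

    // Subsequent append: the partitioning already lives in the table metadata, so the
    // request above is that this call should not have to repeat partitionBy("event_date"),
    // and that Spark should pick it up from the metastore rather than erroring on a mismatch.
    df.write.mode(SaveMode.Append).partitionBy("event_date").saveAsTable("events")
  }
}
{code}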
[jira] [Commented] (SPARK-27597) RuntimeConfig should be serializable
[ https://issues.apache.org/jira/browse/SPARK-27597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830564#comment-16830564 ] Nick Dimiduk commented on SPARK-27597: -- {quote}bq. Do you want to access {{SparkSession}} in UDF? {quote} Nope. In my case, all I want is the configuration. > RuntimeConfig should be serializable > > > Key: SPARK-27597 > URL: https://issues.apache.org/jira/browse/SPARK-27597 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.1 >Reporter: Nick Dimiduk >Priority: Major > > When implementing a UDF or similar, it's quite surprising to see that the > {{SparkSession}} is {{Serializable}} but {{RuntimeConf}} is not. When > modeling UDFs in an object-oriented way, this leads to quite a surprise, an > ugly NPE from the {{call}} site. > {noformat} > Caused by: java.lang.NullPointerException > at > org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143) > at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141) > at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170) > at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170) > ...{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
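For what it's worth, a minimal sketch of the workaround implied here, assuming the needed setting can be read once on the driver and closed over as a plain value (the config key is hypothetical):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object ConfInUdf {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("conf-in-udf").getOrCreate()
    import spark.implicits._

    // Read the (hypothetical) setting on the driver; a plain String serializes fine.
    val suffix: String = spark.conf.get("spark.myapp.suffix", "-default")

    // The UDF closes over `suffix` only, not over SparkSession/RuntimeConfig,
    // which avoids the NPE described in the issue.
    val addSuffix = udf((s: String) => s + suffix)

    Seq("a", "b").toDF("value")
      .select(addSuffix($"value").as("value_with_suffix"))
      .show()
  }
}
{code}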
[jira] [Created] (SPARK-27599) DataFrameWriter$partitionBy should be optional when writing to a hive table
Nick Dimiduk created SPARK-27599: Summary: DataFrameWriter$partitionBy should be optional when writing to a hive table Key: SPARK-27599 URL: https://issues.apache.org/jira/browse/SPARK-27599 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.1 Reporter: Nick Dimiduk When writing to an existing, partitioned table stored in the Hive metastore, Spark requires the call to {{saveAsTable}} to provide a value for {{partitionedBy}}, even though that information is provided by the metastore itself. Indeed, that information is available to Spark, as it will error if the specified {{partitionBy}} does not match that of the table definition in metastore. There may be other attributes of the save call that can be retrieved from the metastore... -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27599) DataFrameWriter.partitionBy should be optional when writing to a hive table
[ https://issues.apache.org/jira/browse/SPARK-27599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nick Dimiduk updated SPARK-27599: - Summary: DataFrameWriter.partitionBy should be optional when writing to a hive table (was: DataFrameWriter$partitionBy should be optional when writing to a hive table) > DataFrameWriter.partitionBy should be optional when writing to a hive table > --- > > Key: SPARK-27599 > URL: https://issues.apache.org/jira/browse/SPARK-27599 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.1 >Reporter: Nick Dimiduk >Priority: Minor > > When writing to an existing, partitioned table stored in the Hive metastore, > Spark requires the call to {{saveAsTable}} to provide a value for > {{partitionedBy}}, even though that information is provided by the metastore > itself. Indeed, that information is available to Spark, as it will error if > the specified {{partitionBy}} does not match that of the table definition in > metastore. > There may be other attributes of the save call that can be retrieved from the > metastore... -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27597) RuntimeConfig should be serializable
[ https://issues.apache.org/jira/browse/SPARK-27597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16829612#comment-16829612 ] Nick Dimiduk commented on SPARK-27597: -- It would be nice if there were an API built into the {{FunctionalInterface}} that let its implementations access the fully populated {{SparkSession}}. > RuntimeConfig should be serializable > > > Key: SPARK-27597 > URL: https://issues.apache.org/jira/browse/SPARK-27597 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.1 >Reporter: Nick Dimiduk >Priority: Major > > When implementing a UDF or similar, it's quite surprising to see that the > {{SparkSession}} is {{Serializable}} but {{RuntimeConf}} is not. When > modeling UDFs in an object-oriented way, this leads to quite a surprise, an > ugly NPE from the {{call}} site. > {noformat} > Caused by: java.lang.NullPointerException > at > org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143) > at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141) > at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170) > at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170) > ...{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27597) RuntimeConfig should be serializable
Nick Dimiduk created SPARK-27597: Summary: RuntimeConfig should be serializable Key: SPARK-27597 URL: https://issues.apache.org/jira/browse/SPARK-27597 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.1 Reporter: Nick Dimiduk When implementing a UDF or similar, it's quite surprising to see that the {{SparkSession}} is {{Serializable}} but {{RuntimeConf}} is not. When modeling UDFs in an object-oriented way, this leads to quite a surprise, an ugly NPE from the {{call}} site. {noformat} Caused by: java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141) at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170) at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170) ...{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11373) Add metrics to the History Server and providers
[ https://issues.apache.org/jira/browse/SPARK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16252776#comment-16252776 ] Nick Dimiduk commented on SPARK-11373: -- I'm chasing a goose through the wild and have found my way here. It seems Spark has two independent subsystems for recording runtime information: history/SparkListener and Metrics. I'm startled to find a whole wealth of information exposed during job runtime over http/json via {{api/v1/applications}}, yet none of this is available to the Metrics system configured with the metrics.properties file. Lovely details like number of input, output, and shuffle records per task are unavailable to my Grafana dashboards fed by the Ganglia reporter. Is it an objective of this ticket to report such information through Metrics? Is there a separate ticket tracking such an effort? Is it a "simple" matter of implementing a {{SparkListener}} that bridges to Metrics? > Add metrics to the History Server and providers > --- > > Key: SPARK-11373 > URL: https://issues.apache.org/jira/browse/SPARK-11373 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 1.6.0 >Reporter: Steve Loughran > > The History server doesn't publish metrics about JVM load or anything from > the history provider plugins. This means that performance problems from > massive job histories aren't visible to management tools, and nor are any > provider-generated metrics such as time to load histories, failed history > loads, the number of connectivity failures talking to remote services, etc. > If the history server set up a metrics registry and offered the option to > publish its metrics, then management tools could view this data. > # the metrics registry would need to be passed down to the instantiated > {{ApplicationHistoryProvider}}, in order for it to register its metrics. > # if the codahale metrics servlet were registered under a path such as > {{/metrics}}, the values would be visible as HTML and JSON, without the need > for management tools. > # Integration tests could also retrieve the JSON-formatted data and use it as > part of the test suites. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
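On the last question, a rough sketch of the kind of {{SparkListener}}-to-Metrics bridge being asked about, assuming a Dropwizard (Codahale) {{MetricRegistry}} is an acceptable sink; the metric names are made up and the exact task-metrics fields may vary by Spark version.

{code:scala}
import com.codahale.metrics.MetricRegistry
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Forwards a few per-task counters from the listener bus into a Dropwizard registry,
// which an already-configured reporter (Ganglia, Graphite, ...) can then publish.
class MetricsBridgeListener(registry: MetricRegistry) extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val tm = taskEnd.taskMetrics
    if (tm != null) {
      registry.counter("tasks.input.recordsRead").inc(tm.inputMetrics.recordsRead)
      registry.counter("tasks.output.recordsWritten").inc(tm.outputMetrics.recordsWritten)
      registry.counter("tasks.shuffle.recordsRead").inc(tm.shuffleReadMetrics.recordsRead)
      registry.counter("tasks.shuffle.recordsWritten").inc(tm.shuffleWriteMetrics.recordsWritten)
    }
  }
}

// Registered programmatically, e.g.:
//   sparkContext.addSparkListener(new MetricsBridgeListener(registry))
{code}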
[jira] [Commented] (SPARK-17593) list files on s3 very slow
[ https://issues.apache.org/jira/browse/SPARK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16244887#comment-16244887 ] Nick Dimiduk commented on SPARK-17593: -- So the fix in Hadoop 2.8 is for any variant of the s3* FileSystem? Or is it only for s3a? bq. as it really needs Spark to move to listFiles(recursive) Do we still need this change to be shipped in Spark? Thanks. > list files on s3 very slow > -- > > Key: SPARK-17593 > URL: https://issues.apache.org/jira/browse/SPARK-17593 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.0 > Environment: spark 2.0.0, hadoop 2.7.2 ( hadoop 2.7.3) >Reporter: Gaurav Shah >Priority: Minor > > lets say we have following partitioned data: > {code} > events_v3 > -- event_date=2015-01-01 > event_hour=0 > -- verb=follow > part1.parquet.gz > event_hour=1 > -- verb=click > part1.parquet.gz > -- event_date=2015-01-02 > event_hour=5 > -- verb=follow > part1.parquet.gz > event_hour=10 > -- verb=click > part1.parquet.gz > {code} > To read (or write ) parquet partitioned data via spark it makes call to > `ListingFileCatalog.listLeafFiles` . Which recursively tries to list all > files and folders. > In this case if we had 300 dates, we would have created 300 jobs each trying > to get filelist from date_directory. This process takes about 10 minutes to > finish ( with 2 executors). vs if I use a ruby script to get list of all > files recursively in the same folder it takes about 1 minute, on the same > machine with just 1 thread. > I am confused as to why it would take so much time extra for listing files. > spark code: > {code:scala} > val sparkSession = org.apache.spark.sql.SparkSession.builder > .config("spark.sql.hive.metastorePartitionPruning",true) > .config("spark.sql.parquet.filterPushdown", true) > .config("spark.sql.hive.verifyPartitionPath", false) > .config("spark.sql.hive.convertMetastoreParquet.mergeSchema",false) > .config("parquet.enable.summary-metadata",false) > .config("spark.sql.sources.partitionDiscovery.enabled",false) > .getOrCreate() > val df = > sparkSession.read.option("mergeSchema","false").format("parquet").load("s3n://bucket_name/events_v3") > df.createOrReplaceTempView("temp_events") > sparkSession.sql( > """ > |select verb,count(*) from temp_events where event_date = > "2016-08-05" group by verb > """.stripMargin).show() > {code} > ruby code: > {code:ruby} > gem 'aws-sdk', '~> 2' > require 'aws-sdk' > client = Aws::S3::Client.new(:region=>'us-west-1') > next_continuation_token = nil > total = 0 > loop do > a= client.list_objects_v2({ > bucket: "bucket", # required > max_keys: 1000, > prefix: "events_v3/", > continuation_token: next_continuation_token , > fetch_owner: false, > }) > puts a.contents.last.key > total += a.contents.size > next_continuation_token = a.next_continuation_token > break unless a.is_truncated > end > puts "total" > puts total > {code} > tried looking into following bug: > https://issues.apache.org/jira/browse/HADOOP-12810 > but hadoop 2.7.3 doesn't solve that problem > stackoverflow reference: > http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
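On the {{listFiles(recursive)}} question, a hedged sketch of what a recursive listing through the Hadoop {{FileSystem}} API looks like, assuming an s3a path similar to the one in the report; the bucket and prefix are placeholders.

{code:scala}
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RecursiveListing {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("s3a://bucket_name/events_v3/")
    val fs = FileSystem.get(new URI(path.toString), conf)

    // listFiles(path, recursive = true) lets the FileSystem implementation (e.g. s3a)
    // do a flat, paged object listing instead of a directory-by-directory tree walk.
    val it = fs.listFiles(path, true)
    var total = 0L
    while (it.hasNext) {
      it.next()
      total += 1
    }
    println(s"total files: $total")
  }
}
{code}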
[jira] [Commented] (SPARK-11058) failed spark job reports on YARN as successful
[ https://issues.apache.org/jira/browse/SPARK-11058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219280#comment-16219280 ] Nick Dimiduk commented on SPARK-11058: -- This is a Spark issue though? Spark could fix the Spark API issue, right? Could it not be fixed in a new major/minor release? > failed spark job reports on YARN as successful > -- > > Key: SPARK-11058 > URL: https://issues.apache.org/jira/browse/SPARK-11058 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.0 > Environment: CDH 5.4 >Reporter: Lan Jiang >Priority: Minor > > I have a spark batch job running on CDH5.4 + Spark 1.3.0. Job is submitted in > “yarn-client” mode. The job itself failed due to YARN kills several executor > containers because the containers exceeded the memory limit posed by YARN. > However, when I went to the YARN resource manager site, it displayed the job > as successful. I found there was an issue reported in JIRA > https://issues.apache.org/jira/browse/SPARK-3627, but it says it was fixed in > Spark 1.2. On Spark history server, it shows the job as “Incomplete”. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11058) failed spark job reports on YARN as successful
[ https://issues.apache.org/jira/browse/SPARK-11058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218994#comment-16218994 ] Nick Dimiduk commented on SPARK-11058: -- [~srowen], [~vanzin] this is still considered a not-fix issue? I see the same behavior with Spark 2.1.0 on Yarn 2.7.3. Job submitted to the cluster in client-mode, killed with SIGTERM results in an application in Yarn's job history with "YarnApplicationState: FINISHED" and "FinalStatus Reported by AM: SUCCEEDED". Surely this is a bug. > failed spark job reports on YARN as successful > -- > > Key: SPARK-11058 > URL: https://issues.apache.org/jira/browse/SPARK-11058 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.0 > Environment: CDH 5.4 >Reporter: Lan Jiang >Priority: Minor > > I have a spark batch job running on CDH5.4 + Spark 1.3.0. Job is submitted in > “yarn-client” mode. The job itself failed due to YARN kills several executor > containers because the containers exceeded the memory limit posed by YARN. > However, when I went to the YARN resource manager site, it displayed the job > as successful. I found there was an issue reported in JIRA > https://issues.apache.org/jira/browse/SPARK-3627, but it says it was fixed in > Spark 1.2. On Spark history server, it shows the job as “Incomplete”. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875188#comment-15875188 ] Nick Dimiduk commented on SPARK-17636: -- I've proposed a PR for fixing this issue. Using [~maropu]'s suggested testing strategy via explain plans with parquet, with the proposed patch applied, I see the following Before: {noformat} scala> df.where($"b._1" === "").explain == Physical Plan == *Filter (b#1._1 = ) +- *FileScan parquet [a#0,b#1] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/Users/ndimiduk/tmp/spark_19638], PartitionFilters: [], PushedFilters: [], ReadSchema: struct> {noformat} After: {noformat} scala> df.where($"b._1" === "").explain == Physical Plan == *Filter (b._1 = ) +- *FileScan parquet [a#283,b#284] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/Users/ndimiduk/tmp/spark_19638], PartitionFilters: [], PushedFilters: [EqualTo(b._1,)], ReadSchema: struct> {noformat} > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2, 1.6.3, 2.0.2 >Reporter: Mitesh >Priority: Minor > > There's a *PushedFilters* for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. > {noformat} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
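For anyone reproducing this, a minimal self-contained sketch along the lines of the testing strategy above; the output path and column names are placeholders.

{code:scala}
import org.apache.spark.sql.SparkSession

object StructPushdownRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("struct-pushdown-repro").getOrCreate()
    import spark.implicits._

    val out = "/tmp/spark_19638"   // placeholder path

    // A dataframe with a struct column `b` whose fields are `_1` and `_2`.
    Seq(("x", ("1", "2")), ("y", ("3", "4")))
      .toDF("a", "b")
      .write.mode("overwrite").parquet(out)

    val df = spark.read.parquet(out)

    // Whether EqualTo(b._1, ...) shows up under PushedFilters in this plan is
    // exactly what the before/after output above is checking.
    df.where($"b._1" === "1").explain()
  }
}
{code}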
[jira] [Commented] (SPARK-19638) Filter pushdown not working for struct fields
[ https://issues.apache.org/jira/browse/SPARK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875184#comment-15875184 ] Nick Dimiduk commented on SPARK-19638: -- Okay [~maropu], I'll continue discussion over there. > Filter pushdown not working for struct fields > - > > Key: SPARK-19638 > URL: https://issues.apache.org/jira/browse/SPARK-19638 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Nick Dimiduk > > Working with a dataset containing struct fields, and enabling debug logging > in the ES connector, I'm seeing the following behavior. The dataframe is > created over the ES connector and then the schema is extended with a couple > column aliases, such as. > {noformat} > df.withColumn("f2", df("foo")) > {noformat} > Queries vs those alias columns work as expected for fields that are > non-struct members. > {noformat} > scala> df.withColumn("f2", df("foo")).where("f2 == '1'").limit(0).show > 17/02/16 15:06:49 DEBUG DataSource: Pushing down filters > [IsNotNull(foo),EqualTo(foo,1)] > 17/02/16 15:06:49 TRACE DataSource: Transformed filters into DSL > [{"exists":{"field":"foo"}},{"match":{"foo":"1"}}] > {noformat} > However, try the same with an alias over a struct field, and no filters are > pushed down. > {noformat} > scala> df.withColumn("bar_baz", df("bar.baz")).where("bar_baz == > '1'").limit(1).show > {noformat} > In fact, this is the case even when no alias is used at all. > {noformat} > scala> df.where("bar.baz == '1'").limit(1).show > {noformat} > Basically, pushdown for structs doesn't work at all. > Maybe this is specific to the ES connector? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17636) Parquet filter push down doesn't handle struct fields
[ https://issues.apache.org/jira/browse/SPARK-17636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875178#comment-15875178 ] Nick Dimiduk commented on SPARK-17636: -- Per [~maropu]'s comments on SPARK-19638, this issue is not limited to parquet. In my case, I'm seeing the same lack of struct filter pushdown while working with the ElasticSearch connector. > Parquet filter push down doesn't handle struct fields > - > > Key: SPARK-17636 > URL: https://issues.apache.org/jira/browse/SPARK-17636 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 1.6.2, 1.6.3, 2.0.2 >Reporter: Mitesh >Priority: Minor > > There's a *PushedFilters* for a simple numeric field, but not for a numeric > field inside a struct. Not sure if this is a Spark limitation because of > Parquet, or only a Spark limitation. > {noformat} > scala> hc.read.parquet("s3a://some/parquet/file").select("day_timestamp", > "sale_id") > res5: org.apache.spark.sql.DataFrame = [day_timestamp: > struct, sale_id: bigint] > scala> res5.filter("sale_id > 4").queryExecution.executedPlan > res9: org.apache.spark.sql.execution.SparkPlan = > Filter[23814] [args=(sale_id#86324L > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file, PushedFilters: [GreaterThan(sale_id,4)] > scala> res5.filter("day_timestamp.timestamp > 4").queryExecution.executedPlan > res10: org.apache.spark.sql.execution.SparkPlan = > Filter[23815] [args=(day_timestamp#86302.timestamp > > 4)][outPart=UnknownPartitioning(0)][outOrder=List()] > +- Scan ParquetRelation[day_timestamp#86302,sale_id#86324L] InputPaths: > s3a://some/parquet/file > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19615) Provide Dataset union convenience for divergent schema
[ https://issues.apache.org/jira/browse/SPARK-19615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875137#comment-15875137 ] Nick Dimiduk commented on SPARK-19615: -- Thanks for taking a look [~hyukjin.kwon]. These three bugs are indeed issues -- in all cases, it seems Spark was not being careful to map column names to the appropriate column from each side of the union. My experience with 1.6.3 and 2.1.0 with unions has been much better. Actually, I still see echoes of SPARK-9874 / SPARK-9813 when I extend one side or the other with null columns. I can file that as a separate issue if that's of interest to you. As for what RDBMS may or may not do, I'm not very aware or concerned. I'm thinking more about ease of use for a user. This is why I suggest perhaps a different union method that would encapsulate this behavior. Parsed Spark SQL can exhibit whatever semantics the community deems appropriate, while still giving users of the API access to this convenient functionality. I've implemented this logic in my application and it's quite complex. It would be very good for Spark to provide this for its users. > Provide Dataset union convenience for divergent schema > -- > > Key: SPARK-19615 > URL: https://issues.apache.org/jira/browse/SPARK-19615 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Nick Dimiduk > > Creating a union DataFrame over two sources that have different schema > definitions is surprisingly complex. Provide a version of the union method > that will infer a target schema as the result of merging the > sources. Automatically extend either side with {{null}} columns for any > missing columns that are nullable. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19638) Filter pushdown not working for struct fields
[ https://issues.apache.org/jira/browse/SPARK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872413#comment-15872413 ] Nick Dimiduk commented on SPARK-19638: -- Debugging. I'm looking at the match expression in [{{DataSourceStrategy#translateFilter(Expression)}}|https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L509]. The predicate comes in as an {{EqualTo(GetStructField, Literal)}}. This doesn't match any of the cases. I was expecting it to step into the [{{case expressions.EqualTo(a: Attribute, Literal(v, t)) =>}}|https://github.com/apache/spark/blob/v2.1.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L511] on line 511 but execution steps past this point. Upon investigation, {{GetStructField}} does not extend {{Attribute}}. From this point, the {{EqualTo}} condition involving the struct field is dropped from the filter set pushed down to the ES connector. Thus I believe this is an issue in Spark, not in the connector. > Filter pushdown not working for struct fields > - > > Key: SPARK-19638 > URL: https://issues.apache.org/jira/browse/SPARK-19638 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Nick Dimiduk > > Working with a dataset containing struct fields, and enabling debug logging > in the ES connector, I'm seeing the following behavior. The dataframe is > created over the ES connector and then the schema is extended with a couple > column aliases, such as. > {noformat} > df.withColumn("f2", df("foo")) > {noformat} > Queries vs those alias columns work as expected for fields that are > non-struct members. > {noformat} > scala> df.withColumn("f2", df("foo")).where("f2 == '1'").limit(0).show > 17/02/16 15:06:49 DEBUG DataSource: Pushing down filters > [IsNotNull(foo),EqualTo(foo,1)] > 17/02/16 15:06:49 TRACE DataSource: Transformed filters into DSL > [{"exists":{"field":"foo"}},{"match":{"foo":"1"}}] > {noformat} > However, try the same with an alias over a struct field, and no filters are > pushed down. > {noformat} > scala> df.withColumn("bar_baz", df("bar.baz")).where("bar_baz == > '1'").limit(1).show > {noformat} > In fact, this is the case even when no alias is used at all. > {noformat} > scala> df.where("bar.baz == '1'").limit(1).show > {noformat} > Basically, pushdown for structs doesn't work at all. > Maybe this is specific to the ES connector? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19638) Filter pushdown not working for struct fields
[ https://issues.apache.org/jira/browse/SPARK-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872060#comment-15872060 ] Nick Dimiduk commented on SPARK-19638: -- [~maropu] I have placed a breakpoint in the ES connector's implementation of {{PrunedFilteredScan#buildScan(Array[String], Array[Filter])}}. Here I see no filters for the struct columns. Indeed this is precisely where the log messages are produced. For this reason, I believe this to be an issue with Catalyst, not the connector. Perhaps you can guide me through further debugging? Thanks. > Filter pushdown not working for struct fields > - > > Key: SPARK-19638 > URL: https://issues.apache.org/jira/browse/SPARK-19638 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Nick Dimiduk > > Working with a dataset containing struct fields, and enabling debug logging > in the ES connector, I'm seeing the following behavior. The dataframe is > created over the ES connector and then the schema is extended with a couple > column aliases, such as. > {noformat} > df.withColumn("f2", df("foo")) > {noformat} > Queries vs those alias columns work as expected for fields that are > non-struct members. > {noformat} > scala> df.withColumn("f2", df("foo")).where("f2 == '1'").limit(0).show > 17/02/16 15:06:49 DEBUG DataSource: Pushing down filters > [IsNotNull(foo),EqualTo(foo,1)] > 17/02/16 15:06:49 TRACE DataSource: Transformed filters into DSL > [{"exists":{"field":"foo"}},{"match":{"foo":"1"}}] > {noformat} > However, try the same with an alias over a struct field, and no filters are > pushed down. > {noformat} > scala> df.withColumn("bar_baz", df("bar.baz")).where("bar_baz == > '1'").limit(1).show > {noformat} > In fact, this is the case even when no alias is used at all. > {noformat} > scala> df.where("bar.baz == '1'").limit(1).show > {noformat} > Basically, pushdown for structs doesn't work at all. > Maybe this is specific to the ES connector? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19638) Filter pushdown not working for struct fields
Nick Dimiduk created SPARK-19638: Summary: Filter pushdown not working for struct fields Key: SPARK-19638 URL: https://issues.apache.org/jira/browse/SPARK-19638 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.0 Reporter: Nick Dimiduk Working with a dataset containing struct fields, and enabling debug logging in the ES connector, I'm seeing the following behavior. The dataframe is created over the ES connector and then the schema is extended with a couple column aliases, such as. {noformat} df.withColumn("f2", df("foo")) {noformat} Queries vs those alias columns work as expected for fields that are non-struct members. {noformat} scala> df.withColumn("f2", df("foo")).where("f2 == '1'").limit(0).show 17/02/16 15:06:49 DEBUG DataSource: Pushing down filters [IsNotNull(foo),EqualTo(foo,1)] 17/02/16 15:06:49 TRACE DataSource: Transformed filters into DSL [{"exists":{"field":"foo"}},{"match":{"foo":"1"}}] {noformat} However, try the same with an alias over a struct field, and no filters are pushed down. {noformat} scala> df.withColumn("bar_baz", df("bar.baz")).where("bar_baz == '1'").limit(1).show {noformat} In fact, this is the case even when no alias is used at all. {noformat} scala> df.where("bar.baz == '1'").limit(1).show {noformat} Basically, pushdown for structs doesn't work at all. Maybe this is specific to the ES connector? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19614) add type-preserving null function
[ https://issues.apache.org/jira/browse/SPARK-19614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870525#comment-15870525 ] Nick Dimiduk commented on SPARK-19614: -- {{lit(null).cast(type)}} does exactly what I needed. Thanks fellas. > add type-preserving null function > - > > Key: SPARK-19614 > URL: https://issues.apache.org/jira/browse/SPARK-19614 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Nick Dimiduk >Priority: Trivial > > There's currently no easy way to extend the columns of a DataFrame with null > columns that also preserves the type. {{lit(null)}} evaluates to > {{Literal(null, NullType)}}, despite any subsequent hinting, for instance > with {{Column.as(String, Metadata)}}. This comes up when programmatically > munging data from disparate sources. A function such as {{null(DataType)}} > would be nice. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
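For reference, a minimal example of the {{lit(null).cast(type)}} approach mentioned above; the column names and the chosen type are arbitrary.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

object TypedNullColumn {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("typed-null-column").getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3).toDF("a")

    // lit(null) alone carries NullType; the cast gives the null column a concrete type.
    val extended = df.withColumn("b", lit(null).cast(StringType))
    extended.printSchema()   // b: string (nullable = true)
  }
}
{code}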
[jira] [Closed] (SPARK-19614) add type-preserving null function
[ https://issues.apache.org/jira/browse/SPARK-19614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nick Dimiduk closed SPARK-19614. Resolution: Invalid > add type-preserving null function > - > > Key: SPARK-19614 > URL: https://issues.apache.org/jira/browse/SPARK-19614 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Nick Dimiduk >Priority: Trivial > > There's currently no easy way to extend the columns of a DataFrame with null > columns that also preserves the type. {{lit(null)}} evaluates to > {{Literal(null, NullType)}}, despite any subsequent hinting, for instance > with {{Column.as(String, Metadata)}}. This comes up when programmatically > munging data from disparate sources. A function such as {{null(DataType)}} > would be nice. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19615) Provide Dataset union convenience for divergent schema
[ https://issues.apache.org/jira/browse/SPARK-19615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870513#comment-15870513 ] Nick Dimiduk commented on SPARK-19615: -- IMHO, a union operation should be as generous as possible. This facilitates common ETL and data cleansing operations where the sources are sparse-schema structures (JSON, HBase, Elastic Search, &c). A couple examples of what I mean. Given dataframes of type {noformat} root |-- a: string (nullable = false) |-- b: string (nullable = true) {noformat} and {noformat} root |-- a: string (nullable = false) |-- c: string (nullable = true) {noformat} I would expect the union operation to infer the nullable columns from both sides to produce a dataframe of type {noformat} root |-- a: string (nullable = false) |-- b: string (nullable = true) |-- c: string (nullable = true) {noformat} This should work on an arbitrarily deep nesting of structs, so {noformat} root |-- a: string (nullable = false) |-- b: struct (nullable = false) ||-- b1: string (nullable = true) ||-- b2: string (nullable = true) {noformat} unioned with {noformat} root |-- a: string (nullable = false) |-- b: struct (nullable = false) ||-- b3: string (nullable = true) ||-- b4: string (nullable = true) {noformat} would result in {noformat} root |-- a: string (nullable = false) |-- b: struct (nullable = false) ||-- b1: string (nullable = true) ||-- b2: string (nullable = true) ||-- b3: string (nullable = true) ||-- b4: string (nullable = true) {noformat} > Provide Dataset union convenience for divergent schema > -- > > Key: SPARK-19615 > URL: https://issues.apache.org/jira/browse/SPARK-19615 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Nick Dimiduk >Priority: Minor > > Creating a union DataFrame over two sources that have different schema > definitions is surprisingly complex. Provide a version of the union method > that will create a infer a target schema as the result of merging the > sources. Automatically add extend either side with {{null}} columns for any > missing columns that are nullable. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
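A hedged sketch of what such a merge could look like for the flat (top-level) case; the nested-struct merging described above would need recursive schema handling, and the helper name is made up rather than an existing Spark API.

{code:scala}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructField

object UnionWithSchemaMerge {
  /** Union two dataframes, filling in typed null columns for any top-level column
   *  that exists on only one side. Nested structs are not merged in this sketch. */
  def unionMerged(left: DataFrame, right: DataFrame): DataFrame = {
    val leftFields: Map[String, StructField] = left.schema.fields.map(f => f.name -> f).toMap
    val rightFields: Map[String, StructField] = right.schema.fields.map(f => f.name -> f).toMap
    // Stable output order: left's columns first, then columns found only on the right.
    val allNames: Seq[String] =
      left.schema.fieldNames ++ right.schema.fieldNames.filterNot(leftFields.contains)

    def align(df: DataFrame, present: Map[String, StructField],
              other: Map[String, StructField]): DataFrame = {
      val cols: Seq[Column] = allNames.map { name =>
        if (present.contains(name)) col(name)
        else lit(null).cast(other(name).dataType).as(name) // missing side gets a typed null
      }
      df.select(cols: _*)
    }

    align(left, leftFields, rightFields).union(align(right, rightFields, leftFields))
  }
}
{code}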
[jira] [Updated] (SPARK-19615) Provide Dataset union convenience for divergent schema
[ https://issues.apache.org/jira/browse/SPARK-19615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nick Dimiduk updated SPARK-19615: - Priority: Major (was: Minor) > Provide Dataset union convenience for divergent schema > -- > > Key: SPARK-19615 > URL: https://issues.apache.org/jira/browse/SPARK-19615 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Nick Dimiduk > > Creating a union DataFrame over two sources that have different schema > definitions is surprisingly complex. Provide a version of the union method > that will create a infer a target schema as the result of merging the > sources. Automatically add extend either side with {{null}} columns for any > missing columns that are nullable. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19615) Provide Dataset union convenience for divergent schema
Nick Dimiduk created SPARK-19615: Summary: Provide Dataset union convenience for divergent schema Key: SPARK-19615 URL: https://issues.apache.org/jira/browse/SPARK-19615 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.1.0 Reporter: Nick Dimiduk Priority: Minor Creating a union DataFrame over two sources that have different schema definitions is surprisingly complex. Provide a version of the union method that will infer a target schema as the result of merging the sources. Automatically extend either side with {{null}} columns for any missing columns that are nullable. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19614) add type-preserving null function
Nick Dimiduk created SPARK-19614: Summary: add type-preserving null function Key: SPARK-19614 URL: https://issues.apache.org/jira/browse/SPARK-19614 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.1.0 Reporter: Nick Dimiduk Priority: Trivial There's currently no easy way to extend the columns of a DataFrame with null columns that also preserves the type. {{lit(null)}} evaluates to {{Literal(null, NullType)}}, despite any subsequent hinting, for instance with {{Column.as(String, Metadata)}}. This comes up when programmatically munging data from disparate sources. A function such as {{null(DataType)}} would be nice. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12957) Derive and propagate data constrains in logical plan
[ https://issues.apache.org/jira/browse/SPARK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868366#comment-15868366 ] Nick Dimiduk commented on SPARK-12957: -- Filed SPARK-19609. IMHO, it would be another subtask on this ticket. > Derive and propagate data constrains in logical plan > - > > Key: SPARK-12957 > URL: https://issues.apache.org/jira/browse/SPARK-12957 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Yin Huai >Assignee: Sameer Agarwal > Attachments: ConstraintPropagationinSparkSQL.pdf > > > Based on the semantic of a query plan, we can derive data constrains (e.g. if > a filter defines {{a > 10}}, we know that the output data of this filter > satisfy the constrain of {{a > 10}} and {{a is not null}}). We should build a > framework to derive and propagate constrains in the logical plan, which can > help us to build more advanced optimizations. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
Nick Dimiduk created SPARK-19609: Summary: Broadcast joins should pushdown join constraints as Filter to the larger relation Key: SPARK-19609 URL: https://issues.apache.org/jira/browse/SPARK-19609 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.1.0 Reporter: Nick Dimiduk For broadcast inner-joins, where the smaller relation is known to be small enough to materialize on a worker, the set of values for all join columns is known and fits in memory. Spark should translate these values into a {{Filter}} pushed down to the datasource. The common join condition of equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause. An example of pushing such filters is already present in the form of {{IsNotNull}} filters via [~sameerag]'s work on SPARK-12957 subtasks. This optimization could even work when the smaller relation does not fit entirely in memory. This could be done by partitioning the smaller relation into N pieces, applying this predicate pushdown for each piece, and unioning the results. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866262#comment-15866262 ] Nick Dimiduk commented on SPARK-13219: -- I would implement this manually by materializing the smaller relation in the driver and then transforming those values into a filter applied to the larger. Frankly I expected this to be meaning of a broadcast join. I'm wondering if I'm doing something to prevent the planner from performing this optimization, so maybe the mailing list is a more appropriate place to discuss? > Pushdown predicate propagation in SparkSQL with join > > > Key: SPARK-13219 > URL: https://issues.apache.org/jira/browse/SPARK-13219 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.4.1, 1.5.2, 1.6.0 > Environment: Spark 1.4 > Datastax Spark connector 1.4 > Cassandra. 2.1.12 > Centos 6.6 >Reporter: Abhinav Chawade > > When 2 or more tables are joined in SparkSQL and there is an equality clause > in query on attributes used to perform the join, it is useful to apply that > clause on scans for both table. If this is not done, one of the tables > results in full scan which can reduce the query dramatically. Consider > following example with 2 tables being joined. > {code} > CREATE TABLE assets ( > assetid int PRIMARY KEY, > address text, > propertyname text > ) > CREATE TABLE tenants ( > assetid int PRIMARY KEY, > name text > ) > spark-sql> explain select t.name from tenants t, assets a where a.assetid = > t.assetid and t.assetid='1201'; > WARN 2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to > load native-hadoop library for your platform... using builtin-java classes > where applicable > == Physical Plan == > Project [name#14] > ShuffledHashJoin [assetid#13], [assetid#15], BuildRight > Exchange (HashPartitioning 200) >Filter (CAST(assetid#13, DoubleType) = 1201.0) > HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, > Some(t)), None > Exchange (HashPartitioning 200) >HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), > None > Time taken: 1.354 seconds, Fetched 8 row(s) > {code} > The simple workaround is to add another equality condition for each table but > it becomes cumbersome. It will be helpful if the query planner could improve > filter propagation. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12957) Derive and propagate data constrains in logical plan
[ https://issues.apache.org/jira/browse/SPARK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866239#comment-15866239 ] Nick Dimiduk commented on SPARK-12957: -- [~sameerag] thanks for the comment. From a naive scan of the tickets, I believe I am seeing the benefits of SPARK-13871 in that a {{IsNotNull}} constraint is applied from the names of the join columns. However, I don't see the boon of SPARK-13789, specifically the {{a = 5, a = b}} mentioned in the description. My query is a join between a very small relation (100's of rows) and a very large one (10's of billions). I've hinted the planner to broadcast the smaller table, which it honors. After SPARK-13789, I expected to see the join column values pushed down as well. This is not the case. Any tips on debugging this further? I've set breakpoints in the {{RelationProvider}} implementation and see that it's only receiving the {{IsNotNull}} filters, nothing further from the planner. Thanks a lot! > Derive and propagate data constrains in logical plan > - > > Key: SPARK-12957 > URL: https://issues.apache.org/jira/browse/SPARK-12957 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Yin Huai >Assignee: Sameer Agarwal > Attachments: ConstraintPropagationinSparkSQL.pdf > > > Based on the semantic of a query plan, we can derive data constrains (e.g. if > a filter defines {{a > 10}}, we know that the output data of this filter > satisfy the constrain of {{a > 10}} and {{a is not null}}). We should build a > framework to derive and propagate constrains in the logical plan, which can > help us to build more advanced optimizations. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866210#comment-15866210 ] Nick Dimiduk commented on SPARK-13219: -- bq. The result is that no matter the number of tables and join columns, the predicates are augmented such that = and IN on literals are pushed to all the joined tables. [~velvia], [~smilegator], is the above statement an accurate reflection of this patch? I've upgraded to 2.1.0 in order to access this optimization. What about the case of a broadcast join -- will the values present in the smaller relation be pushed down to the scanner of the larger? I'm not seeing that behavior. Maybe this optimization is out of scope of this patch? I do see {{IsNotNull}} pushed down, applied to the columns involved in the join constraint, but I was expecting a bit more. Thanks again for the patch! > Pushdown predicate propagation in SparkSQL with join > > > Key: SPARK-13219 > URL: https://issues.apache.org/jira/browse/SPARK-13219 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.4.1, 1.5.2, 1.6.0 > Environment: Spark 1.4 > Datastax Spark connector 1.4 > Cassandra. 2.1.12 > Centos 6.6 >Reporter: Abhinav Chawade > > When 2 or more tables are joined in SparkSQL and there is an equality clause > in query on attributes used to perform the join, it is useful to apply that > clause on scans for both table. If this is not done, one of the tables > results in full scan which can reduce the query dramatically. Consider > following example with 2 tables being joined. > {code} > CREATE TABLE assets ( > assetid int PRIMARY KEY, > address text, > propertyname text > ) > CREATE TABLE tenants ( > assetid int PRIMARY KEY, > name text > ) > spark-sql> explain select t.name from tenants t, assets a where a.assetid = > t.assetid and t.assetid='1201'; > WARN 2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to > load native-hadoop library for your platform... using builtin-java classes > where applicable > == Physical Plan == > Project [name#14] > ShuffledHashJoin [assetid#13], [assetid#15], BuildRight > Exchange (HashPartitioning 200) >Filter (CAST(assetid#13, DoubleType) = 1201.0) > HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, > Some(t)), None > Exchange (HashPartitioning 200) >HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), > None > Time taken: 1.354 seconds, Fetched 8 row(s) > {code} > The simple workaround is to add another equality condition for each table but > it becomes cumbersome. It will be helpful if the query planner could improve > filter propagation. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15864798#comment-15864798 ] Nick Dimiduk commented on SPARK-13219: -- Now that all the subtasks on SPARK-12957 are resolved, where does that leave this feature? I'm trying to determine if I get this very useful enhancement by upgrading to 2.x. Thanks a lot! > Pushdown predicate propagation in SparkSQL with join > > > Key: SPARK-13219 > URL: https://issues.apache.org/jira/browse/SPARK-13219 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.4.1, 1.5.2, 1.6.0 > Environment: Spark 1.4 > Datastax Spark connector 1.4 > Cassandra. 2.1.12 > Centos 6.6 >Reporter: Abhinav Chawade > > When 2 or more tables are joined in SparkSQL and there is an equality clause > in query on attributes used to perform the join, it is useful to apply that > clause on scans for both table. If this is not done, one of the tables > results in full scan which can reduce the query dramatically. Consider > following example with 2 tables being joined. > {code} > CREATE TABLE assets ( > assetid int PRIMARY KEY, > address text, > propertyname text > ) > CREATE TABLE tenants ( > assetid int PRIMARY KEY, > name text > ) > spark-sql> explain select t.name from tenants t, assets a where a.assetid = > t.assetid and t.assetid='1201'; > WARN 2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to > load native-hadoop library for your platform... using builtin-java classes > where applicable > == Physical Plan == > Project [name#14] > ShuffledHashJoin [assetid#13], [assetid#15], BuildRight > Exchange (HashPartitioning 200) >Filter (CAST(assetid#13, DoubleType) = 1201.0) > HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, > Some(t)), None > Exchange (HashPartitioning 200) >HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), > None > Time taken: 1.354 seconds, Fetched 8 row(s) > {code} > The simple workaround is to add another equality condition for each table but > it becomes cumbersome. It will be helpful if the query planner could improve > filter propagation. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12957) Derive and propagate data constrains in logical plan
[ https://issues.apache.org/jira/browse/SPARK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15864796#comment-15864796 ] Nick Dimiduk commented on SPARK-12957: -- I'm trying to understand the current state of SPARK-13219/SPARK-12532, which it seems are deferring to this issue. I see all subtasks here have been fixedFor in 2.0; is there further work to be done on this ticket? How does the sum of this work relate back to the predicate pushdown join optimization described in SPARK-13219. Basically, I'm trying to determine if I get this very useful enhancement by upgrading to 2.x. Thanks a lot! > Derive and propagate data constrains in logical plan > - > > Key: SPARK-12957 > URL: https://issues.apache.org/jira/browse/SPARK-12957 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Yin Huai >Assignee: Sameer Agarwal > Attachments: ConstraintPropagationinSparkSQL.pdf > > > Based on the semantic of a query plan, we can derive data constrains (e.g. if > a filter defines {{a > 10}}, we know that the output data of this filter > satisfy the constrain of {{a > 10}} and {{a is not null}}). We should build a > framework to derive and propagate constrains in the logical plan, which can > help us to build more advanced optimizations. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org