[jira] [Commented] (SPARK-36532) Deadlock in CoarseGrainedExecutorBackend.onDisconnected

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402071#comment-17402071
 ] 

Apache Spark commented on SPARK-36532:
--

User 'Ngone51' has created a pull request for this issue:
https://github.com/apache/spark/pull/33795

> Deadlock in CoarseGrainedExecutorBackend.onDisconnected
> ---
>
> Key: SPARK-36532
> URL: https://issues.apache.org/jira/browse/SPARK-36532
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.1.0, 3.2.0, 3.3.0
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
> Fix For: 3.2.0
>
>
> The deadlock has exactly the same root cause as SPARK-14180, but it happens in 
> a different code path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0

2021-08-19 Thread Gengliang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang resolved SPARK-36547.

Fix Version/s: 3.2.0
   Resolution: Fixed

Issue resolved by pull request 33791
[https://github.com/apache/spark/pull/33791]

> Downgrade scala-maven-plugin to 4.3.0
> -
>
> Key: SPARK-36547
> URL: https://issues.apache.org/jira/browse/SPARK-36547
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.2.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> This is the same issue as SPARK-34007. We need to downgrade the version of 
> scala-maven-plugin to 4.3.0 in order to avoid errors when building the Hadoop 
> 2.7 package.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35083) Support remote scheduler pool file

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401980#comment-17401980
 ] 

Apache Spark commented on SPARK-35083:
--

User 'ulysses-you' has created a pull request for this issue:
https://github.com/apache/spark/pull/33794

> Support remote scheduler pool file
> --
>
> Key: SPARK-35083
> URL: https://issues.apache.org/jira/browse/SPARK-35083
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Assignee: XiDuo You
>Priority: Minor
> Fix For: 3.2.0
>
>
> Make `spark.scheduler.allocation.file` support remote files. When using Spark 
> as a server (e.g. SparkThriftServer), it's hard for users to specify a local 
> path as the scheduler pool file.
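
For illustration, a minimal sketch of how this could be used once remote paths 
are accepted (the HDFS path and pool name below are hypothetical; the config 
keys are the existing scheduler settings):

{code:java}
import org.apache.spark.sql.SparkSession

// Assumes a fair-scheduler XML file has been uploaded to a location every driver can read.
val spark = SparkSession.builder()
  .appName("remote-pool-file-demo")
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file", "hdfs://namenode:8020/spark/conf/fairscheduler.xml")
  .getOrCreate()

// Jobs submitted from this thread then run in the "etl" pool defined in the remote file.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "etl")
{code}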



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36549) Add taskStatus support multiple value to doc

2021-08-19 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-36549:


Assignee: (was: Apache Spark)

> Add taskStatus support multiple value to doc
> 
>
> Key: SPARK-36549
> URL: https://issues.apache.org/jira/browse/SPARK-36549
> Project: Spark
>  Issue Type: Improvement
>  Components: docs
>Affects Versions: 3.2.0
>Reporter: angerszhu
>Priority: Major
>
> According to https://github.com/apache/spark/pull/31165#discussion_r692590472



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36549) Add taskStatus support multiple value to doc

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401973#comment-17401973
 ] 

Apache Spark commented on SPARK-36549:
--

User 'AngersZh' has created a pull request for this issue:
https://github.com/apache/spark/pull/33793

> Add taskStatus support multiple value to doc
> 
>
> Key: SPARK-36549
> URL: https://issues.apache.org/jira/browse/SPARK-36549
> Project: Spark
>  Issue Type: Improvement
>  Components: docs
>Affects Versions: 3.2.0
>Reporter: angerszhu
>Priority: Major
>
> According to https://github.com/apache/spark/pull/31165#discussion_r692590472



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36549) Add taskStatus support multiple value to doc

2021-08-19 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-36549:


Assignee: Apache Spark

> Add taskStatus support multiple value to doc
> 
>
> Key: SPARK-36549
> URL: https://issues.apache.org/jira/browse/SPARK-36549
> Project: Spark
>  Issue Type: Improvement
>  Components: docs
>Affects Versions: 3.2.0
>Reporter: angerszhu
>Assignee: Apache Spark
>Priority: Major
>
> According to https://github.com/apache/spark/pull/31165#discussion_r692590472



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36549) Add taskStatus support multiple value to doc

2021-08-19 Thread angerszhu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

angerszhu updated SPARK-36549:
--
Summary: Add taskStatus support multiple value to doc  (was: Improve doc 
about [SPARK-34092][SQL] Support Stage level restful api filter task details by 
task status)

> Add taskStatus support multiple value to doc
> 
>
> Key: SPARK-36549
> URL: https://issues.apache.org/jira/browse/SPARK-36549
> Project: Spark
>  Issue Type: Improvement
>  Components: docs
>Affects Versions: 3.2.0
>Reporter: angerszhu
>Priority: Major
>
> According to https://github.com/apache/spark/pull/31165#discussion_r692590472



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36549) Improve doc about [SPARK-34092][SQL] Support Stage level restful api filter task details by task status

2021-08-19 Thread angerszhu (Jira)
angerszhu created SPARK-36549:
-

 Summary: Improve doc about [SPARK-34092][SQL] Support Stage level 
restful api filter task details by task status
 Key: SPARK-36549
 URL: https://issues.apache.org/jira/browse/SPARK-36549
 Project: Spark
  Issue Type: Improvement
  Components: docs
Affects Versions: 3.2.0
Reporter: angerszhu


According to https://github.com/apache/spark/pull/31165#discussion_r692590472



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35933) PartitionFilters and pushFilters not applied to window functions

2021-08-19 Thread Pablo Langa Blanco (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401895#comment-17401895
 ] 

Pablo Langa Blanco commented on SPARK-35933:


Can we close this ticket?

> PartitionFilters and pushFilters not applied to window functions
> 
>
> Key: SPARK-35933
> URL: https://issues.apache.org/jira/browse/SPARK-35933
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.8, 3.1.2
>Reporter: Shreyas Kothavade
>Priority: Major
>
> Spark does not apply partition filters and pushed filters when the table's 
> partition column and the window function's partition columns are not the same. 
> For example, in the code below, the data frame is written with a partition on 
> "id", and the partitioned data frame is then used to calculate a lag that is 
> partitioned by "name". In this case, the query plan shows PartitionFilters and 
> PushedFilters as empty.
> {code:java}
> spark
>   .createDataFrame(
>     Seq(
>       Person(
>         1,
>         "Andy",
>         new Timestamp(1499955986039L),
>         new Timestamp(1499955982342L)
>       ),
>       Person(
>         2,
>         "Jeff",
>         new Timestamp(1499955986339L),
>         new Timestamp(149995598L)
>       )
>     )
>   )
>  .write
>   .partitionBy("id")
>   .mode(SaveMode.Append)
>   .parquet("spark-warehouse/people")
> val dfPeople =
>   spark.read.parquet("spark-warehouse/people")
> dfPeople
>   .select(
>     $"id",
>     $"name",
>     lag(col("ts2"), 1).over(Window.partitionBy("name").orderBy("ts"))
>   )
>   .filter($"id" === 1)
>   .explain()
> {code}
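
For comparison, a hedged control case (the same predicate with no window 
expression), where the scan node would be expected to show the pushed partition 
filter on "id":

{code:java}
// Same filter without the window function; the FileSourceScan node's
// PartitionFilters/PushedFilters should now include the id = 1 condition.
dfPeople
  .select($"id", $"name", $"ts2")
  .filter($"id" === 1)
  .explain()
{code}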



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-32985) Decouple bucket filter pruning and bucket table scan

2021-08-19 Thread Gabe Church (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893
 ] 

Gabe Church edited comment on SPARK-32985 at 8/19/21, 11:02 PM:


This is a very important addition, thank you! There are many cases where bucket 
pruning can be used to optimize lookups. Specifically, if we lack the ability to 
split the read tasks across multiple Spark workers for date-partitioned tables 
that have some high-cardinality columns and a growing number of files per 
bucket, bucketing becomes useless when we are stuck with one task per executor. 
In the example below it takes a query from 2 hours to 2 minutes, even with the 
listFiles done via the manual read.

My workaround example is below, and it only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table)
  .where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False")
  .format(format)
  .load(bucket_files: _*)
  .where(col(bucket_col) === lit(bucket_target))


was (Author: gchurch):
This is a very important addition, thank you! There are many cases where bucket 
pruning can be used to optimize lookups. Specifically, if we lack the ability to 
split the read tasks across multiple Spark workers for date-partitioned tables 
that have some high-cardinality columns and a growing number of files per 
bucket, bucketing becomes useless when we are stuck with one task per executor. 
In the example below it takes a query from 2 hours to 2 minutes, even with the 
listFiles done via the manual read.

My example is below, and it only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table)
  .where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False")
  .format(format)
  .load(bucket_files: _*)
  .where(col(bucket_col) === lit(bucket_target))

> Decouple bucket filter pruning and bucket table scan
> 
>
> Key: SPARK-32985
> URL: https://issues.apache.org/jira/browse/SPARK-32985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Cheng Su
>Assignee: Cheng Su
>Priority: Minor
> Fix For: 3.2.0
>
>
> As a followup from discussion in 
> [https://github.com/apache/spark/pull/29804#discussion_r493100510] . 
> Currently in data source v1 file scan `FileSourceScanExec`, bucket filter 
> pruning will only take effect with bucket table scan - 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542]
>  . However, this is unnecessary, as bucket filter pruning can also happen if 
> we disable bucketed table scan. This helps queries leverage bucket filter 
> pruning to save CPU/IO by not reading unnecessary bucket files, without being 
> bound by bucketed table scan when the parallelism of tasks is a concern.
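
For illustration, a hedged sketch of the scenario this targets (the table name 
and data are assumptions; `spark.sql.sources.bucketing.enabled` is the existing 
switch for bucketed scans):

{code:java}
// Hypothetical bucketed table.
spark.range(0, 1000000L)
  .selectExpr("id", "cast(id % 100 as int) as bucket_col")
  .write
  .bucketBy(32, "bucket_col")
  .sortBy("bucket_col")
  .saveAsTable("bucketed_tbl")

// Disable bucketed table scan, e.g. to regain read parallelism.
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")

// With pruning decoupled from the bucketed scan, an equality filter on the bucket
// column could still skip non-matching bucket files; check the scan node in the plan.
spark.table("bucketed_tbl").where("bucket_col = 42").explain()
{code}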



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-32985) Decouple bucket filter pruning and bucket table scan

2021-08-19 Thread Gabe Church (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893
 ] 

Gabe Church edited comment on SPARK-32985 at 8/19/21, 11:02 PM:


This is a very important addition, thank you! There are many cases where bucket 
pruning can be used to optimize lookups. Specifically, if we lack the ability to 
split the read tasks across multiple Spark workers for date-partitioned tables 
that have some high-cardinality columns and a growing number of files per 
bucket, bucketing becomes useless when we are stuck with one task per executor. 
In my example below it takes a query on the bucketed table from 2 hours to 2 
minutes, even with the listFiles done via the manual read.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table)
  .where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False")
  .format(format)
  .load(bucket_files: _*)
  .where(col(bucket_col) === lit(bucket_target))


was (Author: gchurch):
This is a very important addition, thank you! There are many cases where bucket 
pruning can be used to optimize lookups. Specifically, if we lack the ability to 
split the read tasks across multiple Spark workers for date-partitioned tables 
that have some high-cardinality columns and a growing number of files per 
bucket, bucketing becomes useless when we are stuck with one task per executor. 
In the example below it takes a query from 2 hours to 2 minutes, even with the 
listFiles done via the manual read.

My workaround example is below, and it only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table)
  .where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False")
  .format(format)
  .load(bucket_files: _*)
  .where(col(bucket_col) === lit(bucket_target))

> Decouple bucket filter pruning and bucket table scan
> 
>
> Key: SPARK-32985
> URL: https://issues.apache.org/jira/browse/SPARK-32985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Cheng Su
>Assignee: Cheng Su
>Priority: Minor
> Fix For: 3.2.0
>
>
> As a followup from discussion in 
> [https://github.com/apache/spark/pull/29804#discussion_r493100510] . 
> Currently in data source v1 file scan `FileSourceScanExec`, bucket filter 
> pruning will only take effect with bucket table scan - 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542]
>  . However, this is unnecessary, as bucket filter pruning can also happen if 
> we disable bucketed table scan. This helps queries leverage bucket filter 
> pruning to save CPU/IO by not reading unnecessary bucket files, without being 
> bound by bucketed table scan when the parallelism of tasks is a concern.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-32985) Decouple bucket filter pruning and bucket table scan

2021-08-19 Thread Gabe Church (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893
 ] 

Gabe Church edited comment on SPARK-32985 at 8/19/21, 11:01 PM:


This is a very important addition, thank you! There are many cases where bucket 
pruning can be used to optimize lookups. Specifically, if we lack the ability to 
split the read tasks across multiple Spark workers for date-partitioned tables 
that have some high-cardinality columns and a growing number of files per 
bucket, bucketing becomes useless when we are stuck with one task per executor. 
In the example below it takes a query from 2 hours to 2 minutes, even with the 
listFiles done via the manual read.

My example is below, and it only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table)
  .where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False")
  .format(format)
  .load(bucket_files: _*)
  .where(col(bucket_col) === lit(bucket_target))


was (Author: gchurch):
This is a very important addition, thank you! There are many cases where bucket 
pruning can be used to optimize lookups. Specifically, if we lack the ability to 
split the read tasks across multiple Spark workers for date-partitioned tables 
that have some high-cardinality columns and a growing number of files per 
bucket, bucketing becomes useless when we are stuck with one task per executor. 
I've manually bypassed this in my own work so that these bucketed tables are 
still useful, but making this configurable is an absolutely huge improvement, so 
that we can gain from bucket pruning. In my example it takes a query from 2 
hours to 2 minutes, even with the listFiles done via the manual read.

My example is below, and it only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table)
  .where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False")
  .format(format)
  .load(bucket_files: _*)
  .where(col(bucket_col) === lit(bucket_target))

> Decouple bucket filter pruning and bucket table scan
> 
>
> Key: SPARK-32985
> URL: https://issues.apache.org/jira/browse/SPARK-32985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Cheng Su
>Assignee: Cheng Su
>Priority: Minor
> Fix For: 3.2.0
>
>
> As a followup from discussion in 
> [https://github.com/apache/spark/pull/29804#discussion_r493100510] . 
> Currently in data source v1 file scan `FileSourceScanExec`, bucket filter 
> pruning will only take effect with bucket table scan - 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542]
>  . However, this is unnecessary, as bucket filter pruning can also happen if 
> we disable bucketed table scan. This helps queries leverage bucket filter 
> pruning to save CPU/IO by not reading unnecessary bucket files, without being 
> bound by bucketed table scan when the parallelism of tasks is a concern.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Commented] (SPARK-32985) Decouple bucket filter pruning and bucket table scan

2021-08-19 Thread Gabe Church (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893
 ] 

Gabe Church commented on SPARK-32985:
-

This is a very important addition, thank you! There are many cases where bucket 
pruning can be used to optimize lookups. Specifically, if we lack the ability to 
split the read tasks across multiple Spark workers for date-partitioned tables 
that have some high-cardinality columns and a growing number of files per 
bucket, bucketing becomes useless when we are stuck with one task per executor. 
I've manually bypassed this in my own work so that these bucketed tables are 
still useful, but making this configurable is an absolutely huge improvement, so 
that we can gain from bucket pruning. In my example it takes a query from 2 
hours to 2 minutes, even with the listFiles done via the manual read.

My example is below, and it only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table)
  .where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False")
  .format(format)
  .load(bucket_files: _*)
  .where(col(bucket_col) === lit(bucket_target))

> Decouple bucket filter pruning and bucket table scan
> 
>
> Key: SPARK-32985
> URL: https://issues.apache.org/jira/browse/SPARK-32985
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Cheng Su
>Assignee: Cheng Su
>Priority: Minor
> Fix For: 3.2.0
>
>
> As a followup from discussion in 
> [https://github.com/apache/spark/pull/29804#discussion_r493100510] . 
> Currently in data source v1 file scan `FileSourceScanExec`, bucket filter 
> pruning will only take effect with bucket table scan - 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542]
>  . However, this is unnecessary, as bucket filter pruning can also happen if 
> we disable bucketed table scan. This helps queries leverage bucket filter 
> pruning to save CPU/IO by not reading unnecessary bucket files, without being 
> bound by bucketed table scan when the parallelism of tasks is a concern.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35312) Introduce new Option in Kafka source to specify minimum number of records to read per trigger

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401871#comment-17401871
 ] 

Apache Spark commented on SPARK-35312:
--

User 'xuanyuanking' has created a pull request for this issue:
https://github.com/apache/spark/pull/33792

> Introduce new Option in Kafka source to specify minimum number of records to 
> read per trigger
> -
>
> Key: SPARK-35312
> URL: https://issues.apache.org/jira/browse/SPARK-35312
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.1.1
>Reporter: Satish Gopalani
>Assignee: Satish Gopalani
>Priority: Major
> Fix For: 3.2.0
>
>
> Kafka source currently provides options to set the maximum number of offsets 
> to read per trigger.
> I would like to introduce a new option to specify the minimum number of 
> offsets to read per trigger, i.e. *minOffsetsPerTrigger*.
> This new option will allow skipping a trigger/batch when the number of records 
> available in Kafka is low. This is a very useful feature in cases where we 
> have a sudden burst of data at certain intervals in a day and data volume is 
> low for the rest of the day. Tuning such jobs is difficult: decreasing the 
> trigger processing time increases the number of batches, and hence cluster 
> resource usage, and adds to small-file issues, while increasing the trigger 
> processing time adds consumer lag. Skipping low-volume triggers saves cluster 
> resources and also helps with small-file issues by running fewer batches.
>  Along with this, I would like to introduce a '*maxTriggerDelay*' option, which 
> will help avoid an indefinite delay in scheduling a trigger: the trigger will 
> happen irrespective of the records available once the maxTriggerDelay time has 
> passed since the last trigger. It would be an optional parameter with a 
> default value of 15 minutes. _This option will only be applicable if 
> minOffsetsPerTrigger is set._
> *minOffsetsPerTrigger* would of course be optional, but once specified it 
> would take precedence over *maxOffsetsPerTrigger*, which will be honored 
> only after *minOffsetsPerTrigger* is satisfied.
>  
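
For illustration, a hedged sketch of how the proposed options would be set on 
the Kafka source (option names as proposed in this ticket; broker, topic, and 
thresholds are placeholders):

{code:java}
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  // Skip a trigger unless at least this many new offsets are available...
  .option("minOffsetsPerTrigger", "100000")
  // ...but fire anyway if this much time has passed since the last trigger.
  .option("maxTriggerDelay", "15m")
  // Once minOffsetsPerTrigger is satisfied, the usual upper bound still applies.
  .option("maxOffsetsPerTrigger", "500000")
  .load()
{code}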



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35312) Introduce new Option in Kafka source to specify minimum number of records to read per trigger

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401867#comment-17401867
 ] 

Apache Spark commented on SPARK-35312:
--

User 'xuanyuanking' has created a pull request for this issue:
https://github.com/apache/spark/pull/33792

> Introduce new Option in Kafka source to specify minimum number of records to 
> read per trigger
> -
>
> Key: SPARK-35312
> URL: https://issues.apache.org/jira/browse/SPARK-35312
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.1.1
>Reporter: Satish Gopalani
>Assignee: Satish Gopalani
>Priority: Major
> Fix For: 3.2.0
>
>
> Kafka source currently provides options to set the maximum number of offsets 
> to read per trigger.
> I would like to introduce a new option to specify the minimum number of 
> offsets to read per trigger, i.e. *minOffsetsPerTrigger*.
> This new option will allow skipping a trigger/batch when the number of records 
> available in Kafka is low. This is a very useful feature in cases where we 
> have a sudden burst of data at certain intervals in a day and data volume is 
> low for the rest of the day. Tuning such jobs is difficult: decreasing the 
> trigger processing time increases the number of batches, and hence cluster 
> resource usage, and adds to small-file issues, while increasing the trigger 
> processing time adds consumer lag. Skipping low-volume triggers saves cluster 
> resources and also helps with small-file issues by running fewer batches.
>  Along with this, I would like to introduce a '*maxTriggerDelay*' option, which 
> will help avoid an indefinite delay in scheduling a trigger: the trigger will 
> happen irrespective of the records available once the maxTriggerDelay time has 
> passed since the last trigger. It would be an optional parameter with a 
> default value of 15 minutes. _This option will only be applicable if 
> minOffsetsPerTrigger is set._
> *minOffsetsPerTrigger* would of course be optional, but once specified it 
> would take precedence over *maxOffsetsPerTrigger*, which will be honored 
> only after *minOffsetsPerTrigger* is satisfied.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36548) Throwing NoClassDefFoundError for Logging$class

2021-08-19 Thread sadagopan kalyanaraman (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sadagopan kalyanaraman updated SPARK-36548:
---
Description: 
I'm getting a NoClassDefFoundError. Was it removed in 3.1.1?

 

https://spark.apache.org/docs/3.1.1/api/java/org/apache/spark/internal/Logging.html

 

*https://spark.apache.org/docs/2.4.7/api/java/org/apache/spark/internal/Logging.html*

 

Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
  at java.base/java.util.ServiceLoader.fail(Unknown Source)
  at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source)
  at java.base/java.util.ServiceLoader$ProviderImpl.get(Unknown Source)
  at java.base/java.util.ServiceLoader$3.next(Unknown Source)
  at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
  at scala.collection.Iterator.foreach(Iterator.scala:941)
  at scala.collection.Iterator.foreach$(Iterator.scala:941)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
  at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
  at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
  at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
  at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
  at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:659)
  at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:209)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:195)
  at com.xyz.xyz.xyz.AbstractCdcMessageToSqJob.runJob(AbstractCdcMessageToSqJob.java:94)
  at com.xyz.xyz.xyz.CdcMessageToSqlBillerJob.main(CdcMessageToSqlBillerJob.java:30)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
  at java.base/java.lang.reflect.Method.invoke(Unknown Source)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
  at com.google.cloud.spark.bigquery.BigQueryUtilScala$.(BigQueryUtil.scala:34)
  at com.google.cloud.spark.bigquery.BigQueryUtilScala$.(BigQueryUtil.scala)
  at com.google.cloud.spark.bigquery.BigQueryRelationProvider.(BigQueryRelationProvider.scala:43)
  at com.google.cloud.spark.bigquery.BigQueryRelationProvider.(BigQueryRelationProvider.scala:50)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
  at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
  ... 33 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging$class
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
  at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
  at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
  ... 41 more

  was:
I'm getting NoClassDefFoundError. is it removed in 3.1.1?

 

Exception in thread "main" java.util.ServiceConfigurationError: 
org.apache.spark.sql.sources.DataSourceRegister: Provider 
com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be 
instantiatedException in thread "main" java.util.ServiceConfigurationError: 
org.apache.spark.sql.sources.DataSourceRegister: Provider 
com.google.cloud.sp

[jira] [Created] (SPARK-36548) Throwing NoClassDefFoundError for Logging$class

2021-08-19 Thread sadagopan kalyanaraman (Jira)
sadagopan kalyanaraman created SPARK-36548:
--

 Summary: Throwing NoClassDefFoundError for Logging$class
 Key: SPARK-36548
 URL: https://issues.apache.org/jira/browse/SPARK-36548
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes, Spark Core
Affects Versions: 3.1.1
Reporter: sadagopan kalyanaraman


I'm getting a NoClassDefFoundError. Was it removed in 3.1.1?

 

Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
  at java.base/java.util.ServiceLoader.fail(Unknown Source)
  at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source)
  at java.base/java.util.ServiceLoader$ProviderImpl.get(Unknown Source)
  at java.base/java.util.ServiceLoader$3.next(Unknown Source)
  at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
  at scala.collection.Iterator.foreach(Iterator.scala:941)
  at scala.collection.Iterator.foreach$(Iterator.scala:941)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
  at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
  at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
  at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
  at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
  at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:659)
  at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:209)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:195)
  at com.xyz.xyz.xyz.AbstractCdcMessageToSqJob.runJob(AbstractCdcMessageToSqJob.java:94)
  at com.xyz.xyz.xyz.CdcMessageToSqlBillerJob.main(CdcMessageToSqlBillerJob.java:30)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
  at java.base/java.lang.reflect.Method.invoke(Unknown Source)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
  at com.google.cloud.spark.bigquery.BigQueryUtilScala$.(BigQueryUtil.scala:34)
  at com.google.cloud.spark.bigquery.BigQueryUtilScala$.(BigQueryUtil.scala)
  at com.google.cloud.spark.bigquery.BigQueryRelationProvider.(BigQueryRelationProvider.scala:43)
  at com.google.cloud.spark.bigquery.BigQueryRelationProvider.(BigQueryRelationProvider.scala:50)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
  at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
  ... 33 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging$class
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
  at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
  at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
  ... 41 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0

2021-08-19 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-36547:


Assignee: Gengliang Wang  (was: Apache Spark)

> Downgrade scala-maven-plugin to 4.3.0
> -
>
> Key: SPARK-36547
> URL: https://issues.apache.org/jira/browse/SPARK-36547
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.2.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Blocker
>
> This is the same issue as SPARK-34007. We need to downgrade the version of 
> scala-maven-plugin to 4.3.0 in order to avoid errors when building the Hadoop 
> 2.7 package.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401809#comment-17401809
 ] 

Apache Spark commented on SPARK-36547:
--

User 'gengliangwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/33791

> Downgrade scala-maven-plugin to 4.3.0
> -
>
> Key: SPARK-36547
> URL: https://issues.apache.org/jira/browse/SPARK-36547
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.2.0
>Reporter: Gengliang Wang
>Assignee: Apache Spark
>Priority: Blocker
>
> This is the same issue as SPARK-34007. We need to downgrade the version of 
> scala-maven-plugin to 4.3.0 in order to avoid errors when building the Hadoop 
> 2.7 package.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401808#comment-17401808
 ] 

Apache Spark commented on SPARK-36547:
--

User 'gengliangwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/33791

> Downgrade scala-maven-plugin to 4.3.0
> -
>
> Key: SPARK-36547
> URL: https://issues.apache.org/jira/browse/SPARK-36547
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.2.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Blocker
>
> This is the same issue as SPARK-34007. We need to downgrade the version of 
> scala-maven-plugin to 4.3.0 in order to avoid errors when building the Hadoop 
> 2.7 package.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0

2021-08-19 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-36547:


Assignee: Apache Spark  (was: Gengliang Wang)

> Downgrade scala-maven-plugin to 4.3.0
> -
>
> Key: SPARK-36547
> URL: https://issues.apache.org/jira/browse/SPARK-36547
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.2.0
>Reporter: Gengliang Wang
>Assignee: Apache Spark
>Priority: Blocker
>
> This is the same issue as SPARK-34007. We need to downgrade the version of 
> scala-maven-plugin to 4.3.0, in order to avoid error on building Hadoop 2.7 
> package



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0

2021-08-19 Thread Gengliang Wang (Jira)
Gengliang Wang created SPARK-36547:
--

 Summary: Downgrade scala-maven-plugin to 4.3.0
 Key: SPARK-36547
 URL: https://issues.apache.org/jira/browse/SPARK-36547
 Project: Spark
  Issue Type: Improvement
  Components: Build
Affects Versions: 3.2.0
Reporter: Gengliang Wang
Assignee: Gengliang Wang


This is the same issue as SPARK-34007. We need to downgrade the version of 
scala-maven-plugin to 4.3.0 in order to avoid errors when building the Hadoop 
2.7 package.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34305) Unify v1 and v2 ALTER TABLE .. SET SERDE tests

2021-08-19 Thread Gengliang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-34305:
---
Fix Version/s: (was: 3.2.0)
   3.3.0

> Unify v1 and v2 ALTER TABLE .. SET SERDE tests
> --
>
> Key: SPARK-34305
> URL: https://issues.apache.org/jira/browse/SPARK-34305
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Max Gekk
>Assignee: Max Gekk
>Priority: Major
> Fix For: 3.3.0
>
>
> Extract the ALTER TABLE .. SET SERDE tests to a common place to run them for 
> V1 and V2 datasources. Some tests can be placed in V1- and V2-specific test 
> suites.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34332) Unify v1 and v2 ALTER TABLE .. SET LOCATION tests

2021-08-19 Thread Gengliang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-34332:
---
Fix Version/s: (was: 3.2.0)
   3.3.0

> Unify v1 and v2 ALTER TABLE .. SET LOCATION tests
> -
>
> Key: SPARK-34332
> URL: https://issues.apache.org/jira/browse/SPARK-34332
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Max Gekk
>Assignee: Max Gekk
>Priority: Major
> Fix For: 3.3.0
>
>
> Extract the ALTER TABLE .. SET LOCATION tests to a common place to run them 
> for V1 and V2 datasources. Some tests can be placed in V1- and V2-specific 
> test suites.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36082) when the right side is small enough to use SingleColumn Null Aware Anti Join

2021-08-19 Thread Gengliang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-36082:
---
Fix Version/s: (was: 3.2.0)
   3.3.0

> when the right side is small enough to use SingleColumn Null Aware Anti Join
> 
>
> Key: SPARK-36082
> URL: https://issues.apache.org/jira/browse/SPARK-36082
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0, 3.1.3
>Reporter: mcdull_zhang
>Priority: Minor
> Fix For: 3.3.0
>
>
> NULL-aware ANTI join (https://issues.apache.org/jira/browse/SPARK-32290) will 
> build the right side into a HashMap.
> code in SparkStrategy:
>  
> {code:java}
> case j @ ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) =>
>   Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, BuildRight,
> None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin = 
> true)){code}
> we should add a size condition and use this optimization only when the size of 
> the right side is small enough.
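
For illustration, a hedged sketch of the kind of guard being suggested. It 
mirrors the snippet above (so it is a strategy-case fragment rather than a 
standalone program), and the exact threshold to compare against is the open 
question of this ticket:

{code:java}
// Hypothetical guard: only take the single-column null-aware anti join path when
// the build (right) side is known to be small, e.g. under the broadcast threshold.
case j @ ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys)
    if j.right.stats.sizeInBytes >= 0 &&
       j.right.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold =>
  Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, BuildRight,
    None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin = true))
{code}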



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-34884) Improve dynamic partition pruning evaluation

2021-08-19 Thread Gengliang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang resolved SPARK-34884.

Resolution: Fixed

> Improve dynamic partition pruning evaluation
> 
>
> Key: SPARK-34884
> URL: https://issues.apache.org/jira/browse/SPARK-34884
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Yuming Wang
>Assignee: Yuming Wang
>Priority: Major
> Fix For: 3.2.0
>
>
> Fail fast if the filtering side cannot build a broadcast by size.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35083) Support remote scheduler pool file

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401686#comment-17401686
 ] 

Apache Spark commented on SPARK-35083:
--

User 'gengliangwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/33789

> Support remote scheduler pool file
> --
>
> Key: SPARK-35083
> URL: https://issues.apache.org/jira/browse/SPARK-35083
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Assignee: XiDuo You
>Priority: Minor
> Fix For: 3.2.0
>
>
> Make `spark.scheduler.allocation.file` support remote files. When using Spark 
> as a server (e.g. SparkThriftServer), it's hard for users to specify a local 
> path as the scheduler pool file.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35083) Support remote scheduler pool file

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401685#comment-17401685
 ] 

Apache Spark commented on SPARK-35083:
--

User 'gengliangwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/33789

> Support remote scheduler pool file
> --
>
> Key: SPARK-35083
> URL: https://issues.apache.org/jira/browse/SPARK-35083
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Assignee: XiDuo You
>Priority: Minor
> Fix For: 3.2.0
>
>
> Make `spark.scheduler.allocation.file` support remote files. When using Spark 
> as a server (e.g. SparkThriftServer), it's hard for users to specify a local 
> path as the scheduler pool file.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns

2021-08-19 Thread Vishal Dhavale (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal Dhavale updated SPARK-36546:
---
Description: 
Currently, unionByName works with two DataFrames with slightly different 
schemas. It would be good if it also worked with arrays of struct columns.

 

unionByName fails if we try to merge DataFrames whose array-of-struct columns 
have slightly different schemas.

Below is an example.

Step 1: DataFrame arrayStructDf1 with column booksIntersted of type array of 
struct
{code:java}
val arrayStructData = Seq(
 Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
 Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500

val arrayStructSchema = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)))

val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)

arrayStructDf1.printSchema() 

scala> arrayStructDf1.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- name: string (nullable = true)
 |||-- author: string (nullable = true)
 |||-- pages: integer (nullable = true)
{code}
 

Step 2: Another DataFrame, arrayStructDf2, with column booksIntersted of type 
array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
 
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
 
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"

val arrayStructSchemaNewClm = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)
 .add("new_column",StringType)))

val arrayStructDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)

arrayStructDf2.printSchema()
scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- name: string (nullable = true)
 |||-- author: string (nullable = true)
 |||-- pages: integer (nullable = true)
 |||-- new_column: string (nullable = true){code}
 

Step 3: Merge arrayStructDf1 and arrayStructDf2 using unionByName

We see the error org.apache.spark.sql.AnalysisException: Union can only be 
performed on tables with the compatible column types. 
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)

org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
with the compatible column types. 
array<struct<name:string,author:string,pages:int>> <> 
array<struct<name:string,author:string,pages:int,new_column:string>> at the 
second column of the second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
 +- LogicalRDD [name#204, booksIntersted#205], false{code}
 

unionByName should fill the missing fields with null, as it already does for 
struct-type columns.
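
Until then, a hedged workaround sketch (assumes Spark 3.1+, where 
`functions.transform` and `Column.withField` are available; names mirror the 
example above):

{code:java}
import org.apache.spark.sql.functions.{col, lit, transform}

// Manually add the missing struct field as null on the older-schema side,
// then a regular unionByName lines the columns up.
val patchedDf1 = arrayStructDf1.withColumn(
  "booksIntersted",
  transform(col("booksIntersted"), b => b.withField("new_column", lit(null).cast("string"))))

val unioned = patchedDf1.unionByName(arrayStructDf2)
{code}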

 

  was:
Currently, unionByName workes with two DataFrames with slightly different 
schemas. It would be good it works with an array of struct columns.

 

unionByName fails if we try to merge dataframe with an array of struct columns 
with slightly different schema

Below is the example.

Step 1: dataframe arrayStructDf1 with columnbooksIntersted of type array of 
struct
{code:java}
val arrayStructData = Seq(
 Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
 Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500

val arrayStructSchema = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)))

val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)

arrayStructDf1.printSchema() 

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- name: string (nullable = true)
 |||-- author: string (nullable = true)
 |||-- pages: integer (nullable = true)
 |||-- new_column: string (nullable = true)
{code}
 

Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type 
array of a struct but struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
 
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
 
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"

val arrayStructSchemaNewClm = new StructType().add("name",

[jira] [Updated] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns

2021-08-19 Thread Vishal Dhavale (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal Dhavale updated SPARK-36546:
---
Description: 
Currently, unionByName works with two DataFrames with slightly different 
schemas. It would be good if it also worked with arrays of struct columns.

 

unionByName fails if we try to merge DataFrames whose array-of-struct columns 
have slightly different schemas.

Below is an example.

Step 1: DataFrame arrayStructDf1 with column booksIntersted of type array of 
struct
{code:java}
val arrayStructData = Seq(
 Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
 Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500

val arrayStructSchema = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)))

val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)

arrayStructDf1.printSchema() 

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- name: string (nullable = true)
 |||-- author: string (nullable = true)
 |||-- pages: integer (nullable = true)
 |||-- new_column: string (nullable = true)
{code}
 

Step 2: Another DataFrame arrayStructDf2 with column booksIntersted of type 
array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
 
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
 
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"

val arrayStructSchemaNewClm = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)
 .add("new_column",StringType)))

val arrayStructDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)

arrayStructDf2.printSchema(){code}
 

We see the below error when we try to merge arrayStructDf1 and arrayStructDf2 
using unionByName:
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)

org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
with the compatible column types. 
array<struct<name:string,author:string,pages:int>> <> 
array<struct<name:string,author:string,pages:int,new_column:string>> at the second column of the 
second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
 +- LogicalRDD [name#204, booksIntersted#205], false{code}
 

unionByName should fill the missing data with null, as it already does for 
columns of struct type.

 

  was:
Currently, unionByName workes with two DataFrames with slightly different 
schemas. It would be good it works with an array of struct columns.

 

unionByName fails if we try to merge dataframe with an array of struct columns 
with slightly different schema

Below is the example.

Step 1: dataframe arrayStructDf1 with columnbooksIntersted of type array of 
struct
{code:java}
val arrayStructData = Seq(
 Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
 Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500

val arrayStructSchema = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)))

val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)

arrayStructDf1.printSchema() 
{code}
 

Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type 
array of a struct but struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
 
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
 
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"

val arrayStructSchemaNewClm = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)
 .add("new_column",StringType)))

val arrayStructDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)

arrayStructDf2.printSchema(){code}
 

We see the below error when we try to use unionByName arrayStructDf1 and 
arrayStructDf2
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)

org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
with the compatible column types. 
array> <> 
array> at the second column of the 
second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksInt

[jira] [Updated] (SPARK-36545) sparkstreaming input rate exceed spark.streaming.kafka.maxRatePerPartition

2021-08-19 Thread handong (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

handong updated SPARK-36545:

Description: 
 

I have a Spark Streaming application with Kafka.
 Here are the parameters:

kafka partitions = 500

batch time = 60 s

--conf spark.streaming.backpressure.enabled=true

--conf spark.streaming.kafka.maxRatePerPartition=2500

input size = 500 * 60 * 2500 = 75,000,000

however the input size becomes 16000 after some batches

Who can tell me the reason?

!kafka.png!
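A back-of-the-envelope sketch of the expected per-batch cap, assuming the settings above are the 
only ones limiting the rate:
{code:scala}
// With maxRatePerPartition applied, a 60s batch should never exceed this many records.
val partitions = 500
val batchSeconds = 60
val maxRatePerPartition = 2500L // records per partition per second
val maxRecordsPerBatch = partitions * batchSeconds * maxRatePerPartition
// maxRecordsPerBatch == 75000000; batches above this suggest the cap is not being honoured
{code}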

 

  was:
!kafka.png!

 


> sparkstreaming input rate exceed  spark.streaming.kafka.maxRatePerPartition
> ---
>
> Key: SPARK-36545
> URL: https://issues.apache.org/jira/browse/SPARK-36545
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Spark Submit, YARN
>Affects Versions: 2.4.5
>Reporter: handong
>Priority: Major
> Attachments: kafka.png
>
>
>  
> i have a spark streaming application with kafka . 
>  Here are the parameters:
> kafka partition = 500
> batch time = 60 
> --conf spark.streaming.backpressure.enabled=true
> --conf spark.streaming.kafka.maxRatePerPartition=2500
> input size= 500 * 120 * 2500 = 75,000,000
> however input size become 16000 after some batch 
> who can tell me reason
> !kafka.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36545) sparkstreaming input rate exceed spark.streaming.kafka.maxRatePerPartition

2021-08-19 Thread handong (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

handong updated SPARK-36545:

Description: 
!kafka.png!

 

  was:
!kafka.png! i have a spark streaming application with kafka . 
 Here are the parameters:

kafka partition = 500

batch time = 60 

--conf spark.streaming.backpressure.enabled=true

--conf spark.streaming.kafka.maxRatePerPartition=2500

input size= 500 * 120 * 2500 = 75,000,000

however input size become 16000 after some batch 

 


> sparkstreaming input rate exceed  spark.streaming.kafka.maxRatePerPartition
> ---
>
> Key: SPARK-36545
> URL: https://issues.apache.org/jira/browse/SPARK-36545
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Spark Submit, YARN
>Affects Versions: 2.4.5
>Reporter: handong
>Priority: Major
> Attachments: kafka.png
>
>
> !kafka.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36545) sparkstreaming input rate exceed spark.streaming.kafka.maxRatePerPartition

2021-08-19 Thread handong (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

handong updated SPARK-36545:

Description: 
!kafka.png! i have a spark streaming application with kafka . 
 Here are the parameters:

kafka partition = 500

batch time = 60 

--conf spark.streaming.backpressure.enabled=true

--conf spark.streaming.kafka.maxRatePerPartition=2500

input size= 500 * 120 * 2500 = 75,000,000

however input size become 16000 after some batch 

 

  was:
i have a spark streaming application with kafka . 
Here are the parameters:

kafka partition = 500

batch time = 60 

--conf spark.streaming.backpressure.enabled=true

--conf spark.streaming.kafka.maxRatePerPartition=2500

input size= 500 * 120 * 2500 = 75,000,000

however input size become 16000 after some batch 

 


> sparkstreaming input rate exceed  spark.streaming.kafka.maxRatePerPartition
> ---
>
> Key: SPARK-36545
> URL: https://issues.apache.org/jira/browse/SPARK-36545
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Spark Submit, YARN
>Affects Versions: 2.4.5
>Reporter: handong
>Priority: Major
> Attachments: kafka.png
>
>
> !kafka.png! i have a spark streaming application with kafka . 
>  Here are the parameters:
> kafka partition = 500
> batch time = 60 
> --conf spark.streaming.backpressure.enabled=true
> --conf spark.streaming.kafka.maxRatePerPartition=2500
> input size= 500 * 120 * 2500 = 75,000,000
> however input size become 16000 after some batch 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36545) sparkstreaming input rate exceed spark.streaming.kafka.maxRatePerPartition

2021-08-19 Thread handong (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

handong updated SPARK-36545:

Attachment: kafka.png

> sparkstreaming input rate exceed  spark.streaming.kafka.maxRatePerPartition
> ---
>
> Key: SPARK-36545
> URL: https://issues.apache.org/jira/browse/SPARK-36545
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Spark Submit, YARN
>Affects Versions: 2.4.5
>Reporter: handong
>Priority: Major
> Attachments: kafka.png
>
>
> i have a spark streaming application with kafka . 
> Here are the parameters:
> kafka partition = 500
> batch time = 60 
> --conf spark.streaming.backpressure.enabled=true
> --conf spark.streaming.kafka.maxRatePerPartition=2500
> input size= 500 * 120 * 2500 = 75,000,000
> however input size become 16000 after some batch 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns

2021-08-19 Thread Vishal Dhavale (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal Dhavale updated SPARK-36546:
---
Description: 
Currently, unionByName works with two DataFrames with slightly different 
schemas. It would be good if it also worked with arrays of struct columns.

 

unionByName fails if we try to merge DataFrames whose array-of-struct columns 
have slightly different schemas.

Below is an example.

Step 1: DataFrame arrayStructDf1 with column booksIntersted of type array of 
struct
{code:java}
val arrayStructData = Seq(
 Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
 Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500

val arrayStructSchema = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)))

val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)

arrayStructDf1.printSchema() 
{code}
 

Step 2: Another DataFrame arrayStructDf2 with column booksIntersted of type 
array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
 
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
 
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"

val arrayStructSchemaNewClm = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)
 .add("new_column",StringType)))

val arrayStructDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)

arrayStructDf2.printSchema(){code}
 

We see the below error when we try to merge arrayStructDf1 and arrayStructDf2 
using unionByName:
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)

org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
with the compatible column types. 
array<struct<name:string,author:string,pages:int>> <> 
array<struct<name:string,author:string,pages:int,new_column:string>> at the second column of the 
second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
 +- LogicalRDD [name#204, booksIntersted#205], false{code}
 

unionByName should fill the missing data with null, as it already does for 
columns of struct type.

 

  was:
Currently, unionByName workes with two DataFrames with slightly different 
schemas. It would be good it works with an array of struct columns.

 

unionByName fails if we try to merge dataframe with an array of struct columns 
with slightly different schema

Below is the example.

Step 1: dataframe arrayStructDf1 with columnbooksIntersted of type array of 
struct
{code:java}
val arrayStructData = Seq(
 Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
 Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500
 val arrayStructSchema = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)))
 val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
arrayStructDf1.printSchema() 
scala> arrayStructDf2.printSchema() root |-- name: string (nullable = true) |-- 
booksIntersted: array (nullable = true) |    |-- element: struct (containsNull 
= true) |    |    |-- name: string (nullable = true) |    |    |-- author: 
string (nullable = true) |    |    |-- pages: integer (nullable = true) |    |  
  |-- new_column: string (nullable = true)
{code}
 

Step 2: Another dataframe arrayStructDf2 with columnbooksIntersted of type 
array of a struct but struct contains an extra field called "new_column"
 val arrayStructData2 = Seq(
 
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
 
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"
 val arrayStructSchemaNewClm = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)
 .add("new_column",StringType)))
 val arrayStructDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)
 arrayStructDf2.printSchema()

scala> arrayStructDf2.printSchema()
 root
|– name: string (nullable = true)|
|– booksIntersted: array (nullable = true)|
|   |– element: struct (containsNull = true)|
|   |   |– name: string (nullable = true)|
|   |   |– author: string (nullable = true)|
|   |   |– pages: integer (nullable = true)|
|   |   |– new_column: string (nullable = true)
 Step 3: Try to merge arrayStructDf1 and arrayStructDf2 using unionByName|

 
{code:java}
scala> arrayStructDf1.unionByName(arraySt

[jira] [Updated] (SPARK-36545) sparkstreaming input rate exceed spark.streaming.kafka.maxRatePerPartition

2021-08-19 Thread handong (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

handong updated SPARK-36545:

Component/s: YARN
 Spark Submit
Description: 
i have a spark streaming application with kafka . 
Here are the parameters:

kafka partition = 500

batch time = 60 

--conf spark.streaming.backpressure.enabled=true

--conf spark.streaming.kafka.maxRatePerPartition=2500

input size= 500 * 120 * 2500 = 75,000,000

however input size become 16000 after some batch 

 

> sparkstreaming input rate exceed  spark.streaming.kafka.maxRatePerPartition
> ---
>
> Key: SPARK-36545
> URL: https://issues.apache.org/jira/browse/SPARK-36545
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Spark Submit, YARN
>Affects Versions: 2.4.5
>Reporter: handong
>Priority: Major
>
> i have a spark streaming application with kafka . 
> Here are the parameters:
> kafka partition = 500
> batch time = 60 
> --conf spark.streaming.backpressure.enabled=true
> --conf spark.streaming.kafka.maxRatePerPartition=2500
> input size= 500 * 120 * 2500 = 75,000,000
> however input size become 16000 after some batch 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns

2021-08-19 Thread Vishal Dhavale (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal Dhavale updated SPARK-36546:
---
Description: 
Currently, unionByName works with two DataFrames with slightly different 
schemas. It would be good if it also worked with arrays of struct columns.

 

unionByName fails if we try to merge DataFrames whose array-of-struct columns 
have slightly different schemas.

Below is an example.

Step 1: DataFrame arrayStructDf1 with column booksIntersted of type array of 
struct
{code:java}
val arrayStructData = Seq(
 Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
 Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500
 val arrayStructSchema = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)))
 val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
arrayStructDf1.printSchema() 
scala> arrayStructDf2.printSchema() root |-- name: string (nullable = true) |-- 
booksIntersted: array (nullable = true) |    |-- element: struct (containsNull 
= true) |    |    |-- name: string (nullable = true) |    |    |-- author: 
string (nullable = true) |    |    |-- pages: integer (nullable = true) |    |  
  |-- new_column: string (nullable = true)
{code}
 

Step 2: Another DataFrame arrayStructDf2 with column booksIntersted of type 
array of struct, but the struct contains an extra field called "new_column"
 val arrayStructData2 = Seq(
 
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
 
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"
 val arrayStructSchemaNewClm = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)
 .add("new_column",StringType)))
 val arrayStructDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)
 arrayStructDf2.printSchema()

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true)

Step 3: Try to merge arrayStructDf1 and arrayStructDf2 using unionByName

 
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
with the compatible column types. 
array<struct<name:string,author:string,pages:int>> <> 
array<struct<name:string,author:string,pages:int,new_column:string>> at the second column of the 
second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
 +- LogicalRDD [name#204, booksIntersted#205], false{code}
 

unionByName should fill the missing data with null, as it already does for 
columns of struct type.

https://issues.apache.org/jira/browse/SPARK-32376

 

  was:
Currently, unionByName workes with two DataFrames with slightly different 
schemas. It would be good it works with an array of struct columns.

 

unionByName fails if we try to merge dataframe with an array of struct columns 
with slightly different schema

Below is the example.

Step 1: dataframe arrayStructDf1 with columnbooksIntersted of type array of 
struct
val arrayStructData = Seq(
  Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
  Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500
val arrayStructSchema = new StructType().add("name",StringType)
  .add("booksIntersted",ArrayType(new StructType()
.add("name",StringType)
.add("author",StringType)
.add("pages",IntegerType)))
val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)

arrayStructDf1.printSchema()

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)



 

Step 2: Another dataframe arrayStructDf2 with columnbooksIntersted of type 
array of a struct but struct contains an extra field called "new_column"
val arrayStructData2 = Seq(
  
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
  
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"
val arrayStructSchemaNewClm = new StructType().add("name",StringType)
  .add("booksIntersted",ArrayType(new StructType()
.add("name",StringType)

[jira] [Created] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns

2021-08-19 Thread Vishal Dhavale (Jira)
Vishal Dhavale created SPARK-36546:
--

 Summary: Make unionByName null-filling behavior work with array of 
struct columns
 Key: SPARK-36546
 URL: https://issues.apache.org/jira/browse/SPARK-36546
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.1.1
Reporter: Vishal Dhavale


Currently, unionByName works with two DataFrames with slightly different 
schemas. It would be good if it also worked with arrays of struct columns.

 

unionByName fails if we try to merge DataFrames whose array-of-struct columns 
have slightly different schemas.

Below is an example.

Step 1: DataFrame arrayStructDf1 with column booksIntersted of type array of 
struct
val arrayStructData = Seq(
  Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
  Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500
val arrayStructSchema = new StructType().add("name",StringType)
  .add("booksIntersted",ArrayType(new StructType()
.add("name",StringType)
.add("author",StringType)
.add("pages",IntegerType)))
val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)

arrayStructDf1.printSchema()

scala> arrayStructDf1.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)



 

Step 2: Another DataFrame arrayStructDf2 with column booksIntersted of type 
array of struct, but the struct contains an extra field called "new_column"
val arrayStructData2 = Seq(
  
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
  
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"
val arrayStructSchemaNewClm = new StructType().add("name",StringType)
  .add("booksIntersted",ArrayType(new StructType()
.add("name",StringType)
.add("author",StringType)
.add("pages",IntegerType)
 .add("new_column",StringType)))
val arrayStructDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)
arrayStructDf2.printSchema()

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true)
Step 3: Try to merge arrayStructDf1 and arrayStructDf2 using unionByName

 
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
with the compatible column types. 
array<struct<name:string,author:string,pages:int>> <> 
array<struct<name:string,author:string,pages:int,new_column:string>> at the second column of the 
second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
 +- LogicalRDD [name#204, booksIntersted#205], false{code}
 

unionByName should fill the missing data with null, as it already does for 
columns of struct type.

https://issues.apache.org/jira/browse/SPARK-32376

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36545) sparkstreaming input rate exceed spark.streaming.kafka.maxRatePerPartition

2021-08-19 Thread handong (Jira)
handong created SPARK-36545:
---

 Summary: sparkstreaming input rate exceed  
spark.streaming.kafka.maxRatePerPartition
 Key: SPARK-36545
 URL: https://issues.apache.org/jira/browse/SPARK-36545
 Project: Spark
  Issue Type: Bug
  Components: DStreams
Affects Versions: 2.4.5
Reporter: handong






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36428) the 'seconds' parameter of 'make_timestamp' should accept integer type

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401598#comment-17401598
 ] 

Apache Spark commented on SPARK-36428:
--

User 'beliefer' has created a pull request for this issue:
https://github.com/apache/spark/pull/33787

> the 'seconds' parameter of 'make_timestamp' should accept integer type
> --
>
> Key: SPARK-36428
> URL: https://issues.apache.org/jira/browse/SPARK-36428
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Wenchen Fan
>Assignee: Apache Spark
>Priority: Major
> Fix For: 3.2.0
>
>
> With ANSI mode, {{SELECT make_timestamp(1, 1, 1, 1, 1, 1)}} fails, because 
> the 'seconds' parameter needs to be of type DECIMAL(8,6), and INT can't be 
> implicitly casted to DECIMAL(8,6) under ANSI mode.
> We should update the function {{make_timestamp}} to allow integer type 
> 'seconds' parameter.
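Until that change, a hedged workaround sketch under ANSI mode is to cast the seconds argument 
explicitly (an illustration only, not the fix proposed in this ticket):
{code:scala}
// Explicit DECIMAL(8,6) cast avoids the disallowed implicit INT -> DECIMAL cast under ANSI mode.
spark.sql("SET spark.sql.ansi.enabled=true")
spark.sql("SELECT make_timestamp(1, 1, 1, 1, 1, CAST(1 AS DECIMAL(8,6)))").show()
{code}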



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36428) the 'seconds' parameter of 'make_timestamp' should accept integer type

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401596#comment-17401596
 ] 

Apache Spark commented on SPARK-36428:
--

User 'beliefer' has created a pull request for this issue:
https://github.com/apache/spark/pull/33787

> the 'seconds' parameter of 'make_timestamp' should accept integer type
> --
>
> Key: SPARK-36428
> URL: https://issues.apache.org/jira/browse/SPARK-36428
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Wenchen Fan
>Assignee: Apache Spark
>Priority: Major
> Fix For: 3.2.0
>
>
> With ANSI mode, {{SELECT make_timestamp(1, 1, 1, 1, 1, 1)}} fails, because 
> the 'seconds' parameter needs to be of type DECIMAL(8,6), and INT can't be 
> implicitly casted to DECIMAL(8,6) under ANSI mode.
> We should update the function {{make_timestamp}} to allow integer type 
> 'seconds' parameter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36538) Environment variables part in config doc isn't properly documented.

2021-08-19 Thread Yuto Akutsu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuto Akutsu resolved SPARK-36538.
-
Resolution: Invalid

> Environment variables part in config doc isn't properly documented.
> ---
>
> Key: SPARK-36538
> URL: https://issues.apache.org/jira/browse/SPARK-36538
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation
>Affects Versions: 3.1.2
>Reporter: Yuto Akutsu
>Priority: Major
>
> It says environment variables are not reflected through spark-env.sh in YARN 
> cluster mode but I believe they are. I think this part of the document should 
> be removed.
> [https://spark.apache.org/docs/latest/configuration.html#environment-variables]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36541) Images used in pyspark documentation still use Koalas

2021-08-19 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-36541:


Assignee: Apache Spark

> Images used in pyspark documentation still use Koalas
> -
>
> Key: SPARK-36541
> URL: https://issues.apache.org/jira/browse/SPARK-36541
> Project: Spark
>  Issue Type: Documentation
>  Components: docs, PySpark
>Affects Versions: 3.2.0
>Reporter: Leona Yoda
>Assignee: Apache Spark
>Priority: Minor
> Attachments: images_replaced.key
>
>
> Images in [Transform and apply a 
> function|https://github.com/apache/spark/blob/master/python/docs/source/user_guide/pandas_on_spark/transform_apply.rst]
>  documentation still use the word Koalas, although the word was replaced with 
> pandas-on-Spark by this [PR|https://github.com/apache/spark/pull/32835].
> I think we should update those images to match the new wording.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36541) Images used in pyspark documentation still use Koalas

2021-08-19 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-36541:


Assignee: (was: Apache Spark)

> Images used in pyspark documentation still use Koalas
> -
>
> Key: SPARK-36541
> URL: https://issues.apache.org/jira/browse/SPARK-36541
> Project: Spark
>  Issue Type: Documentation
>  Components: docs, PySpark
>Affects Versions: 3.2.0
>Reporter: Leona Yoda
>Priority: Minor
> Attachments: images_replaced.key
>
>
> Images in [Transform and apply a 
> function|https://github.com/apache/spark/blob/master/python/docs/source/user_guide/pandas_on_spark/transform_apply.rst]
>  documentation still use the word Koalas, although the word was replaced with 
> pandas-on-Spark by this [PR|https://github.com/apache/spark/pull/32835].
> I think we should update those images to match the new wording.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36541) Images used in pyspark documentation still use Koalas

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401567#comment-17401567
 ] 

Apache Spark commented on SPARK-36541:
--

User 'yoda-mon' has created a pull request for this issue:
https://github.com/apache/spark/pull/33786

> Images used in pyspark documentation still use Koalas
> -
>
> Key: SPARK-36541
> URL: https://issues.apache.org/jira/browse/SPARK-36541
> Project: Spark
>  Issue Type: Documentation
>  Components: docs, PySpark
>Affects Versions: 3.2.0
>Reporter: Leona Yoda
>Priority: Minor
> Attachments: images_replaced.key
>
>
> Images in [Transform and apply a 
> function|https://github.com/apache/spark/blob/master/python/docs/source/user_guide/pandas_on_spark/transform_apply.rst]
>  documentation still use the word Koalas, although the word was replaced with 
> pandas-on-Spark by this [PR|https://github.com/apache/spark/pull/32835].
> I think we should update those images to match the new wording.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36444) Remove OptimizeSubqueries from batch of PartitionPruning

2021-08-19 Thread Yuming Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-36444:

Affects Version/s: 3.0.3
   3.1.2

> Remove OptimizeSubqueries from batch of PartitionPruning
> 
>
> Key: SPARK-36444
> URL: https://issues.apache.org/jira/browse/SPARK-36444
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.3, 3.1.2, 3.2.0
>Reporter: Yuming Wang
>Assignee: Yuming Wang
>Priority: Major
> Fix For: 3.2.0
>
>
> To support this case:
> {code:scala}
> sql(
> """
>   |SELECT date_id, product_id FROM fact_sk f
>   |JOIN (select store_id + 3 as new_store_id from dim_store where 
> country = 'US') s
>   |ON f.store_id = s.new_store_id
> """.stripMargin)
> {code}
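For context, a sketch of the table shapes the quoted query assumes (my assumption: fact_sk is 
partitioned by store_id, which is what makes partition pruning applicable here):
{code:scala}
// Hypothetical DDL matching the columns referenced in the example query.
spark.sql("CREATE TABLE dim_store (store_id INT, country STRING) USING parquet")
spark.sql(
  "CREATE TABLE fact_sk (date_id INT, product_id INT, store_id INT) " +
    "USING parquet PARTITIONED BY (store_id)")
{code}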



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36444) Remove OptimizeSubqueries from batch of PartitionPruning

2021-08-19 Thread Yuming Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-36444:

Affects Version/s: (was: 3.3.0)
   3.2.0

> Remove OptimizeSubqueries from batch of PartitionPruning
> 
>
> Key: SPARK-36444
> URL: https://issues.apache.org/jira/browse/SPARK-36444
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Yuming Wang
>Assignee: Yuming Wang
>Priority: Major
> Fix For: 3.2.0
>
>
> To support this case:
> {code:scala}
> sql(
> """
>   |SELECT date_id, product_id FROM fact_sk f
>   |JOIN (select store_id + 3 as new_store_id from dim_store where 
> country = 'US') s
>   |ON f.store_id = s.new_store_id
> """.stripMargin)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36444) Remove OptimizeSubqueries from batch of PartitionPruning

2021-08-19 Thread Yuming Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang resolved SPARK-36444.
-
Fix Version/s: 3.2.0
   Resolution: Fixed

Issue resolved by pull request 33664
[https://github.com/apache/spark/pull/33664]

> Remove OptimizeSubqueries from batch of PartitionPruning
> 
>
> Key: SPARK-36444
> URL: https://issues.apache.org/jira/browse/SPARK-36444
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Yuming Wang
>Assignee: Yuming Wang
>Priority: Major
> Fix For: 3.2.0
>
>
> To support this case:
> {code:scala}
> sql(
> """
>   |SELECT date_id, product_id FROM fact_sk f
>   |JOIN (select store_id + 3 as new_store_id from dim_store where 
> country = 'US') s
>   |ON f.store_id = s.new_store_id
> """.stripMargin)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36444) Remove OptimizeSubqueries from batch of PartitionPruning

2021-08-19 Thread Yuming Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang reassigned SPARK-36444:
---

Assignee: Yuming Wang

> Remove OptimizeSubqueries from batch of PartitionPruning
> 
>
> Key: SPARK-36444
> URL: https://issues.apache.org/jira/browse/SPARK-36444
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Yuming Wang
>Assignee: Yuming Wang
>Priority: Major
>
> To support this case:
> {code:scala}
> sql(
> """
>   |SELECT date_id, product_id FROM fact_sk f
>   |JOIN (select store_id + 3 as new_store_id from dim_store where 
> country = 'US') s
>   |ON f.store_id = s.new_store_id
> """.stripMargin)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manger in Spark

2021-08-19 Thread Saurabh Chawla (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401514#comment-17401514
 ] 

Saurabh Chawla edited comment on SPARK-30873 at 8/19/21, 7:56 AM:
--

[~kings129] - A backport of https://issues.apache.org/jira/browse/YARN-6483 and 
https://issues.apache.org/jira/browse/YARN-4676 is needed for lower versions of 
Hadoop. Those changes add the framework support (getDecommissioningTimeout, 
setDecommissioningTimeout) to NodeReport.java, and in this Spark PR we use the 
NodeReport methods to read the decommissioning timeout in YarnAllocator.scala. 
That support is available from hadoop-3.0.1 onwards.

Although the DECOMMISSIONING node state itself is supported from hadoop-2.8 and 
hadoop-2.9, if we only receive the Decommissioning state from the RM we have to 
rely on a decommission-timeout conf per cloud provider. For example, an AWS spot 
node is terminated 120 seconds after it enters the decommissioning state, so we 
could configure a 110-second decommissioning timeout; the same applies to GCP 
preemptible VMs.

I believe it's better to get the decommissioning timeout from the RM rather than 
through such confs on the Spark side. To get graceful decommissioning working 
correctly in Spark, it's better to use hadoop-3.0.1 or a later version of Hadoop.
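A minimal sketch of the kind of check this enables, assuming Hadoop 3.0.1+ where YARN-6483 added 
the NodeReport accessor mentioned above (method names as described in that JIRA, not code taken 
from this PR):
{code:scala}
import org.apache.hadoop.yarn.api.records.{NodeReport, NodeState}

// Returns the RM-reported decommissioning deadline (seconds) for a node, if any.
def decommissionTimeoutSeconds(report: NodeReport): Option[Int] =
  if (report.getNodeState == NodeState.DECOMMISSIONING)
    Option(report.getDecommissioningTimeout).map(_.intValue) // may be null when RM gives no timeout
  else
    None
{code}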


was (Author: saurabhc100):
[~kings129] - The backport of the  
(https://issues.apache.org/jira/browse/YARN-6483. and 
https://issues.apache.org/jira/browse/YARN-4676. ) was done by our hadoop team. 
This change is needed to get the support the framework 
(getDecommissioningTimeout, setDecommissioningTimeout) that was added in the 
NodeReport.java . And in this Spark PR we have made use of NodeReport methods 
to get the DecommissioningTimeout in YarnAllocator.scala. This change is 
available from hadoop-3.0.1

Although this DECOMMISSIONING node state support is there from hadoop-2.8 and 
hadoop-2.9, and if we received the Decommissioning State info from RM, we can 
make use of some conf related to decommission timeout for various cloud 
provider, For example -> like Aws spot node is terminated after 120 sec after 
the decommissioning state so we can give 110 sec as decommissioning timeout. 
Same is for GCP preemptible VMs.

I believe it's better to get DecommissioningTimeout from RM rather than doing 
it here in Spark side through some confs. To get the correct behaviour of 
graceful decommissioning working in spark, It's better to use  hadoop-3.0.1 or 
later version of hadoop.

> Handling Node Decommissioning for Yarn cluster manger in Spark
> --
>
> Key: SPARK-30873
> URL: https://issues.apache.org/jira/browse/SPARK-30873
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.1.0
>Reporter: Saurabh Chawla
>Priority: Major
>
> In many public cloud environments, the node loss (in case of AWS 
> SpotLoss,Spot blocks and GCP preemptible VMs) is a planned and informed 
> activity. 
> The cloud provider intimates the cluster manager about the possible loss of 
> node ahead of time. Few examples is listed here:
> a) Spot loss in AWS(2 min before event)
> b) GCP Pre-emptible VM loss (30 second before event)
> c) AWS Spot block loss with info on termination time (generally few tens of 
> minutes before decommission as configured in Yarn)
> This JIRA tries to make spark leverage the knowledge of the node loss in 
> future, and tries to adjust the scheduling of tasks to minimise the impact on 
> the application. 
> It is well known that when a host is lost, the executors, its running tasks, 
> their caches and also Shuffle data is lost. This could result in wastage of 
> compute and other resources.
> The focus here is to build a framework for YARN, that can be extended for 
> other cluster managers to handle such scenario.
> The framework must handle one or more of the following:-
> 1) Prevent new tasks from starting on any executors on decommissioning Nodes. 
> 2) Decide to kill the running tasks so that they can be restarted elsewhere 
> (assuming they will not complete within the deadline) OR we can allow them to 
> continue hoping they will finish within deadline.
> 3) Clear the shuffle data entry from MapOutputTracker of decommission node 
> hostname to prevent the shuffle fetchfailed exception.The most significant 
> advantage of unregistering shuffle outputs when Spark schedules the first 
> re-attempt to compute the missing blocks, it notices all of the missing 
> blocks from decommissioned nodes and recovers in only one attempt. This 
> speeds up the recovery process significantly over the scheduled Spark 
> implementation, where stages might be rescheduled

[jira] [Commented] (SPARK-35083) Support remote scheduler pool file

2021-08-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401517#comment-17401517
 ] 

Apache Spark commented on SPARK-35083:
--

User 'Ngone51' has created a pull request for this issue:
https://github.com/apache/spark/pull/33785

> Support remote scheduler pool file
> --
>
> Key: SPARK-35083
> URL: https://issues.apache.org/jira/browse/SPARK-35083
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: XiDuo You
>Assignee: XiDuo You
>Priority: Minor
> Fix For: 3.2.0
>
>
> Make `spark.scheduler.allocation.file` support remote files. When using Spark 
> as a server (e.g. SparkThriftServer), it's hard for a user to specify a local 
> path as the scheduler pool file.
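A minimal sketch of what this enables, with a hypothetical remote path and pool name (not taken 
from the change itself):
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("remote-fair-scheduler-pools")
  .config("spark.scheduler.mode", "FAIR")
  // After this change the pool file may live on a remote filesystem such as HDFS.
  .config("spark.scheduler.allocation.file", "hdfs:///spark/conf/fairscheduler.xml")
  .getOrCreate()

// Jobs submitted from this thread run in a pool defined in the remote file.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
{code}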



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manger in Spark

2021-08-19 Thread Saurabh Chawla (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401514#comment-17401514
 ] 

Saurabh Chawla edited comment on SPARK-30873 at 8/19/21, 7:46 AM:
--

[~kings129] - The backport of the  
(https://issues.apache.org/jira/browse/YARN-6483. and 
https://issues.apache.org/jira/browse/YARN-4676. ) was done by our hadoop team. 
This change is needed to get the support the framework 
(getDecommissioningTimeout, setDecommissioningTimeout) that was added in the 
NodeReport.java . And in this Spark PR we have made use of NodeReport methods 
to get the DecommissioningTimeout in YarnAllocator.scala. This change is 
available from hadoop-3.0.1

Although this DECOMMISSIONING node state support is there from hadoop-2.8 and 
hadoop-2.9, and if we received the Decommissioning State info from RM, we can 
make use of some conf related to decommission timeout for various cloud 
provider, For example -> like Aws spot node is terminated after 120 sec after 
the decommissioning state so we can give 110 sec as decommissioning timeout. 
Same is for GCP preemptible VMs.

I believe it's better to get DecommissioningTimeout from RM rather than doing 
it here in Spark side through some confs. To get the correct behaviour of 
graceful decommissioning working in spark, It's better to use  hadoop-3.0.1 or 
later version of hadoop.


was (Author: saurabhc100):
[~kings129] - The backport of the  
(https://issues.apache.org/jira/browse/YARN-6483. and 
https://issues.apache.org/jira/browse/YARN-4676. ) was done by our hadoop team. 
This change is needed to get the support the framework 
(getDecommissioningTimeout, setDecommissioningTimeout) that was added in the 
NodeReport.java . And in this Spark PR we have made use of NodeReport methods 
to get the DecommissioningTimeout in YarnAllocator.scala. This change is 
available from hadoop-3.0.1

Although this DECOMMISSIONING node state support is there from hadoop-2.8 and 
hadoop-2.9, and if we received the Decommissioning State info from RM, we can 
make use of some conf related to decommission timeout for various cloud 
provider, For example -> like Aws spot node is terminated after 120 sec after 
the decommissioning state so we can give 110 sec as decommissioning timeout. 
Same is for GCP preemptible VMs.

I believe its good to get DecommissioningTimeout from RM rather than doing it 
here in Spark side through some confs. To get the correct behaviour of graceful 
decommissioning working in spark. It's better to use  hadoop-3.0.1 or later 
version of hadoop.

> Handling Node Decommissioning for Yarn cluster manger in Spark
> --
>
> Key: SPARK-30873
> URL: https://issues.apache.org/jira/browse/SPARK-30873
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.1.0
>Reporter: Saurabh Chawla
>Priority: Major
>
> In many public cloud environments, the node loss (in case of AWS 
> SpotLoss,Spot blocks and GCP preemptible VMs) is a planned and informed 
> activity. 
> The cloud provider intimates the cluster manager about the possible loss of 
> node ahead of time. Few examples is listed here:
> a) Spot loss in AWS(2 min before event)
> b) GCP Pre-emptible VM loss (30 second before event)
> c) AWS Spot block loss with info on termination time (generally few tens of 
> minutes before decommission as configured in Yarn)
> This JIRA tries to make spark leverage the knowledge of the node loss in 
> future, and tries to adjust the scheduling of tasks to minimise the impact on 
> the application. 
> It is well known that when a host is lost, the executors, its running tasks, 
> their caches and also Shuffle data is lost. This could result in wastage of 
> compute and other resources.
> The focus here is to build a framework for YARN, that can be extended for 
> other cluster managers to handle such scenario.
> The framework must handle one or more of the following:-
> 1) Prevent new tasks from starting on any executors on decommissioning Nodes. 
> 2) Decide to kill the running tasks so that they can be restarted elsewhere 
> (assuming they will not complete within the deadline) OR we can allow them to 
> continue hoping they will finish within deadline.
> 3) Clear the shuffle data entry from MapOutputTracker of decommission node 
> hostname to prevent the shuffle fetchfailed exception.The most significant 
> advantage of unregistering shuffle outputs when Spark schedules the first 
> re-attempt to compute the missing blocks, it notices all of the missing 
> blocks from decommissioned nodes and recovers in only one attempt. This 
> speeds up the recovery process significantly over the scheduled Spark 
> implementation, where stages might be rescheduled multiple tim

[jira] [Comment Edited] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manger in Spark

2021-08-19 Thread Saurabh Chawla (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401514#comment-17401514
 ] 

Saurabh Chawla edited comment on SPARK-30873 at 8/19/21, 7:35 AM:
--

[~kings129] - The backport of the  
(https://issues.apache.org/jira/browse/YARN-6483. and 
https://issues.apache.org/jira/browse/YARN-4676. ) was done by our hadoop team. 
This change is needed to get the support the framework 
(getDecommissioningTimeout, setDecommissioningTimeout) that was added in the 
NodeReport.java . And in this Spark PR we have made use of NodeReport methods 
to get the DecommissioningTimeout in YarnAllocator.scala. This change is 
available from hadoop-3.0.1

Although this DECOMMISSIONING node state support is there from hadoop-2.8 and 
hadoop-2.9, and if we received the Decommissioning State info from RM, we can 
make use of some conf related to decommission timeout for various cloud 
provider, For example -> like Aws spot node is terminated after 120 sec after 
the decommissioning state so we can give 110 sec as decommissioning timeout. 
Same is for GCP preemptible VMs.

I believe its good to get DecommissioningTimeout from RM rather than doing it 
here in Spark side through some confs. To get the correct behaviour of graceful 
decommissioning working in spark. It's better to use  hadoop-3.0.1 or later 
version of hadoop.


was (Author: saurabhc100):
[~kings129] - The backport of the  
(https://issues.apache.org/jira/browse/YARN-6483. and 
https://issues.apache.org/jira/browse/YARN-4676. ) was done by our hadoop team. 
This change is needed to get the support the framework 
(getDecommissioningTimeout, setDecommissioningTimeout) that was added in the 
NodeReport.java . And in this Spark PR we have made use of NodeReport methods 
to get the DecommissioningTimeout in YarnAllocator.scala. This change is 
available from hadoop-3.0.1

Although this DECOMMISSIONING node state support is there from hadoop-2.8 and 
hadoop-2.9, and if we received the Decommissioning State info from RM, we can 
make use of some conf related to decommission timeout for various cloud 
provider, For example -> like Aws spot node is terminated after 120 sec after 
the decommissioning state so we can give 110 sec as decommissioning timeout. 
Same is for GCP preemptible VMs.

I believe its good to get DecommissioningTimeout from RM rather than doing it 
here in Spark side through some confs. To get the correct behaviour of graceful 
decommissioning working in spark. It's better to use  hadoop-3.0.1 or later 
version of hadoop.

 


> Handling Node Decommissioning for Yarn cluster manger in Spark
> --
>
> Key: SPARK-30873
> URL: https://issues.apache.org/jira/browse/SPARK-30873
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.1.0
>Reporter: Saurabh Chawla
>Priority: Major
>
> In many public cloud environments, the node loss (in case of AWS 
> SpotLoss,Spot blocks and GCP preemptible VMs) is a planned and informed 
> activity. 
> The cloud provider intimates the cluster manager about the possible loss of 
> node ahead of time. Few examples is listed here:
> a) Spot loss in AWS(2 min before event)
> b) GCP Pre-emptible VM loss (30 second before event)
> c) AWS Spot block loss with info on termination time (generally few tens of 
> minutes before decommission as configured in Yarn)
> This JIRA tries to make spark leverage the knowledge of the node loss in 
> future, and tries to adjust the scheduling of tasks to minimise the impact on 
> the application. 
> It is well known that when a host is lost, the executors, its running tasks, 
> their caches and also Shuffle data is lost. This could result in wastage of 
> compute and other resources.
> The focus here is to build a framework for YARN, that can be extended for 
> other cluster managers to handle such scenario.
> The framework must handle one or more of the following:-
> 1) Prevent new tasks from starting on any executors on decommissioning Nodes. 
> 2) Decide to kill the running tasks so that they can be restarted elsewhere 
> (assuming they will not complete within the deadline) OR we can allow them to 
> continue hoping they will finish within deadline.
> 3) Clear the shuffle data entry from MapOutputTracker of decommission node 
> hostname to prevent the shuffle fetchfailed exception.The most significant 
> advantage of unregistering shuffle outputs when Spark schedules the first 
> re-attempt to compute the missing blocks, it notices all of the missing 
> blocks from decommissioned nodes and recovers in only one attempt. This 
> speeds up the recovery process significantly over the scheduled Spark 
> implementation, where stages might be rescheduled multip

[jira] [Commented] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manger in Spark

2021-08-19 Thread Saurabh Chawla (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401514#comment-17401514
 ] 

Saurabh Chawla commented on SPARK-30873:


[~kings129] - The backport of the  
(https://issues.apache.org/jira/browse/YARN-6483. and 
https://issues.apache.org/jira/browse/YARN-4676. ) was done by our hadoop team. 
This change is needed to get the support the framework 
(getDecommissioningTimeout, setDecommissioningTimeout) that was added in the 
NodeReport.java . And in this Spark PR we have made use of NodeReport methods 
to get the DecommissioningTimeout in YarnAllocator.scala. This change is 
available from hadoop-3.0.1

Although the DECOMMISSIONING node state is supported from hadoop-2.8 and 
hadoop-2.9, if we only receive the decommissioning state info from the RM, we 
could fall back to confs for the decommission timeout of the various cloud 
providers. For example, an AWS Spot node is terminated 120 seconds after 
entering the decommissioning state, so we could use 110 seconds as the 
decommissioning timeout; the same applies to GCP preemptible VMs.
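
Purely for illustration, such a conf-based fallback might look like the sketch 
below; the configuration keys are hypothetical and do not exist in Spark.

{code:scala}
import org.apache.spark.SparkConf

// Hypothetical per-provider decommission timeouts (key names invented for
// illustration only). A driver-side helper would use these values only when
// the RM does not report a decommissioning timeout itself.
val conf = new SparkConf()
  .set("spark.yarn.decommission.aws.spot.timeout", "110s")       // node lost ~120s after notice
  .set("spark.yarn.decommission.gcp.preemptible.timeout", "25s") // node lost ~30s after notice
{code}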

I believe it's better to get the DecommissioningTimeout from the RM than to 
derive it on the Spark side through confs. To get graceful decommissioning 
working correctly in Spark, it's better to use hadoop-3.0.1 or a later Hadoop 
version.
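
For reference, here is a minimal sketch (not the code from the PR) of how an 
AM-side allocator could pick up the RM-provided timeout from the node reports 
delivered on the AM-RM heartbeat. It assumes hadoop-3.0.1+ where 
NodeReport#getDecommissioningTimeout is available; the handler name is made up 
for illustration.

{code:scala}
import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.{NodeReport, NodeState}

// Called with the updated node reports from an AM-RM heartbeat,
// e.g. from AMRMClientAsync.CallbackHandler#onNodesUpdated.
def handleUpdatedNodes(updatedNodes: java.util.List[NodeReport]): Unit = {
  updatedNodes.asScala.foreach { report =>
    if (report.getNodeState == NodeState.DECOMMISSIONING) {
      val host = report.getNodeId.getHost
      // hadoop-3.0.1+: remaining time (in seconds) before the node is
      // decommissioned; may be null if the RM did not set a timeout.
      val timeoutSecs = Option(report.getDecommissioningTimeout).map(_.intValue())
      println(s"Node $host is decommissioning, timeout=$timeoutSecs seconds")
      // A driver-side DecommissionTracker would be notified here.
    }
  }
}
{code}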

 


> Handling Node Decommissioning for Yarn cluster manager in Spark
> --
>
> Key: SPARK-30873
> URL: https://issues.apache.org/jira/browse/SPARK-30873
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.1.0
>Reporter: Saurabh Chawla
>Priority: Major
>
> In many public cloud environments, node loss (in the case of AWS Spot loss, 
> Spot Blocks and GCP preemptible VMs) is a planned and informed activity. 
> The cloud provider informs the cluster manager about the possible loss of a 
> node ahead of time. A few examples are listed here:
> a) Spot loss in AWS (2 minutes before the event)
> b) GCP preemptible VM loss (30 seconds before the event)
> c) AWS Spot Block loss with info on the termination time (generally a few 
> tens of minutes before decommission, as configured in YARN)
> This JIRA tries to make Spark leverage the advance knowledge of node loss and 
> adjust the scheduling of tasks to minimise the impact on the application. 
> It is well known that when a host is lost, its executors, their running 
> tasks, their caches and also their shuffle data are lost. This can result in 
> wasted compute and other resources.
> The focus here is to build a framework for YARN that can be extended to other 
> cluster managers to handle such scenarios.
> The framework must handle one or more of the following:
> 1) Prevent new tasks from starting on any executors on decommissioning nodes. 
> 2) Decide whether to kill the running tasks so that they can be restarted 
> elsewhere (assuming they will not complete within the deadline), or allow 
> them to continue in the hope that they finish within the deadline.
> 3) Clear the shuffle data entries for the decommissioned node's hostname from 
> the MapOutputTracker to prevent shuffle FetchFailed exceptions. The most 
> significant advantage of unregistering the shuffle outputs is that when Spark 
> schedules the first re-attempt to compute the missing blocks, it notices all 
> of the missing blocks from decommissioned nodes and recovers in only one 
> attempt. This speeds up the recovery process significantly over the current 
> Spark implementation, where stages might be rescheduled multiple times to 
> recompute missing shuffles from all nodes, and prevents jobs from being stuck 
> for hours failing and recomputing.
> 4) Prevent the stage from aborting due to FetchFailed exceptions caused by 
> the decommissioning of a node. In Spark there is a number of consecutive 
> stage attempts allowed before a stage is aborted; this is controlled by the 
> config spark.stage.maxConsecutiveAttempts. Not counting fetch failures caused 
> by the decommissioning of nodes towards stage failure improves the 
> reliability of the system.
> Main components of the change:
> 1) Get the ClusterInfo update from the Resource Manager -> Application Master 
> -> Spark Driver.
> 2) DecommissionTracker, which resides inside the driver, tracks all the 
> decommissioned nodes and takes the necessary actions and state transitions.
> 3) Based on the decommissioned node list, add hooks in the code to achieve:
>  a) No new tasks on the node's executors
>  b) Remove the shuffle data mapping info for the node to be decommissioned 
> from the MapOutputTracker
>  c) Do not count FetchFailures from decommissioned nodes towards stage 
> failure
> On receiving the info that a node is to be decommissioned, the below actions 
> need to be performed by the DecommissionTracker on the driver:
>  * Add the entry of Nodes in DecommissionTracke
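
To make the shape of the proposed DecommissionTracker more concrete, here is a 
minimal, hypothetical sketch of the driver-side bookkeeping it could keep. This 
is an illustration only, not the implementation from the PR; the class and 
method names are invented.

{code:scala}
import scala.collection.concurrent.TrieMap

// Hypothetical driver-side tracker for nodes the RM has flagged as decommissioning.
class DecommissionTracker {
  // host -> wall-clock deadline (ms) after which the node is expected to be lost
  private val decommissioningNodes = TrieMap.empty[String, Long]

  def addNode(host: String, timeoutSecs: Int): Unit = {
    decommissioningNodes(host) = System.currentTimeMillis() + timeoutSecs * 1000L
  }

  // Scheduler hook (point 1 above): no new tasks on a decommissioning host.
  def isDecommissioning(host: String): Boolean = decommissioningNodes.contains(host)

  // Hook for point 2: remaining time before the host is lost, if it is tracked.
  def remainingMillis(host: String): Option[Long] =
    decommissioningNodes.get(host).map(_ - System.currentTimeMillis())

  // Hosts whose shuffle output should be unregistered from the MapOutputTracker
  // and whose fetch failures should not count towards stage failure (points 3/4).
  def decommissionedHosts: Set[String] = decommissioningNodes.keySet.toSet
}
{code}

The sketch only captures the bookkeeping; the actual proposal wires such a 
tracker into the YARN allocator update path, the task scheduler, the 
MapOutputTracker, and the stage-failure accounting described above.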

[jira] [Resolved] (SPARK-36519) Store the RocksDB format in the checkpoint for a streaming query

2021-08-19 Thread L. C. Hsieh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

L. C. Hsieh resolved SPARK-36519.
-
Fix Version/s: 3.2.0
   Resolution: Fixed

Issue resolved by pull request 33749
[https://github.com/apache/spark/pull/33749]

> Store the RocksDB format in the checkpoint for a streaming query
> 
>
> Key: SPARK-36519
> URL: https://issues.apache.org/jira/browse/SPARK-36519
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 3.2.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Major
> Fix For: 3.2.0
>
>
> RocksDB provides backward compatibility, but it doesn't always provide 
> forward compatibility. It's better to store the RocksDB format version in the 
> checkpoint, so that we have the information needed to provide a rollback 
> guarantee when a new Spark version upgrades the RocksDB version in a way that 
> introduces an incompatible change.
> A typical case is when a user upgrades their query to a new Spark version, 
> and this new Spark version ships a new RocksDB version which may use a new 
> format. The user then hits some bug and decides to roll back. But in the old 
> Spark version, the old RocksDB version cannot read the new format.
> In order to handle this case, we will write the RocksDB format version to the 
> checkpoint. When restarting from a checkpoint, we will force RocksDB to use 
> the format version stored in the checkpoint. This ensures the user can roll 
> back their Spark version if needed.
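
As a rough illustration of the idea (not the actual Spark implementation; the 
file name and helper below are hypothetical), the checkpoint could carry the 
format version in a small file that is written on the first run and consulted 
on every restart:

{code:scala}
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object RocksDBFormatVersionFile {
  private val FileName = "rocksdbFormatVersion" // hypothetical file name

  // Record the format version used when the checkpoint was first written.
  def write(checkpointDir: String, version: Int, hadoopConf: Configuration): Unit = {
    val path = new Path(checkpointDir, FileName)
    val fs = path.getFileSystem(hadoopConf)
    val out = fs.create(path, true) // overwrite any existing file
    try out.write(version.toString.getBytes(StandardCharsets.UTF_8))
    finally out.close()
  }

  // On restart, force RocksDB to keep using the version stored in the
  // checkpoint; old checkpoints without the file fall back to the default.
  def read(checkpointDir: String, defaultVersion: Int, hadoopConf: Configuration): Int = {
    val path = new Path(checkpointDir, FileName)
    val fs = path.getFileSystem(hadoopConf)
    if (!fs.exists(path)) {
      defaultVersion
    } else {
      val in = fs.open(path)
      try scala.io.Source.fromInputStream(in, "UTF-8").mkString.trim.toInt
      finally in.close()
    }
  }
}
{code}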



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org