[jira] [Commented] (SPARK-36532) Deadlock in CoarseGrainedExecutorBackend.onDisconnected
[ https://issues.apache.org/jira/browse/SPARK-36532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402071#comment-17402071 ] Apache Spark commented on SPARK-36532: -- User 'Ngone51' has created a pull request for this issue: https://github.com/apache/spark/pull/33795 > Deadlock in CoarseGrainedExecutorBackend.onDisconnected > --- > > Key: SPARK-36532 > URL: https://issues.apache.org/jira/browse/SPARK-36532 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0, 3.1.0, 3.2.0, 3.3.0 >Reporter: wuyi >Assignee: wuyi >Priority: Major > Fix For: 3.2.0 > > > The deadlock has exactly the same root cause as SPARK-14180 but just happens > in a different code path. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0
[ https://issues.apache.org/jira/browse/SPARK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang resolved SPARK-36547. Fix Version/s: 3.2.0 Resolution: Fixed Issue resolved by pull request 33791 [https://github.com/apache/spark/pull/33791] > Downgrade scala-maven-plugin to 4.3.0 > - > > Key: SPARK-36547 > URL: https://issues.apache.org/jira/browse/SPARK-36547 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.2.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Blocker > Fix For: 3.2.0 > > > This is the same issue as SPARK-34007. We need to downgrade the version of > scala-maven-plugin to 4.3.0, in order to avoid errors when building the Hadoop 2.7 > package -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35083) Support remote scheduler pool file
[ https://issues.apache.org/jira/browse/SPARK-35083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401980#comment-17401980 ] Apache Spark commented on SPARK-35083: -- User 'ulysses-you' has created a pull request for this issue: https://github.com/apache/spark/pull/33794 > Support remote scheduler pool file > -- > > Key: SPARK-35083 > URL: https://issues.apache.org/jira/browse/SPARK-35083 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: XiDuo You >Assignee: XiDuo You >Priority: Minor > Fix For: 3.2.0 > > > Make `spark.scheduler.allocation.file` support remote files. When using Spark > as a server (e.g. SparkThriftServer), it's hard for users to specify a local path > as the scheduler pool file. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
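A hedged sketch of what this change enables — pointing the FAIR scheduler's allocation file at a remote path rather than a node-local one; the HDFS URL and pool name below are placeholders, not from the ticket:

{code:java}
// With SPARK-35083, spark.scheduler.allocation.file can reference a remote
// (e.g. HDFS) path instead of a file that must exist on every node.
val spark = org.apache.spark.sql.SparkSession.builder()
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file", "hdfs://namenode:8020/conf/fairscheduler.xml")
  .getOrCreate()

// Jobs submitted from this thread then run in the named pool from that file.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
{code}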
[jira] [Assigned] (SPARK-36549) Add taskStatus support multiple value to doc
[ https://issues.apache.org/jira/browse/SPARK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-36549: Assignee: (was: Apache Spark) > Add taskStatus support multiple value to doc > > > Key: SPARK-36549 > URL: https://issues.apache.org/jira/browse/SPARK-36549 > Project: Spark > Issue Type: Improvement > Components: docs >Affects Versions: 3.2.0 >Reporter: angerszhu >Priority: Major > > According to https://github.com/apache/spark/pull/31165#discussion_r692590472 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36549) Add taskStatus support multiple value to doc
[ https://issues.apache.org/jira/browse/SPARK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401973#comment-17401973 ] Apache Spark commented on SPARK-36549: -- User 'AngersZh' has created a pull request for this issue: https://github.com/apache/spark/pull/33793 > Add taskStatus support multiple value to doc > > > Key: SPARK-36549 > URL: https://issues.apache.org/jira/browse/SPARK-36549 > Project: Spark > Issue Type: Improvement > Components: docs >Affects Versions: 3.2.0 >Reporter: angerszhu >Priority: Major > > According to https://github.com/apache/spark/pull/31165#discussion_r692590472 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-36549) Add taskStatus support multiple value to doc
[ https://issues.apache.org/jira/browse/SPARK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-36549: Assignee: Apache Spark > Add taskStatus support multiple value to doc > > > Key: SPARK-36549 > URL: https://issues.apache.org/jira/browse/SPARK-36549 > Project: Spark > Issue Type: Improvement > Components: docs >Affects Versions: 3.2.0 >Reporter: angerszhu >Assignee: Apache Spark >Priority: Major > > According to https://github.com/apache/spark/pull/31165#discussion_r692590472 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36549) Add taskStatus support multiple value to doc
[ https://issues.apache.org/jira/browse/SPARK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] angerszhu updated SPARK-36549: -- Summary: Add taskStatus support multiple value to doc (was: Improve doc about [SPARK-34092][SQL] Support Stage level restful api filter task details by task status) > Add taskStatus support multiple value to doc > > > Key: SPARK-36549 > URL: https://issues.apache.org/jira/browse/SPARK-36549 > Project: Spark > Issue Type: Improvement > Components: docs >Affects Versions: 3.2.0 >Reporter: angerszhu >Priority: Major > > According to https://github.com/apache/spark/pull/31165#discussion_r692590472 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36549) Improve doc about [SPARK-34092][SQL] Support Stage level restful api filter task details by task status
angerszhu created SPARK-36549: - Summary: Improve doc about [SPARK-34092][SQL] Support Stage level restful api filter task details by task status Key: SPARK-36549 URL: https://issues.apache.org/jira/browse/SPARK-36549 Project: Spark Issue Type: Improvement Components: docs Affects Versions: 3.2.0 Reporter: angerszhu According to https://github.com/apache/spark/pull/31165#discussion_r692590472 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35933) PartitionFilters and pushFilters not applied to window functions
[ https://issues.apache.org/jira/browse/SPARK-35933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401895#comment-17401895 ] Pablo Langa Blanco commented on SPARK-35933: Can we close this ticket? > PartitionFilters and pushFilters not applied to window functions > > > Key: SPARK-35933 > URL: https://issues.apache.org/jira/browse/SPARK-35933 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.8, 3.1.2 >Reporter: Shreyas Kothavade >Priority: Major > > Spark does not apply partition and pushed filters when the partition-by > column and the window function's partition columns are not the same. For example, > in the code below, the data frame is created with a partition on "id". And I > use the partitioned data frame to calculate lag, which is partitioned by > "name". In this case, the query plan shows the PartitionFilters and > PushedFilters as empty.
> {code:java}
> spark
>   .createDataFrame(
>     Seq(
>       Person(
>         1,
>         "Andy",
>         new Timestamp(1499955986039L),
>         new Timestamp(1499955982342L)
>       ),
>       Person(
>         2,
>         "Jeff",
>         new Timestamp(1499955986339L),
>         new Timestamp(149995598L)
>       )
>     )
>   )
>   .write
>   .partitionBy("id")
>   .mode(SaveMode.Append)
>   .parquet("spark-warehouse/people")
>
> val dfPeople =
>   spark.read.parquet("spark-warehouse/people")
> dfPeople
>   .select(
>     $"id",
>     $"name",
>     lag(col("ts2"), 1).over(Window.partitionBy("name").orderBy("ts"))
>   )
>   .filter($"id" === 1)
>   .explain()
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
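For anyone reproducing this, a hedged sketch (not from the ticket) of how the missing pruning shows up: when a partition predicate does reach the scan, the FileScan node of the physical plan reports it, so comparing the two plans makes the bug visible. The exact plan text varies by Spark version:

{code:java}
// Filter without the window: the scan node is expected to show the partition
// predicate, roughly: PartitionFilters: [isnotnull(id#x), (id#x = 1)]
dfPeople.filter($"id" === 1).explain()

// Filter after the window (as in the report): per the ticket, the predicate
// stays above the Window operator and the scan shows
// PartitionFilters: [] and PushedFilters: [].
{code}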
[jira] [Comment Edited] (SPARK-32985) Decouple bucket filter pruning and bucket table scan
[ https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893 ] Gabe Church edited comment on SPARK-32985 at 8/19/21, 11:02 PM: This is a very important addition, thank you! There are many cases where bucket pruning can be used to optimize lookups. Specifically, if we lack the ability to split the read tasks across multiple Spark workers for date-partitioned tables that have some high-cardinality columns and a growing number of files per bucket, bucketing becomes useless when we are stuck with one task per executor. In the example below it can take a 2hr query to a 2min query even with the listFiles via manual read. My workaround example is below, and only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table).where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))

val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files: _*).where(col(bucket_col) === lit(bucket_target))

was (Author: gchurch): This is a very important addition, thank you! There are many cases where bucket pruning can be used to optimize lookups. Specifically, if we lack the ability to split the read tasks across multiple Spark workers for date-partitioned tables that have some high-cardinality columns and a growing number of files per bucket, bucketing becomes useless when we are stuck with one task per executor. In the example below it can take a 2hr query to a 2min query even with the listFiles via manual read. My example is below, and only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table).where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))

val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files: _*).where(col(bucket_col) === lit(bucket_target))

> Decouple bucket filter pruning and bucket table scan > > > Key: SPARK-32985 > URL: https://issues.apache.org/jira/browse/SPARK-32985 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Cheng Su >Assignee: Cheng Su >Priority: Minor > Fix For: 3.2.0 > > > As a followup from discussion in > [https://github.com/apache/spark/pull/29804#discussion_r493100510] . > Currently in data source v1 file scan `FileSourceScanExec`, bucket filter > pruning will only take effect with bucket table scan - > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542] > . However this is unnecessary, as bucket filter pruning can also happen if > we disable bucketed table scan. This helps queries leverage the benefit of > bucket filter pruning to save CPU/IO by not reading unnecessary bucket files, > and not be bound by bucketed table scan when the parallelism of tasks is a > concern. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-32985) Decouple bucket filter pruning and bucket table scan
[ https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893 ] Gabe Church edited comment on SPARK-32985 at 8/19/21, 11:02 PM: This is a very important addition, thank you! There are many cases where bucket pruning can be used to optimize lookups. Specifically, if we lack the ability to split the read tasks across multiple Spark workers for date-partitioned tables that have some high-cardinality columns and a growing number of files per bucket, bucketing becomes useless when we are stuck with one task per executor. In my example below it takes a 2hr query on a bucketed table down to a 2min query even with the listFiles via manual read.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table).where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))

val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files: _*).where(col(bucket_col) === lit(bucket_target))

was (Author: gchurch): This is a very important addition, thank you! There are many cases where bucket pruning can be used to optimize lookups. Specifically, if we lack the ability to split the read tasks across multiple Spark workers for date-partitioned tables that have some high-cardinality columns and a growing number of files per bucket, bucketing becomes useless when we are stuck with one task per executor. In the example below it can take a 2hr query to a 2min query even with the listFiles via manual read. My workaround example is below, and only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table).where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))

val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files: _*).where(col(bucket_col) === lit(bucket_target))

> Decouple bucket filter pruning and bucket table scan > > > Key: SPARK-32985 > URL: https://issues.apache.org/jira/browse/SPARK-32985 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Cheng Su >Assignee: Cheng Su >Priority: Minor > Fix For: 3.2.0 > > > As a followup from discussion in > [https://github.com/apache/spark/pull/29804#discussion_r493100510] . > Currently in data source v1 file scan `FileSourceScanExec`, bucket filter > pruning will only take effect with bucket table scan - > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542] > . However this is unnecessary, as bucket filter pruning can also happen if > we disable bucketed table scan. This helps queries leverage the benefit of > bucket filter pruning to save CPU/IO by not reading unnecessary bucket files, > and not be bound by bucketed table scan when the parallelism of tasks is a > concern. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-32985) Decouple bucket filter pruning and bucket table scan
[ https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893 ] Gabe Church edited comment on SPARK-32985 at 8/19/21, 11:01 PM: This is a very important addition, thank you! There are many cases where bucket pruning can be used to optimize lookups. Specifically, if we lack the ability to split the read tasks across multiple Spark workers for date-partitioned tables that have some high-cardinality columns and a growing number of files per bucket, bucketing becomes useless when we are stuck with one task per executor. In the example below it can take a 2hr query to a 2min query even with the listFiles via manual read. My example is below, and only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table).where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))

val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files: _*).where(col(bucket_col) === lit(bucket_target))

was (Author: gchurch): This is a very important addition, thank you! There are many cases where bucket pruning can be used to optimize lookups. Specifically, if we lack the ability to split the read tasks across multiple Spark workers for date-partitioned tables that have some high-cardinality columns and a growing number of files per bucket, bucketing becomes useless when we are stuck with one task per executor. I've manually bypassed this in my own work so that these bucketed tables are still useful, but making this configurable so that we can gain from bucket pruning is an absolutely huge improvement. In my example it can take a 2hr query to a 2min query even with the listFiles via manual read. My example is below, and only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table).where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))

val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files: _*).where(col(bucket_col) === lit(bucket_target))

> Decouple bucket filter pruning and bucket table scan > > > Key: SPARK-32985 > URL: https://issues.apache.org/jira/browse/SPARK-32985 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Cheng Su >Assignee: Cheng Su >Priority: Minor > Fix For: 3.2.0 > > > As a followup from discussion in > [https://github.com/apache/spark/pull/29804#discussion_r493100510] . > Currently in data source v1 file scan `FileSourceScanExec`, bucket filter > pruning will only take effect with bucket table scan - > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542] > . However this is unnecessary, as bucket filter pruning can also happen if > we disable bucketed table scan. This helps queries leverage the benefit of > bucket filter pruning to save CPU/IO by not reading unnecessary bucket files, > and not be bound by bucketed table scan when the parallelism of tasks is a > concern. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-32985) Decouple bucket filter pruning and bucket table scan
[ https://issues.apache.org/jira/browse/SPARK-32985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401893#comment-17401893 ] Gabe Church commented on SPARK-32985: - This is a very important addition, thank you! There are many cases where bucket pruning can be used to optimize lookups. Specifically, if we lack the ability to split the read tasks across multiple Spark workers for date-partitioned tables that have some high-cardinality columns and a growing number of files per bucket, bucketing becomes useless when we are stuck with one task per executor. I've manually bypassed this in my own work so that these bucketed tables are still useful, but making this configurable so that we can gain from bucket pruning is an absolutely huge improvement. In my example it can take a 2hr query to a 2min query even with the listFiles via manual read. My example is below, and only works for individual partition reads.

val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"

import org.apache.spark.sql.functions.{col, lit}
val df = spark.table(table).where((col(partition_col) === lit(target_partition)) && (col(bucket_col) === lit(bucket_target)))

val sparkplan = df.queryExecution.executedPlan
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucket_files = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("\\.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files: _*).where(col(bucket_col) === lit(bucket_target))

> Decouple bucket filter pruning and bucket table scan > > > Key: SPARK-32985 > URL: https://issues.apache.org/jira/browse/SPARK-32985 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Cheng Su >Assignee: Cheng Su >Priority: Minor > Fix For: 3.2.0 > > > As a followup from discussion in > [https://github.com/apache/spark/pull/29804#discussion_r493100510] . > Currently in data source v1 file scan `FileSourceScanExec`, bucket filter > pruning will only take effect with bucket table scan - > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542] > . However this is unnecessary, as bucket filter pruning can also happen if > we disable bucketed table scan. This helps queries leverage the benefit of > bucket filter pruning to save CPU/IO by not reading unnecessary bucket files, > and not be bound by bucketed table scan when the parallelism of tasks is a > concern. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35312) Introduce new Option in Kafka source to specify minimum number of records to read per trigger
[ https://issues.apache.org/jira/browse/SPARK-35312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401871#comment-17401871 ] Apache Spark commented on SPARK-35312: -- User 'xuanyuanking' has created a pull request for this issue: https://github.com/apache/spark/pull/33792 > Introduce new Option in Kafka source to specify minimum number of records to > read per trigger > - > > Key: SPARK-35312 > URL: https://issues.apache.org/jira/browse/SPARK-35312 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.1.1 >Reporter: Satish Gopalani >Assignee: Satish Gopalani >Priority: Major > Fix For: 3.2.0 > > > The Kafka source currently provides options to set the maximum number of offsets > to read per trigger. > I would like to introduce a new option to specify the minimum number of > offsets to read per trigger, i.e. *minOffsetsPerTrigger*. > This new option will allow skipping a trigger/batch when the number of records > available in Kafka is low. This is a very useful feature in cases where we > have a sudden burst of data at certain intervals in a day and data volume is > low for the rest of the day. Tuning such jobs is difficult, as decreasing > trigger processing time increases the number of batches, and hence cluster > resource usage, and adds to small file issues. Increasing trigger processing > time adds consumer lag. This will save cluster resources and also help solve > small file issues as it runs fewer batches. > Along with this, I would like to introduce a '*maxTriggerDelay*' option which > will help avoid cases of infinite delay in scheduling a trigger: the > trigger will happen irrespective of the number of records available once more than > maxTriggerDelay time has passed since the last trigger. It would be an optional parameter with a > default value of 15 mins. _This option will be only applicable if > minOffsetsPerTrigger is set._ > The *minOffsetsPerTrigger* option would be optional of course, but once specified > it would take precedence over *maxOffsetsPerTrigger*, which will be honored > only after *minOffsetsPerTrigger* is satisfied. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
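A hedged sketch of how these options would be used together on the Kafka source once the feature lands; the option names follow this ticket, while the broker, topic, and threshold values are placeholders:

{code:java}
// Illustrative only: values are made up; option names are per this ticket.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  // Skip a trigger until at least this many new offsets are available...
  .option("minOffsetsPerTrigger", 1000000L)
  // ...but fire anyway once this much time has passed since the last trigger.
  .option("maxTriggerDelay", "15m")
  // Honored only after minOffsetsPerTrigger is satisfied.
  .option("maxOffsetsPerTrigger", 5000000L)
  .load()
{code}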
[jira] [Updated] (SPARK-36548) Throwing NoClassDefFoundError for Logging$class
[ https://issues.apache.org/jira/browse/SPARK-36548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sadagopan kalyanaraman updated SPARK-36548: --- Description: I'm getting NoClassDefFoundError. Is it removed in 3.1.1? https://spark.apache.org/docs/3.1.1/api/java/org/apache/spark/internal/Logging.html *https://spark.apache.org/docs/2.4.7/api/java/org/apache/spark/internal/Logging.html*

Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
    at java.base/java.util.ServiceLoader.fail(Unknown Source)
    at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source)
    at java.base/java.util.ServiceLoader$ProviderImpl.get(Unknown Source)
    at java.base/java.util.ServiceLoader$3.next(Unknown Source)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:659)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:209)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:195)
    at com.xyz.xyz.xyz.AbstractCdcMessageToSqJob.runJob(AbstractCdcMessageToSqJob.java:94)
    at com.xyz.xyz.xyz.CdcMessageToSqlBillerJob.main(CdcMessageToSqlBillerJob.java:30)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
    at com.google.cloud.spark.bigquery.BigQueryUtilScala$.<init>(BigQueryUtil.scala:34)
    at com.google.cloud.spark.bigquery.BigQueryUtilScala$.<clinit>(BigQueryUtil.scala)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:43)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:50)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    ... 33 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging$class
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 41 more

was: I'm getting NoClassDefFoundError. Is it removed in 3.1.1?

Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.sp
[jira] [Created] (SPARK-36548) Throwing NoClassDefFoundError for Logging$class
sadagopan kalyanaraman created SPARK-36548: -- Summary: Throwing NoClassDefFoundError for Logging$class Key: SPARK-36548 URL: https://issues.apache.org/jira/browse/SPARK-36548 Project: Spark Issue Type: Bug Components: Kubernetes, Spark Core Affects Versions: 3.1.1 Reporter: sadagopan kalyanaraman I'm getting NoClassDefFoundError. Is it removed in 3.1.1?

Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
    at java.base/java.util.ServiceLoader.fail(Unknown Source)
    at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(Unknown Source)
    at java.base/java.util.ServiceLoader$ProviderImpl.get(Unknown Source)
    at java.base/java.util.ServiceLoader$3.next(Unknown Source)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:659)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:209)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:195)
    at com.xyz.xyz.xyz.AbstractCdcMessageToSqJob.runJob(AbstractCdcMessageToSqJob.java:94)
    at com.xyz.xyz.xyz.CdcMessageToSqlBillerJob.main(CdcMessageToSqlBillerJob.java:30)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
    at com.google.cloud.spark.bigquery.BigQueryUtilScala$.<init>(BigQueryUtil.scala:34)
    at com.google.cloud.spark.bigquery.BigQueryUtilScala$.<clinit>(BigQueryUtil.scala)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:43)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:50)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    ... 33 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging$class
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 41 more -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
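A note on the likely cause, hedged since the ticket does not state it: org.apache.spark.internal.Logging$class is the trait implementation class that the Scala 2.11 compiler emits; it does not exist in the Scala 2.12 builds used by Spark 3.x, so this error typically means a Scala 2.11-compiled connector is on a Spark 3.1.1 classpath rather than anything being removed from Spark. A build.sbt sketch of the kind of version alignment that avoids it (the connector version below is illustrative):

{code:java}
// build.sbt sketch: keep the connector's Scala suffix aligned with Spark's
// Scala version (2.12 for Spark 3.1.1). The connector version is a placeholder.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.1.1" % "provided",
  "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.22.0"
)
{code}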
[jira] [Assigned] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0
[ https://issues.apache.org/jira/browse/SPARK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-36547: Assignee: Gengliang Wang (was: Apache Spark) > Downgrade scala-maven-plugin to 4.3.0 > - > > Key: SPARK-36547 > URL: https://issues.apache.org/jira/browse/SPARK-36547 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.2.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Blocker > > This is the same issue as SPARK-34007. We need to downgrade the version of > scala-maven-plugin to 4.3.0, in order to avoid errors when building the Hadoop 2.7 > package -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0
[ https://issues.apache.org/jira/browse/SPARK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401809#comment-17401809 ] Apache Spark commented on SPARK-36547: -- User 'gengliangwang' has created a pull request for this issue: https://github.com/apache/spark/pull/33791 > Downgrade scala-maven-plugin to 4.3.0 > - > > Key: SPARK-36547 > URL: https://issues.apache.org/jira/browse/SPARK-36547 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.2.0 >Reporter: Gengliang Wang >Assignee: Apache Spark >Priority: Blocker > > This is the same issue as SPARK-34007. We need to downgrade the version of > scala-maven-plugin to 4.3.0, in order to avoid errors when building the Hadoop 2.7 > package -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0
[ https://issues.apache.org/jira/browse/SPARK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-36547: Assignee: Apache Spark (was: Gengliang Wang) > Downgrade scala-maven-plugin to 4.3.0 > - > > Key: SPARK-36547 > URL: https://issues.apache.org/jira/browse/SPARK-36547 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.2.0 >Reporter: Gengliang Wang >Assignee: Apache Spark >Priority: Blocker > > This is the same issue as SPARK-34007. We need to downgrade the version of > scala-maven-plugin to 4.3.0, in order to avoid errors when building the Hadoop 2.7 > package -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36547) Downgrade scala-maven-plugin to 4.3.0
Gengliang Wang created SPARK-36547: -- Summary: Downgrade scala-maven-plugin to 4.3.0 Key: SPARK-36547 URL: https://issues.apache.org/jira/browse/SPARK-36547 Project: Spark Issue Type: Improvement Components: Build Affects Versions: 3.2.0 Reporter: Gengliang Wang Assignee: Gengliang Wang This is the same issue as SPARK-34007. We need to downgrade the version of scala-maven-plugin to 4.3.0, in order to avoid errors when building the Hadoop 2.7 package -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34305) Unify v1 and v2 ALTER TABLE .. SET SERDE tests
[ https://issues.apache.org/jira/browse/SPARK-34305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang updated SPARK-34305: --- Fix Version/s: (was: 3.2.0) 3.3.0 > Unify v1 and v2 ALTER TABLE .. SET SERDE tests > -- > > Key: SPARK-34305 > URL: https://issues.apache.org/jira/browse/SPARK-34305 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.2.0 >Reporter: Max Gekk >Assignee: Max Gekk >Priority: Major > Fix For: 3.3.0 > > > Extract the ALTER TABLE .. SET SERDE tests to a common place to run them for V1 > and v2 datasources. Some tests can be placed in V1- and V2-specific test > suites. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34332) Unify v1 and v2 ALTER TABLE .. SET LOCATION tests
[ https://issues.apache.org/jira/browse/SPARK-34332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang updated SPARK-34332: --- Fix Version/s: (was: 3.2.0) 3.3.0 > Unify v1 and v2 ALTER TABLE .. SET LOCATION tests > - > > Key: SPARK-34332 > URL: https://issues.apache.org/jira/browse/SPARK-34332 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.2.0 >Reporter: Max Gekk >Assignee: Max Gekk >Priority: Major > Fix For: 3.3.0 > > > Extract the ALTER TABLE .. SET LOCATION tests to a common place to run them for > V1 and v2 datasources. Some tests can be placed in V1- and V2-specific test > suites. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36082) when the right side is small enough to use SingleColumn Null Aware Anti Join
[ https://issues.apache.org/jira/browse/SPARK-36082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang updated SPARK-36082: --- Fix Version/s: (was: 3.2.0) 3.3.0 > when the right side is small enough to use SingleColumn Null Aware Anti Join > > > Key: SPARK-36082 > URL: https://issues.apache.org/jira/browse/SPARK-36082 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0, 3.1.3 >Reporter: mcdull_zhang >Priority: Minor > Fix For: 3.3.0 > > > NULL-aware ANTI join (https://issues.apache.org/jira/browse/SPARK-32290) will > build the right side into a HashMap. > code in SparkStrategy:
> {code:java}
> case j @ ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) =>
>   Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, BuildRight,
>     None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin = true))
> {code}
> we should add a condition and use this optimization only when the size of the > right side is small enough. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
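For context, a hedged illustration (not from the ticket) of the query shape this planner rule matches — a single-column NOT IN subquery, which Spark plans as a broadcast hash join with isNullAwareAntiJoin = true; the table and column names here are made up:

{code:java}
// Hypothetical tables: a single-column, null-aware NOT IN anti join.
// Today the rewrite fires whenever the pattern matches; the ticket asks to
// also require that the right side be small enough to build into a HashMap.
val result = spark.sql(
  """SELECT o.* FROM orders o
    |WHERE o.customer_id NOT IN (SELECT b.id FROM blocked_customers b)""".stripMargin)
{code}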
[jira] [Resolved] (SPARK-34884) Improve dynamic partition pruning evaluation
[ https://issues.apache.org/jira/browse/SPARK-34884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang resolved SPARK-34884. Resolution: Fixed > Improve dynamic partition pruning evaluation > > > Key: SPARK-34884 > URL: https://issues.apache.org/jira/browse/SPARK-34884 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Yuming Wang >Assignee: Yuming Wang >Priority: Major > Fix For: 3.2.0 > > > Fail fast if the filtering side cannot build the broadcast by size. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35083) Support remote scheduler pool file
[ https://issues.apache.org/jira/browse/SPARK-35083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401686#comment-17401686 ] Apache Spark commented on SPARK-35083: -- User 'gengliangwang' has created a pull request for this issue: https://github.com/apache/spark/pull/33789 > Support remote scheduler pool file > -- > > Key: SPARK-35083 > URL: https://issues.apache.org/jira/browse/SPARK-35083 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: XiDuo You >Assignee: XiDuo You >Priority: Minor > Fix For: 3.2.0 > > > Make `spark.scheduler.allocation.file` support remote files. When using Spark > as a server (e.g. SparkThriftServer), it's hard for users to specify a local path > as the scheduler pool file. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns
[ https://issues.apache.org/jira/browse/SPARK-36546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vishal Dhavale updated SPARK-36546: --- Description: Currently, unionByName works with two DataFrames with slightly different schemas. It would be good if it also worked with array-of-struct columns. unionByName fails if we try to merge DataFrames whose array-of-struct columns have slightly different schemas. Below is the example.

Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of struct
{code:java}
val arrayStructData = Seq(
  Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
  Row("Lilly", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))))

val arrayStructSchema = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val arrayStructDf1 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)
arrayStructDf1.printSchema()

scala> arrayStructDf1.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
{code}
Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
  Row("James", List(Row("Java", "XX", 120, "new_column_data"), Row("Scala", "XA", 300, "new_column_data"))),
  Row("Lilly", List(Row("Java", "XY", 200, "new_column_data"), Row("Scala", "XB", 500, "new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)
    .add("new_column", StringType)))

val arrayStructDf2 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData2), arrayStructSchemaNewClm)
arrayStructDf2.printSchema()

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true)
{code}
Step 3: Merge arrayStructDf1 and arrayStructDf2 using unionByName. We see the error org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types.
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2, allowMissingColumns=true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. array<struct<name:string,author:string,pages:int,new_column:string>> <> array<struct<name:string,author:string,pages:int>> at the second column of the second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
   +- LogicalRDD [name#204, booksIntersted#205], false
{code}
unionByName should fill the missing data with null like it does for columns of struct type

was: Currently, unionByName works with two DataFrames with slightly different schemas. It would be good if it also worked with array-of-struct columns. unionByName fails if we try to merge DataFrames whose array-of-struct columns have slightly different schemas. Below is the example.

Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of struct
{code:java}
val arrayStructData = Seq(
  Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
  Row("Lilly", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))))

val arrayStructSchema = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val arrayStructDf1 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)
arrayStructDf1.printSchema()

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true)
{code}
Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type array of a struct but struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
  Row("James", List(Row("Java", "XX", 120, "new_column_data"), Row("Scala", "XA", 300, "new_column_data"))),
  Row("Lilly", List(Row("Java", "XY", 200, "new_column_data"), Row("Scala", "XB", 500, "new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name",
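For contrast, a small sketch (not from the ticket) of the null-filling that already works for missing top-level columns, which this issue asks to extend to fields nested inside array-of-struct columns:

{code:java}
// Works today (Spark 3.1+): the missing top-level column is null-filled.
import spark.implicits._
val a = Seq((1, "x")).toDF("id", "name")
val b = Seq((2, "y", true)).toDF("id", "name", "flag")
a.unionByName(b, allowMissingColumns = true).show()
// Rows from `a` get flag = null; the ask is the same treatment when the
// mismatch sits inside an array<struct> column's element type.
{code}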
[jira] [Updated] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns
[ https://issues.apache.org/jira/browse/SPARK-36546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vishal Dhavale updated SPARK-36546: --- Description: Currently, unionByName workes with two DataFrames with slightly different schemas. It would be good it works with an array of struct columns. unionByName fails if we try to merge dataframe with an array of struct columns with slightly different schema Below is the example. Step 1: dataframe arrayStructDf1 with columnbooksIntersted of type array of struct {code:java} val arrayStructData = Seq( Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))), Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500 val arrayStructSchema = new StructType().add("name",StringType) .add("booksIntersted",ArrayType(new StructType() .add("name",StringType) .add("author",StringType) .add("pages",IntegerType))) val arrayStructDf1 = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema) arrayStructDf1.printSchema() scala> arrayStructDf2.printSchema() root |-- name: string (nullable = true) |-- booksIntersted: array (nullable = true) ||-- element: struct (containsNull = true) |||-- name: string (nullable = true) |||-- author: string (nullable = true) |||-- pages: integer (nullable = true) |||-- new_column: string (nullable = true) {code} Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type array of a struct but struct contains an extra field called "new_column" {code:java} val arrayStructData2 = Seq( Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))), Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data" val arrayStructSchemaNewClm = new StructType().add("name",StringType) .add("booksIntersted",ArrayType(new StructType() .add("name",StringType) .add("author",StringType) .add("pages",IntegerType) .add("new_column",StringType))) val arrayStructDf2 = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm) arrayStructDf2.printSchema(){code} We see the below error when we try to use unionByName arrayStructDf1 and arrayStructDf2 {code:java} scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true) org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. array> <> array> at the second column of the second table; 'Union false, false :- LogicalRDD [name#183, booksIntersted#184], false +- Project [name#204, booksIntersted#205] +- LogicalRDD [name#204, booksIntersted#205], false{code} unionByName should fill the missing data with null like it does column with struct type was: Currently, unionByName workes with two DataFrames with slightly different schemas. It would be good it works with an array of struct columns. unionByName fails if we try to merge dataframe with an array of struct columns with slightly different schema Below is the example. 
Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of struct
{code:java}
val arrayStructData = Seq(
  Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
  Row("Lilly", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))))

val arrayStructSchema = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val arrayStructDf1 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)
arrayStructDf1.printSchema()
{code}
Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
  Row("James", List(Row("Java", "XX", 120, "new_column_data"), Row("Scala", "XA", 300, "new_column_data"))),
  Row("Lilly", List(Row("Java", "XY", 200, "new_column_data"), Row("Scala", "XB", 500, "new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)
    .add("new_column", StringType)))

val arrayStructDf2 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData2), arrayStructSchemaNewClm)
arrayStructDf2.printSchema()
{code}
We see the below error when we try to unionByName arrayStructDf1 and arrayStructDf2
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2, allowMissingColumns = true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. array<struct<name:string,author:string,pages:int,new_column:string>> <> array<struct<name:string,author:string,pages:int>> at the second column of the second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
   +- LogicalRDD [name#204, booksIntersted#205], false
{code}
unionByName should fill the missing fields with null, as it already does for struct-type columns
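Not part of the report, but a minimal workaround sketch for the failure above (assumes Spark 3.1+ and reuses the names from the reproduction): manually align the element struct of arrayStructDf1 with arrayStructDf2 by appending a null-valued new_column, after which the union succeeds without any null-filling of the array column.
{code:scala}
import org.apache.spark.sql.functions.{col, lit, struct, transform}

// Rebuild every array element with the missing field appended as a null
// string, so both sides end up with identical element struct types.
val arrayStructDf1Aligned = arrayStructDf1.withColumn(
  "booksIntersted",
  transform(col("booksIntersted"), b => struct(
    b.getField("name").as("name"),
    b.getField("author").as("author"),
    b.getField("pages").as("pages"),
    lit(null).cast("string").as("new_column"))))

// Element types now match, so a plain unionByName works.
arrayStructDf1Aligned.unionByName(arrayStructDf2).show(false)
{code}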
[jira] [Updated] (SPARK-36545) Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition
[ https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] handong updated SPARK-36545: Description: I have a Spark Streaming application with Kafka. Here are the parameters: kafka partitions = 500, batch time = 60, --conf spark.streaming.backpressure.enabled=true, --conf spark.streaming.kafka.maxRatePerPartition=2500. Maximum input size per batch = 500 partitions * 60 s * 2500 records/s = 75,000,000 records. However, the input size becomes 16000 after some batches. Who can tell me the reason? !kafka.png! was: !kafka.png! > Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition > --- > > Key: SPARK-36545 > URL: https://issues.apache.org/jira/browse/SPARK-36545 > Project: Spark > Issue Type: Bug > Components: DStreams, Spark Submit, YARN >Affects Versions: 2.4.5 >Reporter: handong >Priority: Major > Attachments: kafka.png > > > > I have a Spark Streaming application with Kafka. > Here are the parameters: > kafka partitions = 500 > batch time = 60 > --conf spark.streaming.backpressure.enabled=true > --conf spark.streaming.kafka.maxRatePerPartition=2500 > maximum input size per batch = 500 * 60 * 2500 = 75,000,000 > however the input size becomes 16000 after some batches > who can tell me the reason > !kafka.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
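For reference (not in the original report), a back-of-the-envelope sketch of the per-batch cap implied by these settings, using the formula partitions * batch seconds * maxRatePerPartition; the observed 16,000-record batches are consistent with backpressure throttling the rate down, not with the cap being exceeded:
{code:scala}
// Upper bound on records per micro-batch for the DStream Kafka source
// (values taken from the report above).
val partitions = 500
val batchSeconds = 60
val maxRatePerPartition = 2500L // records per second per partition
val maxRecordsPerBatch = partitions * batchSeconds * maxRatePerPartition
// 500 * 60 * 2500 = 75,000,000; with backpressure enabled the rate
// estimator can deliver far fewer records per batch (e.g. 16,000).
{code}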
[jira] [Updated] (SPARK-36545) Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition
[ https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] handong updated SPARK-36545: Description: !kafka.png! was: !kafka.png! I have a Spark Streaming application with Kafka. Here are the parameters: kafka partitions = 500, batch time = 60, --conf spark.streaming.backpressure.enabled=true, --conf spark.streaming.kafka.maxRatePerPartition=2500. Maximum input size per batch = 500 * 60 * 2500 = 75,000,000. However, the input size becomes 16000 after some batches. > Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition > --- > > Key: SPARK-36545 > URL: https://issues.apache.org/jira/browse/SPARK-36545 > Project: Spark > Issue Type: Bug > Components: DStreams, Spark Submit, YARN >Affects Versions: 2.4.5 >Reporter: handong >Priority: Major > Attachments: kafka.png > > > !kafka.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36545) Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition
[ https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] handong updated SPARK-36545: Description: !kafka.png! I have a Spark Streaming application with Kafka. Here are the parameters: kafka partitions = 500, batch time = 60, --conf spark.streaming.backpressure.enabled=true, --conf spark.streaming.kafka.maxRatePerPartition=2500. Maximum input size per batch = 500 * 60 * 2500 = 75,000,000. However, the input size becomes 16000 after some batches. was: I have a Spark Streaming application with Kafka. Here are the parameters: kafka partitions = 500, batch time = 60, --conf spark.streaming.backpressure.enabled=true, --conf spark.streaming.kafka.maxRatePerPartition=2500. Maximum input size per batch = 500 * 60 * 2500 = 75,000,000. However, the input size becomes 16000 after some batches. > Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition > --- > > Key: SPARK-36545 > URL: https://issues.apache.org/jira/browse/SPARK-36545 > Project: Spark > Issue Type: Bug > Components: DStreams, Spark Submit, YARN >Affects Versions: 2.4.5 >Reporter: handong >Priority: Major > Attachments: kafka.png > > > !kafka.png! I have a Spark Streaming application with Kafka. > Here are the parameters: > kafka partitions = 500 > batch time = 60 > --conf spark.streaming.backpressure.enabled=true > --conf spark.streaming.kafka.maxRatePerPartition=2500 > maximum input size per batch = 500 * 60 * 2500 = 75,000,000 > however the input size becomes 16000 after some batches > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36545) Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition
[ https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] handong updated SPARK-36545: Attachment: kafka.png > Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition > --- > > Key: SPARK-36545 > URL: https://issues.apache.org/jira/browse/SPARK-36545 > Project: Spark > Issue Type: Bug > Components: DStreams, Spark Submit, YARN >Affects Versions: 2.4.5 >Reporter: handong >Priority: Major > Attachments: kafka.png > > > I have a Spark Streaming application with Kafka. > Here are the parameters: > kafka partitions = 500 > batch time = 60 > --conf spark.streaming.backpressure.enabled=true > --conf spark.streaming.kafka.maxRatePerPartition=2500 > maximum input size per batch = 500 * 60 * 2500 = 75,000,000 > however the input size becomes 16000 after some batches > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns
[ https://issues.apache.org/jira/browse/SPARK-36546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vishal Dhavale updated SPARK-36546: --- Description: Currently, unionByName works with two DataFrames that have slightly different schemas. It would be good if it also worked with array-of-struct columns. unionByName fails if we try to merge dataframes whose array-of-struct columns have slightly different schemas. Below is an example.
Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of struct
{code:java}
val arrayStructData = Seq(
  Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
  Row("Lilly", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))))

val arrayStructSchema = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val arrayStructDf1 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)
arrayStructDf1.printSchema()
{code}
Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
  Row("James", List(Row("Java", "XX", 120, "new_column_data"), Row("Scala", "XA", 300, "new_column_data"))),
  Row("Lilly", List(Row("Java", "XY", 200, "new_column_data"), Row("Scala", "XB", 500, "new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)
    .add("new_column", StringType)))

val arrayStructDf2 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData2), arrayStructSchemaNewClm)
arrayStructDf2.printSchema()
{code}
We see the below error when we try to unionByName arrayStructDf1 and arrayStructDf2
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2, allowMissingColumns = true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. array<struct<name:string,author:string,pages:int,new_column:string>> <> array<struct<name:string,author:string,pages:int>> at the second column of the second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
   +- LogicalRDD [name#204, booksIntersted#205], false
{code}
unionByName should fill the missing fields with null, as it already does for struct-type columns

was: Currently, unionByName works with two DataFrames that have slightly different schemas. It would be good if it also worked with array-of-struct columns. unionByName fails if we try to merge dataframes whose array-of-struct columns have slightly different schemas. Below is an example.
Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of struct
{code:java}
val arrayStructData = Seq(
  Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
  Row("Lilly", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))))

val arrayStructSchema = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val arrayStructDf1 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)

scala> arrayStructDf1.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
{code}
Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
  Row("James", List(Row("Java", "XX", 120, "new_column_data"), Row("Scala", "XA", 300, "new_column_data"))),
  Row("Lilly", List(Row("Java", "XY", 200, "new_column_data"), Row("Scala", "XB", 500, "new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)
    .add("new_column", StringType)))

val arrayStructDf2 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData2), arrayStructSchemaNewClm)

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true)
{code}
Step 3: Try to merge arrayStructDf1 and arrayStructDf2 using unionByName
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2, allowMissingColumns = true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. array<struct<name:string,author:string,pages:int,new_column:string>> <> array<struct<name:string,author:string,pages:int>> at the second column of the second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
   +- LogicalRDD [name#204, booksIntersted#205], false
{code}
[jira] [Updated] (SPARK-36545) Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition
[ https://issues.apache.org/jira/browse/SPARK-36545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] handong updated SPARK-36545: Component/s: YARN Spark Submit Description: I have a Spark Streaming application with Kafka. Here are the parameters: kafka partitions = 500, batch time = 60, --conf spark.streaming.backpressure.enabled=true, --conf spark.streaming.kafka.maxRatePerPartition=2500. Maximum input size per batch = 500 * 60 * 2500 = 75,000,000. However, the input size becomes 16000 after some batches. > Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition > --- > > Key: SPARK-36545 > URL: https://issues.apache.org/jira/browse/SPARK-36545 > Project: Spark > Issue Type: Bug > Components: DStreams, Spark Submit, YARN >Affects Versions: 2.4.5 >Reporter: handong >Priority: Major > > I have a Spark Streaming application with Kafka. > Here are the parameters: > kafka partitions = 500 > batch time = 60 > --conf spark.streaming.backpressure.enabled=true > --conf spark.streaming.kafka.maxRatePerPartition=2500 > maximum input size per batch = 500 * 60 * 2500 = 75,000,000 > however the input size becomes 16000 after some batches > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns
[ https://issues.apache.org/jira/browse/SPARK-36546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vishal Dhavale updated SPARK-36546: --- Description: Currently, unionByName works with two DataFrames that have slightly different schemas. It would be good if it also worked with array-of-struct columns. unionByName fails if we try to merge dataframes whose array-of-struct columns have slightly different schemas. Below is an example.
Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of struct
{code:java}
val arrayStructData = Seq(
  Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
  Row("Lilly", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))))

val arrayStructSchema = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val arrayStructDf1 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)

scala> arrayStructDf1.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
{code}
Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
  Row("James", List(Row("Java", "XX", 120, "new_column_data"), Row("Scala", "XA", 300, "new_column_data"))),
  Row("Lilly", List(Row("Java", "XY", 200, "new_column_data"), Row("Scala", "XB", 500, "new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)
    .add("new_column", StringType)))

val arrayStructDf2 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData2), arrayStructSchemaNewClm)

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true)
{code}
Step 3: Try to merge arrayStructDf1 and arrayStructDf2 using unionByName
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2, allowMissingColumns = true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. array<struct<name:string,author:string,pages:int,new_column:string>> <> array<struct<name:string,author:string,pages:int>> at the second column of the second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
   +- LogicalRDD [name#204, booksIntersted#205], false
{code}
unionByName should fill the missing fields with null, as it already does for struct-type columns (https://issues.apache.org/jira/browse/SPARK-32376)

was: Currently, unionByName works with two DataFrames that have slightly different schemas. It would be good if it also worked with array-of-struct columns. unionByName fails if we try to merge dataframes whose array-of-struct columns have slightly different schemas. Below is an example.
Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of struct
{code:java}
val arrayStructData = Seq(
  Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
  Row("Lilly", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))))

val arrayStructSchema = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val arrayStructDf1 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)

scala> arrayStructDf1.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
{code}
Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
  Row("James", List(Row("Java", "XX", 120, "new_column_data"), Row("Scala", "XA", 300, "new_column_data"))),
  Row("Lilly", List(Row("Java", "XY", 200, "new_column_data"), Row("Scala", "XB", 500, "new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)
    .add("new_column", StringType)))

val arrayStructDf2 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData2), arrayStructSchemaNewClm)
{code}
[jira] [Created] (SPARK-36546) Make unionByName null-filling behavior work with array of struct columns
Vishal Dhavale created SPARK-36546: -- Summary: Make unionByName null-filling behavior work with array of struct columns Key: SPARK-36546 URL: https://issues.apache.org/jira/browse/SPARK-36546 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.1.1 Reporter: Vishal Dhavale Currently, unionByName works with two DataFrames that have slightly different schemas. It would be good if it also worked with array-of-struct columns. unionByName fails if we try to merge dataframes whose array-of-struct columns have slightly different schemas. Below is an example.
Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of struct
{code:java}
val arrayStructData = Seq(
  Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
  Row("Lilly", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))))

val arrayStructSchema = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val arrayStructDf1 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)

scala> arrayStructDf1.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
{code}
Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type array of struct, but the struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
  Row("James", List(Row("Java", "XX", 120, "new_column_data"), Row("Scala", "XA", 300, "new_column_data"))),
  Row("Lilly", List(Row("Java", "XY", 200, "new_column_data"), Row("Scala", "XB", 500, "new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)
    .add("new_column", StringType)))

val arrayStructDf2 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData2), arrayStructSchemaNewClm)

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true)
{code}
Step 3: Try to merge arrayStructDf1 and arrayStructDf2 using unionByName
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2, allowMissingColumns = true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. array<struct<name:string,author:string,pages:int,new_column:string>> <> array<struct<name:string,author:string,pages:int>> at the second column of the second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
   +- LogicalRDD [name#204, booksIntersted#205], false
{code}
unionByName should fill the missing fields with null, as it already does for struct-type columns (https://issues.apache.org/jira/browse/SPARK-32376) -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
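For contrast (not from the report itself): on the reporter's Spark 3.1, unionByName with allowMissingColumns already null-fills missing nested fields of plain struct columns per SPARK-32376. A small sketch of that existing behavior, which the issue asks to extend to structs nested inside arrays:
{code:scala}
// Two struct columns whose nested fields differ by one field.
val s1 = spark.sql("SELECT named_struct('a', 1) AS s")
val s2 = spark.sql("SELECT named_struct('a', 2, 'b', 'x') AS s")

// Succeeds: the result schema is struct<a:int, b:string>, and rows coming
// from s1 get s.b = null.
s1.unionByName(s2, allowMissingColumns = true).show()
{code}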
[jira] [Created] (SPARK-36545) Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition
handong created SPARK-36545: --- Summary: Spark Streaming input rate exceeds spark.streaming.kafka.maxRatePerPartition Key: SPARK-36545 URL: https://issues.apache.org/jira/browse/SPARK-36545 Project: Spark Issue Type: Bug Components: DStreams Affects Versions: 2.4.5 Reporter: handong -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36428) the 'seconds' parameter of 'make_timestamp' should accept integer type
[ https://issues.apache.org/jira/browse/SPARK-36428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401598#comment-17401598 ] Apache Spark commented on SPARK-36428: -- User 'beliefer' has created a pull request for this issue: https://github.com/apache/spark/pull/33787 > the 'seconds' parameter of 'make_timestamp' should accept integer type > -- > > Key: SPARK-36428 > URL: https://issues.apache.org/jira/browse/SPARK-36428 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Wenchen Fan >Assignee: Apache Spark >Priority: Major > Fix For: 3.2.0 > > > With ANSI mode, {{SELECT make_timestamp(1, 1, 1, 1, 1, 1)}} fails, because > the 'seconds' parameter needs to be of type DECIMAL(8,6), and INT can't be > implicitly cast to DECIMAL(8,6) under ANSI mode. > We should update the function {{make_timestamp}} to allow an integer-type > 'seconds' parameter. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
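A quick illustration of the reported behavior (a sketch only; the exact error text varies by version):
{code:scala}
// Under ANSI mode the integer 'seconds' argument was rejected because INT
// is not implicitly castable to DECIMAL(8,6); the linked PR relaxes this.
spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("SELECT make_timestamp(1, 1, 1, 1, 1, 1)").show()
// Before the fix: analysis error (INT cannot be implicitly cast to
// DECIMAL(8,6)); after the fix: 0001-01-01 01:01:01.
{code}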
[jira] [Commented] (SPARK-36428) the 'seconds' parameter of 'make_timestamp' should accept integer type
[ https://issues.apache.org/jira/browse/SPARK-36428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401596#comment-17401596 ] Apache Spark commented on SPARK-36428: -- User 'beliefer' has created a pull request for this issue: https://github.com/apache/spark/pull/33787 > the 'seconds' parameter of 'make_timestamp' should accept integer type > -- > > Key: SPARK-36428 > URL: https://issues.apache.org/jira/browse/SPARK-36428 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Wenchen Fan >Assignee: Apache Spark >Priority: Major > Fix For: 3.2.0 > > > With ANSI mode, {{SELECT make_timestamp(1, 1, 1, 1, 1, 1)}} fails, because > the 'seconds' parameter needs to be of type DECIMAL(8,6), and INT can't be > implicitly cast to DECIMAL(8,6) under ANSI mode. > We should update the function {{make_timestamp}} to allow an integer-type > 'seconds' parameter. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-36538) Environment variables part in config doc isn't properly documented.
[ https://issues.apache.org/jira/browse/SPARK-36538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuto Akutsu resolved SPARK-36538. - Resolution: Invalid > Environment variables part in config doc isn't properly documented. > --- > > Key: SPARK-36538 > URL: https://issues.apache.org/jira/browse/SPARK-36538 > Project: Spark > Issue Type: Documentation > Components: Documentation >Affects Versions: 3.1.2 >Reporter: Yuto Akutsu >Priority: Major > > It says environment variables are not reflected through spark-env.sh in YARN > cluster mode but I believe they are. I think this part of the document should > be removed. > [https://spark.apache.org/docs/latest/configuration.html#environment-variables] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-36541) Images used in pyspark documentation still use Koalas
[ https://issues.apache.org/jira/browse/SPARK-36541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-36541: Assignee: Apache Spark > Images used in pyspark documentation still use Koalas > - > > Key: SPARK-36541 > URL: https://issues.apache.org/jira/browse/SPARK-36541 > Project: Spark > Issue Type: Documentation > Components: docs, PySpark >Affects Versions: 3.2.0 >Reporter: Leona Yoda >Assignee: Apache Spark >Priority: Minor > Attachments: images_replaced.key > > > Images in the [Transform and apply a > function|https://github.com/apache/spark/blob/master/python/docs/source/user_guide/pandas_on_spark/transform_apply.rst] > documentation still use the word Koalas, although the word was replaced with > pandas-on-Spark by this [PR|https://github.com/apache/spark/pull/32835]. > I think we should match the wording in those images > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-36541) Images used in pyspark documentation still use Koalas
[ https://issues.apache.org/jira/browse/SPARK-36541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-36541: Assignee: (was: Apache Spark) > Images used in pyspark documentation still use Koalas > - > > Key: SPARK-36541 > URL: https://issues.apache.org/jira/browse/SPARK-36541 > Project: Spark > Issue Type: Documentation > Components: docs, PySpark >Affects Versions: 3.2.0 >Reporter: Leona Yoda >Priority: Minor > Attachments: images_replaced.key > > > Images in the [Transform and apply a > function|https://github.com/apache/spark/blob/master/python/docs/source/user_guide/pandas_on_spark/transform_apply.rst] > documentation still use the word Koalas, although the word was replaced with > pandas-on-Spark by this [PR|https://github.com/apache/spark/pull/32835]. > I think we should match the wording in those images > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36541) Images used in pyspark documentation still use Koalas
[ https://issues.apache.org/jira/browse/SPARK-36541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401567#comment-17401567 ] Apache Spark commented on SPARK-36541: -- User 'yoda-mon' has created a pull request for this issue: https://github.com/apache/spark/pull/33786 > Images used in pyspark documentation still use Koalas > - > > Key: SPARK-36541 > URL: https://issues.apache.org/jira/browse/SPARK-36541 > Project: Spark > Issue Type: Documentation > Components: docs, PySpark >Affects Versions: 3.2.0 >Reporter: Leona Yoda >Priority: Minor > Attachments: images_replaced.key > > > Images in the [Transform and apply a > function|https://github.com/apache/spark/blob/master/python/docs/source/user_guide/pandas_on_spark/transform_apply.rst] > documentation still use the word Koalas, although the word was replaced with > pandas-on-Spark by this [PR|https://github.com/apache/spark/pull/32835]. > I think we should match the wording in those images > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36444) Remove OptimizeSubqueries from batch of PartitionPruning
[ https://issues.apache.org/jira/browse/SPARK-36444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang updated SPARK-36444: Affects Version/s: 3.0.3 3.1.2 > Remove OptimizeSubqueries from batch of PartitionPruning > > > Key: SPARK-36444 > URL: https://issues.apache.org/jira/browse/SPARK-36444 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.3, 3.1.2, 3.2.0 >Reporter: Yuming Wang >Assignee: Yuming Wang >Priority: Major > Fix For: 3.2.0 > > > To support this case: > {code:scala} > sql( > """ > |SELECT date_id, product_id FROM fact_sk f > |JOIN (select store_id + 3 as new_store_id from dim_store where > country = 'US') s > |ON f.store_id = s.new_store_id > """.stripMargin) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36444) Remove OptimizeSubqueries from batch of PartitionPruning
[ https://issues.apache.org/jira/browse/SPARK-36444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang updated SPARK-36444: Affects Version/s: (was: 3.3.0) 3.2.0 > Remove OptimizeSubqueries from batch of PartitionPruning > > > Key: SPARK-36444 > URL: https://issues.apache.org/jira/browse/SPARK-36444 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.2.0 >Reporter: Yuming Wang >Assignee: Yuming Wang >Priority: Major > Fix For: 3.2.0 > > > To support this case: > {code:scala} > sql( > """ > |SELECT date_id, product_id FROM fact_sk f > |JOIN (select store_id + 3 as new_store_id from dim_store where > country = 'US') s > |ON f.store_id = s.new_store_id > """.stripMargin) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-36444) Remove OptimizeSubqueries from batch of PartitionPruning
[ https://issues.apache.org/jira/browse/SPARK-36444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang resolved SPARK-36444. - Fix Version/s: 3.2.0 Resolution: Fixed Issue resolved by pull request 33664 [https://github.com/apache/spark/pull/33664] > Remove OptimizeSubqueries from batch of PartitionPruning > > > Key: SPARK-36444 > URL: https://issues.apache.org/jira/browse/SPARK-36444 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: Yuming Wang >Assignee: Yuming Wang >Priority: Major > Fix For: 3.2.0 > > > To support this case: > {code:scala} > sql( > """ > |SELECT date_id, product_id FROM fact_sk f > |JOIN (select store_id + 3 as new_store_id from dim_store where > country = 'US') s > |ON f.store_id = s.new_store_id > """.stripMargin) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-36444) Remove OptimizeSubqueries from batch of PartitionPruning
[ https://issues.apache.org/jira/browse/SPARK-36444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang reassigned SPARK-36444: --- Assignee: Yuming Wang > Remove OptimizeSubqueries from batch of PartitionPruning > > > Key: SPARK-36444 > URL: https://issues.apache.org/jira/browse/SPARK-36444 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: Yuming Wang >Assignee: Yuming Wang >Priority: Major > > To support this case: > {code:scala} > sql( > """ > |SELECT date_id, product_id FROM fact_sk f > |JOIN (select store_id + 3 as new_store_id from dim_store where > country = 'US') s > |ON f.store_id = s.new_store_id > """.stripMargin) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manager in Spark
[ https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401514#comment-17401514 ] Saurabh Chawla edited comment on SPARK-30873 at 8/19/21, 7:56 AM: -- [~kings129] - A backport of https://issues.apache.org/jira/browse/YARN-6483 and https://issues.apache.org/jira/browse/YARN-4676 is needed for lower versions of Hadoop. This change is needed to support the framework (getDecommissioningTimeout, setDecommissioningTimeout) that was added in NodeReport.java. In this Spark PR we have made use of the NodeReport methods to get the DecommissioningTimeout in YarnAllocator.scala. This change is available from hadoop-3.0.1. Although the DECOMMISSIONING node state is supported from hadoop-2.8 and hadoop-2.9, if we receive the decommissioning-state info from the RM we can make use of a decommission-timeout conf per cloud provider. For example, an AWS spot node is terminated 120 sec after entering the decommissioning state, so we can give 110 sec as the decommissioning timeout; the same goes for GCP preemptible VMs. I believe it's better to get the DecommissioningTimeout from the RM rather than doing it on the Spark side through confs. To get the correct behaviour of graceful decommissioning in Spark, it's better to use hadoop-3.0.1 or a later Hadoop version. was (Author: saurabhc100): [~kings129] - The backport of https://issues.apache.org/jira/browse/YARN-6483 and https://issues.apache.org/jira/browse/YARN-4676 was done by our Hadoop team. This change is needed to support the framework (getDecommissioningTimeout, setDecommissioningTimeout) that was added in NodeReport.java. In this Spark PR we have made use of the NodeReport methods to get the DecommissioningTimeout in YarnAllocator.scala. This change is available from hadoop-3.0.1. Although the DECOMMISSIONING node state is supported from hadoop-2.8 and hadoop-2.9, if we receive the decommissioning-state info from the RM we can make use of a decommission-timeout conf per cloud provider. For example, an AWS spot node is terminated 120 sec after entering the decommissioning state, so we can give 110 sec as the decommissioning timeout; the same goes for GCP preemptible VMs. I believe it's better to get the DecommissioningTimeout from the RM rather than doing it on the Spark side through confs. To get the correct behaviour of graceful decommissioning in Spark, it's better to use hadoop-3.0.1 or a later Hadoop version. > Handling Node Decommissioning for Yarn cluster manager in Spark > -- > > Key: SPARK-30873 > URL: https://issues.apache.org/jira/browse/SPARK-30873 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 3.1.0 >Reporter: Saurabh Chawla >Priority: Major > > In many public cloud environments, node loss (in the case of AWS spot loss, spot blocks, and GCP preemptible VMs) is a planned and informed activity. > The cloud provider intimates the cluster manager about the possible loss of a node ahead of time. A few examples are listed here: > a) Spot loss in AWS (2 min before the event) > b) GCP preemptible VM loss (30 seconds before the event) > c) AWS spot block loss with info on termination time (generally a few tens of minutes before decommission, as configured in Yarn) > This JIRA tries to make Spark leverage the knowledge of upcoming node loss and adjust the scheduling of tasks to minimise the impact on the application. 
> It is well known that when a host is lost, the executors, its running tasks, > their caches and also Shuffle data is lost. This could result in wastage of > compute and other resources. > The focus here is to build a framework for YARN, that can be extended for > other cluster managers to handle such scenario. > The framework must handle one or more of the following:- > 1) Prevent new tasks from starting on any executors on decommissioning Nodes. > 2) Decide to kill the running tasks so that they can be restarted elsewhere > (assuming they will not complete within the deadline) OR we can allow them to > continue hoping they will finish within deadline. > 3) Clear the shuffle data entry from MapOutputTracker of decommission node > hostname to prevent the shuffle fetchfailed exception.The most significant > advantage of unregistering shuffle outputs when Spark schedules the first > re-attempt to compute the missing blocks, it notices all of the missing > blocks from decommissioned nodes and recovers in only one attempt. This > speeds up the recovery process significantly over the scheduled Spark > implementation, where stages might be rescheduled
[jira] [Commented] (SPARK-35083) Support remote scheduler pool file
[ https://issues.apache.org/jira/browse/SPARK-35083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401517#comment-17401517 ] Apache Spark commented on SPARK-35083: -- User 'Ngone51' has created a pull request for this issue: https://github.com/apache/spark/pull/33785 > Support remote scheduler pool file > -- > > Key: SPARK-35083 > URL: https://issues.apache.org/jira/browse/SPARK-35083 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: XiDuo You >Assignee: XiDuo You >Priority: Minor > Fix For: 3.2.0 > > > Make `spark.scheduler.allocation.file` support remote files. When using Spark > as a server (e.g. SparkThriftServer), it's hard for users to specify a local path > as the scheduler pool file. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
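An illustrative use of the setting once remote paths are supported (the HDFS path and pool name below are hypothetical, not from the issue):
{code:scala}
import org.apache.spark.sql.SparkSession

// A server-style deployment reading its fair scheduler pools from HDFS
// instead of a driver-local file.
val spark = SparkSession.builder()
  .appName("remote-pool-file-demo")
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file",
    "hdfs://namenode:8020/spark/conf/fairscheduler.xml")
  .getOrCreate()

// Jobs submitted from this thread run in the "etl" pool defined in that file.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "etl")
{code}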
[jira] [Comment Edited] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manager in Spark
[ https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401514#comment-17401514 ] Saurabh Chawla edited comment on SPARK-30873 at 8/19/21, 7:46 AM: -- [~kings129] - The backport of https://issues.apache.org/jira/browse/YARN-6483 and https://issues.apache.org/jira/browse/YARN-4676 was done by our Hadoop team. This change is needed to support the framework (getDecommissioningTimeout, setDecommissioningTimeout) that was added in NodeReport.java. In this Spark PR we have made use of the NodeReport methods to get the DecommissioningTimeout in YarnAllocator.scala. This change is available from hadoop-3.0.1. Although the DECOMMISSIONING node state is supported from hadoop-2.8 and hadoop-2.9, if we receive the decommissioning-state info from the RM we can make use of a decommission-timeout conf per cloud provider. For example, an AWS spot node is terminated 120 sec after entering the decommissioning state, so we can give 110 sec as the decommissioning timeout; the same goes for GCP preemptible VMs. I believe it's better to get the DecommissioningTimeout from the RM rather than doing it on the Spark side through confs. To get the correct behaviour of graceful decommissioning in Spark, it's better to use hadoop-3.0.1 or a later Hadoop version. was (Author: saurabhc100): [~kings129] - The backport of https://issues.apache.org/jira/browse/YARN-6483 and https://issues.apache.org/jira/browse/YARN-4676 was done by our Hadoop team. This change is needed to support the framework (getDecommissioningTimeout, setDecommissioningTimeout) that was added in NodeReport.java. In this Spark PR we have made use of the NodeReport methods to get the DecommissioningTimeout in YarnAllocator.scala. This change is available from hadoop-3.0.1. Although the DECOMMISSIONING node state is supported from hadoop-2.8 and hadoop-2.9, if we receive the decommissioning-state info from the RM we can make use of a decommission-timeout conf per cloud provider. For example, an AWS spot node is terminated 120 sec after entering the decommissioning state, so we can give 110 sec as the decommissioning timeout; the same goes for GCP preemptible VMs. I believe it's good to get the DecommissioningTimeout from the RM rather than doing it on the Spark side through confs. To get the correct behaviour of graceful decommissioning in Spark, it's better to use hadoop-3.0.1 or a later Hadoop version. > Handling Node Decommissioning for Yarn cluster manager in Spark > -- > > Key: SPARK-30873 > URL: https://issues.apache.org/jira/browse/SPARK-30873 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 3.1.0 >Reporter: Saurabh Chawla >Priority: Major > > In many public cloud environments, node loss (in the case of AWS spot loss, spot blocks, and GCP preemptible VMs) is a planned and informed activity. > The cloud provider intimates the cluster manager about the possible loss of a node ahead of time. A few examples are listed here: > a) Spot loss in AWS (2 min before the event) > b) GCP preemptible VM loss (30 seconds before the event) > c) AWS spot block loss with info on termination time (generally a few tens of minutes before decommission, as configured in Yarn) > This JIRA tries to make Spark leverage the knowledge of upcoming node loss and adjust the scheduling of tasks to minimise the impact on the application. 
> It is well known that when a host is lost, the executors, its running tasks, > their caches and also Shuffle data is lost. This could result in wastage of > compute and other resources. > The focus here is to build a framework for YARN, that can be extended for > other cluster managers to handle such scenario. > The framework must handle one or more of the following:- > 1) Prevent new tasks from starting on any executors on decommissioning Nodes. > 2) Decide to kill the running tasks so that they can be restarted elsewhere > (assuming they will not complete within the deadline) OR we can allow them to > continue hoping they will finish within deadline. > 3) Clear the shuffle data entry from MapOutputTracker of decommission node > hostname to prevent the shuffle fetchfailed exception.The most significant > advantage of unregistering shuffle outputs when Spark schedules the first > re-attempt to compute the missing blocks, it notices all of the missing > blocks from decommissioned nodes and recovers in only one attempt. This > speeds up the recovery process significantly over the scheduled Spark > implementation, where stages might be rescheduled multiple tim
[jira] [Comment Edited] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manager in Spark
[ https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401514#comment-17401514 ] Saurabh Chawla edited comment on SPARK-30873 at 8/19/21, 7:35 AM: -- [~kings129] - The backport of https://issues.apache.org/jira/browse/YARN-6483 and https://issues.apache.org/jira/browse/YARN-4676 was done by our Hadoop team. This change is needed to support the framework (getDecommissioningTimeout, setDecommissioningTimeout) that was added in NodeReport.java. In this Spark PR we have made use of the NodeReport methods to get the DecommissioningTimeout in YarnAllocator.scala. This change is available from hadoop-3.0.1. Although the DECOMMISSIONING node state is supported from hadoop-2.8 and hadoop-2.9, if we receive the decommissioning-state info from the RM we can make use of a decommission-timeout conf per cloud provider. For example, an AWS spot node is terminated 120 sec after entering the decommissioning state, so we can give 110 sec as the decommissioning timeout; the same goes for GCP preemptible VMs. I believe it's good to get the DecommissioningTimeout from the RM rather than doing it on the Spark side through confs. To get the correct behaviour of graceful decommissioning in Spark, it's better to use hadoop-3.0.1 or a later Hadoop version. was (Author: saurabhc100): [~kings129] - The backport of https://issues.apache.org/jira/browse/YARN-6483 and https://issues.apache.org/jira/browse/YARN-4676 was done by our Hadoop team. This change is needed to support the framework (getDecommissioningTimeout, setDecommissioningTimeout) that was added in NodeReport.java. In this Spark PR we have made use of the NodeReport methods to get the DecommissioningTimeout in YarnAllocator.scala. This change is available from hadoop-3.0.1. Although the DECOMMISSIONING node state is supported from hadoop-2.8 and hadoop-2.9, if we receive the decommissioning-state info from the RM we can make use of a decommission-timeout conf per cloud provider. For example, an AWS spot node is terminated 120 sec after entering the decommissioning state, so we can give 110 sec as the decommissioning timeout; the same goes for GCP preemptible VMs. I believe it's good to get the DecommissioningTimeout from the RM rather than doing it on the Spark side through confs. To get the correct behaviour of graceful decommissioning in Spark, it's better to use hadoop-3.0.1 or a later Hadoop version. > Handling Node Decommissioning for Yarn cluster manager in Spark > -- > > Key: SPARK-30873 > URL: https://issues.apache.org/jira/browse/SPARK-30873 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 3.1.0 >Reporter: Saurabh Chawla >Priority: Major > > In many public cloud environments, node loss (in the case of AWS spot loss, spot blocks, and GCP preemptible VMs) is a planned and informed activity. > The cloud provider intimates the cluster manager about the possible loss of a node ahead of time. A few examples are listed here: > a) Spot loss in AWS (2 min before the event) > b) GCP preemptible VM loss (30 seconds before the event) > c) AWS spot block loss with info on termination time (generally a few tens of minutes before decommission, as configured in Yarn) > This JIRA tries to make Spark leverage the knowledge of upcoming node loss and adjust the scheduling of tasks to minimise the impact on the application. 
> It is well known that when a host is lost, the executors, its running tasks, > their caches and also Shuffle data is lost. This could result in wastage of > compute and other resources. > The focus here is to build a framework for YARN, that can be extended for > other cluster managers to handle such scenario. > The framework must handle one or more of the following:- > 1) Prevent new tasks from starting on any executors on decommissioning Nodes. > 2) Decide to kill the running tasks so that they can be restarted elsewhere > (assuming they will not complete within the deadline) OR we can allow them to > continue hoping they will finish within deadline. > 3) Clear the shuffle data entry from MapOutputTracker of decommission node > hostname to prevent the shuffle fetchfailed exception.The most significant > advantage of unregistering shuffle outputs when Spark schedules the first > re-attempt to compute the missing blocks, it notices all of the missing > blocks from decommissioned nodes and recovers in only one attempt. This > speeds up the recovery process significantly over the scheduled Spark > implementation, where stages might be rescheduled multip
[jira] [Commented] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manager in Spark
[ https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401514#comment-17401514 ] Saurabh Chawla commented on SPARK-30873: [~kings129] - The backport of https://issues.apache.org/jira/browse/YARN-6483 and https://issues.apache.org/jira/browse/YARN-4676 was done by our Hadoop team. This change is needed to support the framework (getDecommissioningTimeout, setDecommissioningTimeout) that was added in NodeReport.java. In this Spark PR we have made use of the NodeReport methods to get the DecommissioningTimeout in YarnAllocator.scala. This change is available from hadoop-3.0.1. Although the DECOMMISSIONING node state is supported from hadoop-2.8 and hadoop-2.9, if we receive the decommissioning-state info from the RM we can make use of a decommission-timeout conf per cloud provider. For example, an AWS spot node is terminated 120 sec after entering the decommissioning state, so we can give 110 sec as the decommissioning timeout; the same goes for GCP preemptible VMs. I believe it's good to get the DecommissioningTimeout from the RM rather than doing it on the Spark side through confs. To get the correct behaviour of graceful decommissioning in Spark, it's better to use hadoop-3.0.1 or a later Hadoop version. > Handling Node Decommissioning for Yarn cluster manager in Spark > -- > > Key: SPARK-30873 > URL: https://issues.apache.org/jira/browse/SPARK-30873 > Project: Spark > Issue Type: Improvement > Components: Spark Core, YARN >Affects Versions: 3.1.0 >Reporter: Saurabh Chawla >Priority: Major > > In many public cloud environments, node loss (in the case of AWS spot loss, spot blocks, and GCP preemptible VMs) is a planned and informed activity. > The cloud provider intimates the cluster manager about the possible loss of a node ahead of time. A few examples are listed here: > a) Spot loss in AWS (2 min before the event) > b) GCP preemptible VM loss (30 seconds before the event) > c) AWS spot block loss with info on termination time (generally a few tens of minutes before decommission, as configured in Yarn) > This JIRA tries to make Spark leverage the knowledge of upcoming node loss and adjust the scheduling of tasks to minimise the impact on the application. > It is well known that when a host is lost, the executors, their running tasks, their caches and also shuffle data are lost. This could result in wastage of compute and other resources. > The focus here is to build a framework for YARN that can be extended for other cluster managers to handle such scenarios. > The framework must handle one or more of the following: > 1) Prevent new tasks from starting on any executors on decommissioning nodes. > 2) Decide to kill the running tasks so that they can be restarted elsewhere (assuming they will not complete within the deadline), OR allow them to continue in the hope they will finish within the deadline. > 3) Clear the shuffle data entries for the decommissioned node's hostname from the MapOutputTracker to prevent shuffle FetchFailed exceptions. The most significant advantage of unregistering shuffle outputs is that when Spark schedules the first re-attempt to compute the missing blocks, it notices all of the missing blocks from decommissioned nodes and recovers in only one attempt. 
This speeds up the recovery process significantly over the default Spark behaviour, where stages might be rescheduled multiple times to recompute missing shuffles from all nodes, and prevents jobs from being stuck for hours failing and recomputing. > 4) Prevent the stage from aborting due to FetchFailed exceptions caused by the decommissioning of a node. In Spark there is a number of consecutive stage attempts allowed before a stage is aborted; this is controlled by the config spark.stage.maxConsecutiveAttempts. Not counting fetch failures due to decommissioning of nodes towards stage failure improves the reliability of the system. > Main components of change > 1) Get the ClusterInfo update from the Resource Manager -> Application Master -> Spark Driver. > 2) DecommissionTracker, residing inside the driver, tracks all the decommissioned nodes and takes the necessary actions and state transitions. > 3) Based on the decommissioned-node list, add hooks in the code to achieve: > a) No new tasks on executors of decommissioning nodes > b) Remove shuffle data mapping info for the node to be decommissioned from the MapOutputTracker > c) Do not count fetch failures from decommissioned nodes towards stage failure > On receiving info that a node is to be decommissioned, the below actions need to be performed by the DecommissionTracker on the driver: > * Add the entry of Nodes in DecommissionTracke
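To make the timeout discussion in the comment concrete, an illustrative helper (the numbers come from the comment above; the helper itself is not part of the proposed framework):
{code:scala}
// Termination notice each cloud gives before reclaiming a node, and a
// decommission timeout derived from it with a small safety margin.
val noticeSeconds = Map("aws-spot" -> 120, "gcp-preemptible" -> 30)

def decommissionTimeout(provider: String, marginSeconds: Int = 10): Option[Int] =
  noticeSeconds.get(provider).map(n => math.max(n - marginSeconds, 0))

// decommissionTimeout("aws-spot") == Some(110), the value suggested above.
{code}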
[jira] [Resolved] (SPARK-36519) Store the RocksDB format in the checkpoint for a streaming query
[ https://issues.apache.org/jira/browse/SPARK-36519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] L. C. Hsieh resolved SPARK-36519. - Fix Version/s: 3.2.0 Resolution: Fixed Issue resolved by pull request 33749 [https://github.com/apache/spark/pull/33749] > Store the RocksDB format in the checkpoint for a streaming query > > > Key: SPARK-36519 > URL: https://issues.apache.org/jira/browse/SPARK-36519 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Major > Fix For: 3.2.0 > > > RocksDB provides backward compatibility but it doesn't always provide forward > compatibility. It's better to store the RocksDB format version in the > checkpoint so that it gives us more information to provide a rollback > guarantee when we upgrade to a RocksDB version that may introduce incompatible > changes in a new Spark version. > A typical case is when a user upgrades their query to a new Spark version, > and this new Spark version has a new RocksDB version which may use a new > format. But the user hits some bug and decides to roll back. But in the old > Spark version, the old RocksDB version cannot read the new format. > In order to handle this case, we will write the RocksDB format version to the > checkpoint. When restarting from a checkpoint, we will force RocksDB to use > the format version stored in the checkpoint. This will ensure the user can > roll back their Spark version if needed. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
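A conceptual sketch of the version-pinning idea described above (illustrative only; the file name, location, and default version are assumptions, not Spark's actual checkpoint layout or APIs):
{code:scala}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}

// First run records the RocksDB format version next to the checkpoint;
// later restarts read it back and pin RocksDB to that version, so a
// rolled-back Spark (bundling an older RocksDB) can still open the files.
val versionFile = Paths.get("/tmp/checkpoint/_rocksdb_format_version") // hypothetical
val defaultFormatVersion = 5 // assumed current default

val pinnedVersion =
  if (Files.exists(versionFile)) {
    new String(Files.readAllBytes(versionFile), UTF_8).trim.toInt
  } else {
    Files.createDirectories(versionFile.getParent)
    Files.write(versionFile, defaultFormatVersion.toString.getBytes(UTF_8))
    defaultFormatVersion
  }
// pinnedVersion would then be applied to RocksDB's table format options.
{code}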