[jira] [Updated] (SPARK-47148) Avoid to materialize AQE ExchangeQueryStageExec on the cancellation
[ https://issues.apache.org/jira/browse/SPARK-47148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-47148:
---
Description:
AQE can materialize both *ShuffleQueryStage* and *BroadcastQueryStage* on cancellation. This causes unnecessary stage materialization by submitting a Shuffle Job or a Broadcast Job. Under normal circumstances, if a stage is not yet materialized (i.e. *ShuffleQueryStage.shuffleFuture* or *{{BroadcastQueryStage.broadcastFuture}}* is not initialized yet), it should simply be skipped without materializing it.

Sample use case:

*1- Stage Materialization Steps* (when stage materialization fails):
{code:java}
1.1- ShuffleQueryStage1 - materialized successfully,
1.2- ShuffleQueryStage2 - materialization failed,
1.3- ShuffleQueryStage3 - not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized{code}
*2- Stage Cancellation Steps:*
{code:java}
2.1- ShuffleQueryStage1 - canceled, since it is already materialized,
2.2- ShuffleQueryStage2 - is an earlyFailedStage, so AQE currently skips it by default because it could not be materialized,
2.3- ShuffleQueryStage3 - the problem is here: this stage is not materialized yet, but it is currently also being canceled, and cancellation requires materializing it first.{code}

was:
AQE can materialize *ShuffleQueryStage* on cancellation. This causes unnecessary stage materialization by submitting a Shuffle Job. Under normal circumstances, if a stage is not yet materialized (i.e. ShuffleQueryStage.shuffleFuture is not initialized yet), it should simply be skipped without materializing it.

Sample use case:

*1- Stage Materialization Steps* (when stage materialization fails):
{code:java}
1.1- ShuffleQueryStage1 - materialized successfully,
1.2- ShuffleQueryStage2 - materialization failed,
1.3- ShuffleQueryStage3 - not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized{code}
*2- Stage Cancellation Steps:*
{code:java}
2.1- ShuffleQueryStage1 - canceled, since it is already materialized,
2.2- ShuffleQueryStage2 - is an earlyFailedStage, so AQE currently skips it by default because it could not be materialized,
2.3- ShuffleQueryStage3 - the problem is here: this stage is not materialized yet, but it is currently also being canceled, and cancellation requires materializing it first.{code}

> Avoid to materialize AQE ExchangeQueryStageExec on the cancellation
> ---
>
> Key: SPARK-47148
> URL: https://issues.apache.org/jira/browse/SPARK-47148
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, SQL
> Affects Versions: 4.0.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Labels: pull-request-available
>
> AQE can materialize both *ShuffleQueryStage* and *BroadcastQueryStage* on cancellation. This causes unnecessary stage materialization by submitting a Shuffle Job or a Broadcast Job. Under normal circumstances, if a stage is not yet materialized (i.e. *ShuffleQueryStage.shuffleFuture* or *{{BroadcastQueryStage.broadcastFuture}}* is not initialized yet), it should simply be skipped without materializing it.
> Sample use case:
> *1- Stage Materialization Steps* (when stage materialization fails):
> {code:java}
> 1.1- ShuffleQueryStage1 - materialized successfully,
> 1.2- ShuffleQueryStage2 - materialization failed,
> 1.3- ShuffleQueryStage3 - not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized{code}
> *2- Stage Cancellation Steps:*
> {code:java}
> 2.1- ShuffleQueryStage1 - canceled, since it is already materialized,
> 2.2- ShuffleQueryStage2 - is an earlyFailedStage, so AQE currently skips it by default because it could not be materialized,
> 2.3- ShuffleQueryStage3 - the problem is here: this stage is not materialized yet, but it is currently also being canceled, and cancellation requires materializing it first.{code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
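The guard this issue asks for can be sketched as follows. This is an illustrative simplification, not Spark's actual ExchangeQueryStageExec code: the class SketchQueryStage and its fields are hypothetical stand-ins for the shuffleFuture/broadcastFuture pattern described above, where the future stays uninitialized until the stage is first materialized.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an AQE exchange query stage. The future is null
// until materialize() is called, mirroring how shuffleFuture/broadcastFuture
// are only initialized on first materialization.
class SketchQueryStage {
    private CompletableFuture<Void> future = null;
    boolean canceled = false;

    CompletableFuture<Void> materialize() {
        if (future == null) {
            // In Spark this is where a Shuffle Job or Broadcast Job would be submitted.
            future = new CompletableFuture<>();
        }
        return future;
    }

    // Proposed behavior: a stage that was never materialized is skipped,
    // instead of being materialized just so it can be canceled (step 2.3 above).
    void cancel() {
        if (future == null) {
            return; // nothing was submitted, so there is nothing to cancel
        }
        future.cancel(true);
        canceled = true;
    }
}
```

With such a guard, ShuffleQueryStage3 in the scenario above would be left untouched on cancellation instead of triggering an unnecessary shuffle job.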
[jira] [Updated] (SPARK-47148) Avoid to materialize AQE ExchangeQueryStageExec on the cancellation
[ https://issues.apache.org/jira/browse/SPARK-47148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-47148:
---
Summary: Avoid to materialize AQE ExchangeQueryStageExec on the cancellation (was: Avoid to materialize AQE QueryStages on the cancellation)

> Avoid to materialize AQE ExchangeQueryStageExec on the cancellation
> ---
>
> Key: SPARK-47148
> URL: https://issues.apache.org/jira/browse/SPARK-47148
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, SQL
> Affects Versions: 4.0.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Labels: pull-request-available
>
> AQE can materialize *ShuffleQueryStage* on cancellation. This causes unnecessary stage materialization by submitting a Shuffle Job. Under normal circumstances, if a stage is not yet materialized (i.e. ShuffleQueryStage.shuffleFuture is not initialized yet), it should simply be skipped without materializing it.
> Sample use case:
> *1- Stage Materialization Steps* (when stage materialization fails):
> {code:java}
> 1.1- ShuffleQueryStage1 - materialized successfully,
> 1.2- ShuffleQueryStage2 - materialization failed,
> 1.3- ShuffleQueryStage3 - not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized{code}
> *2- Stage Cancellation Steps:*
> {code:java}
> 2.1- ShuffleQueryStage1 - canceled, since it is already materialized,
> 2.2- ShuffleQueryStage2 - is an earlyFailedStage, so AQE currently skips it by default because it could not be materialized,
> 2.3- ShuffleQueryStage3 - the problem is here: this stage is not materialized yet, but it is currently also being canceled, and cancellation requires materializing it first.{code}
[jira] [Updated] (SPARK-47148) Avoid to materialize AQE ShuffleQueryStage on the cancellation
[ https://issues.apache.org/jira/browse/SPARK-47148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-47148:
---
Description:
AQE can materialize *ShuffleQueryStage* on cancellation. This causes unnecessary stage materialization by submitting a Shuffle Job. Under normal circumstances, if a stage is not yet materialized (i.e. ShuffleQueryStage.shuffleFuture is not initialized yet), it should simply be skipped without materializing it.

Sample use case:

*1- Stage Materialization Steps* (when stage materialization fails):
{code:java}
1.1- ShuffleQueryStage1 - materialized successfully,
1.2- ShuffleQueryStage2 - materialization failed,
1.3- ShuffleQueryStage3 - not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized{code}
*2- Stage Cancellation Steps:*
{code:java}
2.1- ShuffleQueryStage1 - canceled, since it is already materialized,
2.2- ShuffleQueryStage2 - is an earlyFailedStage, so AQE currently skips it by default because it could not be materialized,
2.3- ShuffleQueryStage3 - the problem is here: this stage is not materialized yet, but it is currently also being canceled, and cancellation requires materializing it first.{code}

was:
AQE can materialize *ShuffleQueryStage* on cancellation. This causes unnecessary stage materialization by submitting a Shuffle Job. Under normal circumstances, if a stage is not yet materialized (i.e. ShuffleQueryStage.shuffleFuture is not initialized yet), it should simply be skipped without materializing it.

Sample use case:

*1- Stage Materialization Steps* (when stage materialization fails):
{code:java}
1.1- ShuffleQueryStage1 - materialized successfully,
1.2- ShuffleQueryStage2 - materialization failed,
1.3- ShuffleQueryStage3 - not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized{code}
*2- Stage Cancellation Steps:*
{code:java}
2.1- ShuffleQueryStage1 - canceled, since it is already materialized,
2.2- ShuffleQueryStage2 - is an earlyFailedStage, so it is currently skipped by default because it could not be materialized,
2.3- ShuffleQueryStage3 - the problem is here: this stage is not materialized yet, but it is currently also being canceled, and cancellation requires materializing it first.{code}

> Avoid to materialize AQE ShuffleQueryStage on the cancellation
> --
>
> Key: SPARK-47148
> URL: https://issues.apache.org/jira/browse/SPARK-47148
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, SQL
> Affects Versions: 4.0.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Labels: pull-request-available
>
> AQE can materialize *ShuffleQueryStage* on cancellation. This causes unnecessary stage materialization by submitting a Shuffle Job. Under normal circumstances, if a stage is not yet materialized (i.e. ShuffleQueryStage.shuffleFuture is not initialized yet), it should simply be skipped without materializing it.
> Sample use case:
> *1- Stage Materialization Steps* (when stage materialization fails):
> {code:java}
> 1.1- ShuffleQueryStage1 - materialized successfully,
> 1.2- ShuffleQueryStage2 - materialization failed,
> 1.3- ShuffleQueryStage3 - not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized{code}
> *2- Stage Cancellation Steps:*
> {code:java}
> 2.1- ShuffleQueryStage1 - canceled, since it is already materialized,
> 2.2- ShuffleQueryStage2 - is an earlyFailedStage, so AQE currently skips it by default because it could not be materialized,
> 2.3- ShuffleQueryStage3 - the problem is here: this stage is not materialized yet, but it is currently also being canceled, and cancellation requires materializing it first.{code}
[jira] [Updated] (SPARK-46639) Add WindowExec SQLMetrics
[ https://issues.apache.org/jira/browse/SPARK-46639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-46639:
---
Description:
Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
{code:java}
numOfOutputRows: total number of output rows.
numOfPartitions: number of processed input partitions.
numOfWindowPartitions: number of generated window partitions.
spilledRows: total number of spilled rows.
spillSizeOnDisk: total spilled data size on disk.{code}
As an example use case, WindowExec spilling behavior depends on multiple factors and can sometimes cause {{SparkOutOfMemoryError}} instead of spilling to disk, so it is hard to track without SQLMetrics:
*1-* WindowExec creates an ExternalAppendOnlyUnsafeRowArray (internal ArrayBuffer) per task (i.e. per child RDD partition).
*2-* When the ExternalAppendOnlyUnsafeRowArray size exceeds spark.sql.windowExec.buffer.in.memory.threshold=4096, it switches to an UnsafeExternalSorter as its spillableArray by moving all of its buffered rows into the UnsafeExternalSorter, and the internal ArrayBuffer is cleared. From this point on, WindowExec writes into the UnsafeExternalSorter's buffer (UnsafeInMemorySorter).
*3-* An UnsafeExternalSorter is created per window partition. When the UnsafeExternalSorter's buffer size exceeds spark.sql.windowExec.buffer.spill.threshold=Integer.MAX_VALUE, it spills to disk and clears the entire buffer (UnsafeInMemorySorter) content. The UnsafeExternalSorter then continues buffering subsequent records until the threshold is exceeded again.
*New WindowExec SQLMetrics Sample Screenshot:*
!WindowExec SQLMetrics.png|width=257,height=152!

was:
Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
{code:java}
numOfOutputRows: total number of output rows.
numOfPartitions: number of processed input partitions.
numOfWindowPartitions: number of generated window partitions.
spilledRows: total number of spilled rows.
spillSizeOnDisk: total spilled data size on disk.{code}
As an example use case, WindowExec spilling behavior depends on multiple factors and can sometimes cause {{SparkOutOfMemoryError}} instead of spilling to disk, so it is hard to track without SQLMetrics:
*1-* WindowExec creates an ExternalAppendOnlyUnsafeRowArray (internal ArrayBuffer) per task (i.e. per child RDD partition).
*2-* When the ExternalAppendOnlyUnsafeRowArray size exceeds spark.sql.windowExec.buffer.in.memory.threshold=4096, it switches to an UnsafeExternalSorter as its spillableArray by moving all of its buffered rows into the UnsafeExternalSorter, and the internal ArrayBuffer is cleared. From this point on, WindowExec writes into the UnsafeExternalSorter's buffer (UnsafeInMemorySorter).
*3-* An UnsafeExternalSorter is created per window partition. When the UnsafeExternalSorter's buffer size exceeds spark.sql.windowExec.buffer.spill.threshold=Integer.MAX_VALUE, it spills to disk and clears the entire buffer (UnsafeInMemorySorter) content. The UnsafeExternalSorter then continues buffering subsequent records until the threshold is exceeded again.
Sample UI Screenshot:
!WindowExec SQLMetrics.png|width=257,height=152!

> Add WindowExec SQLMetrics
> -
>
> Key: SPARK-46639
> URL: https://issues.apache.org/jira/browse/SPARK-46639
> Project: Spark
> Issue Type: Task
> Components: SQL
> Affects Versions: 4.0.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Labels: pull-request-available
> Attachments: WindowExec SQLMetrics.png
>
> Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
> {code:java}
> numOfOutputRows: total number of output rows.
> numOfPartitions: number of processed input partitions.
> numOfWindowPartitions: number of generated window partitions.
> spilledRows: total number of spilled rows.
> spillSizeOnDisk: total spilled data size on disk.{code}
> As an example use case, WindowExec spilling behavior depends on multiple factors and can sometimes cause {{SparkOutOfMemoryError}} instead of spilling to disk, so it is hard to track without SQLMetrics:
> *1-* WindowExec creates an ExternalAppendOnlyUnsafeRowArray (internal ArrayBuffer) per task (i.e. per child RDD partition).
> *2-* When the ExternalAppendOnlyUnsafeRowArray size
[jira] [Updated] (SPARK-46639) Add WindowExec SQLMetrics
[ https://issues.apache.org/jira/browse/SPARK-46639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-46639:
---
Description:
Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
{code:java}
numOfOutputRows: total number of output rows.
numOfPartitions: number of processed input partitions.
numOfWindowPartitions: number of generated window partitions.
spilledRows: total number of spilled rows.
spillSizeOnDisk: total spilled data size on disk.{code}
As an example use case, WindowExec spilling behavior depends on multiple factors and can sometimes cause {{SparkOutOfMemoryError}} instead of spilling to disk, so it is hard to track without SQLMetrics:
*1-* WindowExec creates an ExternalAppendOnlyUnsafeRowArray (internal ArrayBuffer) per task (i.e. per child RDD partition).
*2-* When the ExternalAppendOnlyUnsafeRowArray size exceeds spark.sql.windowExec.buffer.in.memory.threshold=4096, it switches to an UnsafeExternalSorter as its spillableArray by moving all of its buffered rows into the UnsafeExternalSorter, and the internal ArrayBuffer is cleared. From this point on, WindowExec writes into the UnsafeExternalSorter's buffer (UnsafeInMemorySorter).
*3-* An UnsafeExternalSorter is created per window partition. When the UnsafeExternalSorter's buffer size exceeds spark.sql.windowExec.buffer.spill.threshold=Integer.MAX_VALUE, it spills to disk and clears the entire buffer (UnsafeInMemorySorter) content. The UnsafeExternalSorter then continues buffering subsequent records until the threshold is exceeded again.
Sample UI Screenshot:
!WindowExec SQLMetrics.png|width=257,height=152!

was:
Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
{code:java}
numOfOutputRows: total number of output rows.
numOfPartitions: number of processed input partitions.
numOfWindowPartitions: number of generated window partitions.
spilledRows: total number of spilled rows.
spillSizeOnDisk: total spilled data size on disk.{code}
As an example use case, WindowExec spilling behavior depends on multiple factors and can sometimes cause {{SparkOutOfMemoryError}} instead of spilling to disk, so it is hard to track without SQLMetrics:
*1-* WindowExec creates an ExternalAppendOnlyUnsafeRowArray (internal ArrayBuffer) per task (i.e. per child RDD partition).
*2-* When the ExternalAppendOnlyUnsafeRowArray size exceeds spark.sql.windowExec.buffer.in.memory.threshold=4096, it switches to an UnsafeExternalSorter as its spillableArray by moving all of its buffered rows into the UnsafeExternalSorter, and the internal ArrayBuffer is cleared. From this point on, WindowExec writes into the UnsafeExternalSorter's buffer (UnsafeInMemorySorter).
*3-* An UnsafeExternalSorter is created per window partition. When the UnsafeExternalSorter's buffer size exceeds spark.sql.windowExec.buffer.spill.threshold=Integer.MAX_VALUE, it spills to disk and clears the entire buffer (UnsafeInMemorySorter) content. The UnsafeExternalSorter then continues buffering subsequent records until the threshold is exceeded again.

> Add WindowExec SQLMetrics
> -
>
> Key: SPARK-46639
> URL: https://issues.apache.org/jira/browse/SPARK-46639
> Project: Spark
> Issue Type: Task
> Components: SQL
> Affects Versions: 4.0.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Labels: pull-request-available
> Attachments: WindowExec SQLMetrics.png
>
> Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
> {code:java}
> numOfOutputRows: total number of output rows.
> numOfPartitions: number of processed input partitions.
> numOfWindowPartitions: number of generated window partitions.
> spilledRows: total number of spilled rows.
> spillSizeOnDisk: total spilled data size on disk.{code}
> As an example use case, WindowExec spilling behavior depends on multiple factors and can sometimes cause {{SparkOutOfMemoryError}} instead of spilling to disk, so it is hard to track without SQLMetrics:
> *1-* WindowExec creates an ExternalAppendOnlyUnsafeRowArray (internal ArrayBuffer) per task (i.e. per child RDD partition).
> *2-* When the ExternalAppendOnlyUnsafeRowArray size exceeds
> spark.sql.windowExec.buffer.in.memory.threshold=4096,
[jira] [Updated] (SPARK-46639) Add WindowExec SQLMetrics
[ https://issues.apache.org/jira/browse/SPARK-46639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-46639:
---
Attachment: WindowExec SQLMetrics.png

> Add WindowExec SQLMetrics
> -
>
> Key: SPARK-46639
> URL: https://issues.apache.org/jira/browse/SPARK-46639
> Project: Spark
> Issue Type: Task
> Components: SQL
> Affects Versions: 4.0.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Labels: pull-request-available
> Attachments: WindowExec SQLMetrics.png
>
> Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
> {code:java}
> numOfOutputRows: total number of output rows.
> numOfPartitions: number of processed input partitions.
> numOfWindowPartitions: number of generated window partitions.
> spilledRows: total number of spilled rows.
> spillSizeOnDisk: total spilled data size on disk.{code}
> As an example use case, WindowExec spilling behavior depends on multiple factors and can sometimes cause {{SparkOutOfMemoryError}} instead of spilling to disk, so it is hard to track without SQLMetrics:
> *1-* WindowExec creates an ExternalAppendOnlyUnsafeRowArray (internal ArrayBuffer) per task (i.e. per child RDD partition).
> *2-* When the ExternalAppendOnlyUnsafeRowArray size exceeds spark.sql.windowExec.buffer.in.memory.threshold=4096, it switches to an UnsafeExternalSorter as its spillableArray by moving all of its buffered rows into the UnsafeExternalSorter, and the internal ArrayBuffer is cleared. From this point on, WindowExec writes into the UnsafeExternalSorter's buffer (UnsafeInMemorySorter).
> *3-* An UnsafeExternalSorter is created per window partition. When the UnsafeExternalSorter's buffer size exceeds spark.sql.windowExec.buffer.spill.threshold=Integer.MAX_VALUE, it spills to disk and clears the entire buffer (UnsafeInMemorySorter) content. The UnsafeExternalSorter then continues buffering subsequent records until the threshold is exceeded again.
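The two-tier buffering described in steps 2 and 3 above can be sketched as below. This is an assumed simplification, not Spark's real ExternalAppendOnlyUnsafeRowArray/UnsafeExternalSorter classes: SketchRowBuffer, its counters, and its plain-long spilledRows field are hypothetical stand-ins for the proposed SQLMetric.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of WindowExec's two-tier row buffering.
class SketchRowBuffer {
    static final int IN_MEMORY_THRESHOLD = 4096;          // spark.sql.windowExec.buffer.in.memory.threshold
    static final int SPILL_THRESHOLD = Integer.MAX_VALUE; // spark.sql.windowExec.buffer.spill.threshold

    private final List<Object> inMemory = new ArrayList<>(); // the internal ArrayBuffer tier
    private List<Object> sorterBuffer = null;                // stands in for UnsafeExternalSorter
    long spilledRows = 0;                                    // models the proposed spilledRows metric

    void add(Object row) {
        if (sorterBuffer == null && inMemory.size() < IN_MEMORY_THRESHOLD) {
            inMemory.add(row); // tier 1: plain in-memory buffering
            return;
        }
        if (sorterBuffer == null) {
            // Threshold exceeded: move all buffered rows into the sorter and clear the array.
            sorterBuffer = new ArrayList<>(inMemory);
            inMemory.clear();
        }
        sorterBuffer.add(row);
        if (sorterBuffer.size() >= SPILL_THRESHOLD) {
            spilledRows += sorterBuffer.size(); // spill to disk and clear the whole buffer
            sorterBuffer.clear();
        }
    }

    int bufferedInMemory() { return inMemory.size(); }
    int bufferedInSorter() { return sorterBuffer == null ? 0 : sorterBuffer.size(); }
}
```

Surfacing each tier transition as a SQLMetric (spilledRows, spillSizeOnDisk) is what makes OOM-versus-spill behavior diagnosable from the UI.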
[jira] [Updated] (SPARK-47148) Avoid to materialize AQE ShuffleQueryStage on the cancellation
[ https://issues.apache.org/jira/browse/SPARK-47148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-47148:
---
Summary: Avoid to materialize AQE ShuffleQueryStage on the cancellation (was: [AQE] Avoid to materialize ShuffleQueryStage on the cancellation)

> Avoid to materialize AQE ShuffleQueryStage on the cancellation
> --
>
> Key: SPARK-47148
> URL: https://issues.apache.org/jira/browse/SPARK-47148
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, SQL
> Affects Versions: 4.0.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Labels: pull-request-available
>
> AQE can materialize *ShuffleQueryStage* on cancellation. This causes unnecessary stage materialization by submitting a Shuffle Job. Under normal circumstances, if a stage is not yet materialized (i.e. ShuffleQueryStage.shuffleFuture is not initialized yet), it should simply be skipped without materializing it.
> Sample use case:
> *1- Stage Materialization Steps* (when stage materialization fails):
> {code:java}
> 1.1- ShuffleQueryStage1 - materialized successfully,
> 1.2- ShuffleQueryStage2 - materialization failed,
> 1.3- ShuffleQueryStage3 - not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized{code}
> *2- Stage Cancellation Steps:*
> {code:java}
> 2.1- ShuffleQueryStage1 - canceled, since it is already materialized,
> 2.2- ShuffleQueryStage2 - is an earlyFailedStage, so it is currently skipped by default because it could not be materialized,
> 2.3- ShuffleQueryStage3 - the problem is here: this stage is not materialized yet, but it is currently also being canceled, and cancellation requires materializing it first.{code}
[jira] [Created] (SPARK-47148) [AQE] Avoid to materialize ShuffleQueryStage on the cancellation
Eren Avsarogullari created SPARK-47148:
--
Summary: [AQE] Avoid to materialize ShuffleQueryStage on the cancellation
Key: SPARK-47148
URL: https://issues.apache.org/jira/browse/SPARK-47148
Project: Spark
Issue Type: Bug
Components: Shuffle, SQL
Affects Versions: 4.0.0
Reporter: Eren Avsarogullari

AQE can materialize *ShuffleQueryStage* on cancellation. This causes unnecessary stage materialization by submitting a Shuffle Job. Under normal circumstances, if a stage is not yet materialized (i.e. ShuffleQueryStage.shuffleFuture is not initialized yet), it should simply be skipped without materializing it.

Sample use case:

*1- Stage Materialization Steps* (when stage materialization fails):
{code:java}
1.1- ShuffleQueryStage1 - materialized successfully,
1.2- ShuffleQueryStage2 - materialization failed,
1.3- ShuffleQueryStage3 - not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized{code}
*2- Stage Cancellation Steps:*
{code:java}
2.1- ShuffleQueryStage1 - canceled, since it is already materialized,
2.2- ShuffleQueryStage2 - is an earlyFailedStage, so it is currently skipped by default because it could not be materialized,
2.3- ShuffleQueryStage3 - the problem is here: this stage is not materialized yet, but it is currently also being canceled, and cancellation requires materializing it first.{code}
[jira] [Updated] (SPARK-46639) Add WindowExec SQLMetrics
[ https://issues.apache.org/jira/browse/SPARK-46639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-46639:
---
Description:
Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
{code:java}
numOfOutputRows: total number of output rows.
numOfPartitions: number of processed input partitions.
numOfWindowPartitions: number of generated window partitions.
spilledRows: total number of spilled rows.
spillSizeOnDisk: total spilled data size on disk.{code}
As an example use case, WindowExec spilling behavior depends on multiple factors and can sometimes cause {{SparkOutOfMemoryError}} instead of spilling to disk, so it is hard to track without SQLMetrics:
*1-* WindowExec creates an ExternalAppendOnlyUnsafeRowArray (internal ArrayBuffer) per task (i.e. per child RDD partition).
*2-* When the ExternalAppendOnlyUnsafeRowArray size exceeds spark.sql.windowExec.buffer.in.memory.threshold=4096, it switches to an UnsafeExternalSorter as its spillableArray by moving all of its buffered rows into the UnsafeExternalSorter, and the internal ArrayBuffer is cleared. From this point on, WindowExec writes into the UnsafeExternalSorter's buffer (UnsafeInMemorySorter).
*3-* An UnsafeExternalSorter is created per window partition. When the UnsafeExternalSorter's buffer size exceeds spark.sql.windowExec.buffer.spill.threshold=Integer.MAX_VALUE, it spills to disk and clears the entire buffer (UnsafeInMemorySorter) content. The UnsafeExternalSorter then continues buffering subsequent records until the threshold is exceeded again.

was:
Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
{code:java}
numOfOutputRows: total number of output rows.
numOfPartitions: number of processed input partitions.
numOfWindowPartitions: number of generated window partitions.
spilledRows: total number of spilled rows.
spillSizeOnDisk: total spilled data size on disk.{code}

> Add WindowExec SQLMetrics
> -
>
> Key: SPARK-46639
> URL: https://issues.apache.org/jira/browse/SPARK-46639
> Project: Spark
> Issue Type: Task
> Components: SQL
> Affects Versions: 4.0.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Labels: pull-request-available
>
> Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
> {code:java}
> numOfOutputRows: total number of output rows.
> numOfPartitions: number of processed input partitions.
> numOfWindowPartitions: number of generated window partitions.
> spilledRows: total number of spilled rows.
> spillSizeOnDisk: total spilled data size on disk.{code}
> As an example use case, WindowExec spilling behavior depends on multiple factors and can sometimes cause {{SparkOutOfMemoryError}} instead of spilling to disk, so it is hard to track without SQLMetrics:
> *1-* WindowExec creates an ExternalAppendOnlyUnsafeRowArray (internal ArrayBuffer) per task (i.e. per child RDD partition).
> *2-* When the ExternalAppendOnlyUnsafeRowArray size exceeds spark.sql.windowExec.buffer.in.memory.threshold=4096, it switches to an UnsafeExternalSorter as its spillableArray by moving all of its buffered rows into the UnsafeExternalSorter, and the internal ArrayBuffer is cleared. From this point on, WindowExec writes into the UnsafeExternalSorter's buffer (UnsafeInMemorySorter).
> *3-* An UnsafeExternalSorter is created per window partition. When the UnsafeExternalSorter's buffer size exceeds spark.sql.windowExec.buffer.spill.threshold=Integer.MAX_VALUE, it spills to disk and clears the entire buffer (UnsafeInMemorySorter) content. The UnsafeExternalSorter then continues buffering subsequent records until the threshold is exceeded again.
[jira] [Created] (SPARK-46639) Add WindowExec SQLMetrics
Eren Avsarogullari created SPARK-46639:
--
Summary: Add WindowExec SQLMetrics
Key: SPARK-46639
URL: https://issues.apache.org/jira/browse/SPARK-46639
Project: Spark
Issue Type: Task
Components: SQL
Affects Versions: 4.0.0
Reporter: Eren Avsarogullari

Currently, the WindowExec physical operator has only the spillSize SQLMetric. This Jira aims to add the following SQLMetrics to provide more insight into WindowExec behavior during query execution:
{code:java}
numOfOutputRows: total number of output rows.
numOfPartitions: number of processed input partitions.
numOfWindowPartitions: number of generated window partitions.
spilledRows: total number of spilled rows.
spillSizeOnDisk: total spilled data size on disk.{code}
[jira] [Comment Edited] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773480#comment-17773480 ] Eren Avsarogullari edited comment on SPARK-45443 at 10/9/23 9:17 PM:
-
> But this idea only work for one query

Could you please provide more info on this? The same IMR instance can be used by multiple queries. Let's say there are two queries, Q0 and Q1, and both use the same IMR instance. Q0 materializes the IMR instance via TableCacheQueryStage, and the IMR materialization has to complete before Q0 finishes. Q1 can still introduce a TableCacheQueryStage instance into its physical plan for the same IMR instance; however, that TableCacheQueryStage instance will not submit an IMR materialization job because the IMR is already materialized at that level, right? Are there any other use-cases not covered?
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Also, thinking about the following use-cases:
*1- Queries using AQE under the IMR feature (a.k.a. spark.sql.optimizer.canChangeCachedPlanOutputPartitioning):* TableCacheQueryStage materializes the IMR by submitting a Spark job per TableCacheQueryStage/InMemoryTableScanExec instance.
*2- Queries not using AQE under the IMR feature:* The IMR will be materialized by the InMemoryTableScanExec.doExecute/doExecuteColumnar() flow.

Can an InMemoryTableScanExec-based solution (to avoid replicated InMemoryRelation materialization) be more inclusive by covering all use-cases? spark.sql.optimizer.canChangeCachedPlanOutputPartitioning is enabled by default as of Spark 3.5, but queries may run on < Spark 3.5, or the feature may need to be disabled on >= Spark 3.5 for some reason.

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
> -
>
> Key: SPARK-45443
> URL: https://issues.apache.org/jira/browse/SPARK-45443
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Attachments: IMR Materialization - Stage 2.png, IMR Materialization - Stage 3.png
>
> TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes the InMemoryTableScanExec output (cached RDD) to provide runtime stats in order to apply AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. This behavior can be more visible when the cached RDD size is high.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> Would like to get community feedback. Thanks in advance.
> cc [~ulysses] [~cloud_fan]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
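The Q0/Q1 flow described in the comment above can be sketched in plain Scala. This is a minimal model, not Spark's API: `SharedCache`, `materializeIfNeeded`, and `jobsSubmitted` are hypothetical names standing in for the cached relation and its materialization job. The second caller observes the already-materialized cache and submits no job.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a shared cached relation (IMR).
// `jobsSubmitted` counts submitted materialization jobs.
final class SharedCache {
  val jobsSubmitted = new AtomicInteger(0)
  @volatile var materialized = false
  def materializeIfNeeded(): Unit =
    if (!materialized) { jobsSubmitted.incrementAndGet(); materialized = true }
}

val imr = new SharedCache
imr.materializeIfNeeded() // Q0's TableCacheQueryStage: submits the job
imr.materializeIfNeeded() // Q1's TableCacheQueryStage: already materialized, skipped
assert(imr.jobsSubmitted.get == 1) // only Q0 submitted a job
```

This only holds when Q0's materialization finishes before Q1 checks; the concurrent case is the subject of the race discussed in the issue description.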
[jira] [Commented] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773480#comment-17773480 ] Eren Avsarogullari commented on SPARK-45443:
> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
> -
>
> Key: SPARK-45443
> URL: https://issues.apache.org/jira/browse/SPARK-45443
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Attachments: IMR Materialization - Stage 2.png, IMR Materialization - Stage 3.png
>
> TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes the InMemoryTableScanExec output (cached RDD) to provide runtime stats in order to apply AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. This behavior can be more visible when the cached RDD size is high.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> Would like to get community feedback. Thanks in advance.
> cc [~ulysses] [~cloud_fan]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i =>
>   val index = i % 5
>   (index, s"Employee_$index", s"Department_$index")
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :              +- InMemoryRelation (2)
>          :                    +- AdaptiveSparkPlan (7)
>          :                       +- HashAggregate (6)
>          :                          +- Exchange (5)
>          :                             +- HashAggregate (4)
>          :                                +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
>                   +- InMemoryTableScan (11)
>                         +- InMemoryRelation (12)
>                               +- AdaptiveSparkPlan (15)
>                                  +- HashAggregate
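The check-then-act window behind the `inMemoryTableScan.isMaterialized => false` behavior described in the issue can be modeled in plain Scala. This is an illustrative sketch, not Spark code: a latch forces the interleaving in which both stages read the flag before either finishes, so both submit a "job".

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical model of the shared cached relation's state.
val jobsSubmitted = new AtomicInteger(0)
@volatile var materialized = false
val bothChecked = new CountDownLatch(2)

// Two "TableCacheQueryStage" instances perform the same check-then-act.
val stages = Seq.fill(2)(Future {
  val needsWork = !materialized       // isMaterialized returns false...
  bothChecked.countDown()
  blocking(bothChecked.await())       // ...for both, before either finishes
  if (needsWork) {
    jobsSubmitted.incrementAndGet()   // duplicate materialization job
    materialized = true
  }
})
Await.result(Future.sequence(stages), 10.seconds)
assert(jobsSubmitted.get == 2) // replicated IMR materialization
```

With a slow first materialization (large cached RDD), this interleaving becomes more likely, which matches the report that the behavior is more visible at high cached data sizes.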
[jira] [Comment Edited] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773050#comment-17773050 ] Eren Avsarogullari edited comment on SPARK-45443 at 10/8/23 8:42 PM:
-
Hi [~ulysses], thanks for the reply. For the sample query above, if the TableCacheQueryStage flow is disabled, IMR materialization will be triggered by ShuffleQueryStage (introduced by Sort's Exchange node). Both ShuffleQueryStage nodes will also need to materialize the same IMR instance in that case, so I believe the same issue may also occur in the previous flow.

TableCacheQueryStage, unlike the previous flow, materializes the IMR eagerly. Can this increase the probability of concurrent IMR materialization for the same IMR instance? I think this behavior is not visible when the IMR cached data size is low. However, replicated IMR materialization can be expensive and can introduce a potential regression when the IMR cached data size is high (e.g., when the IMR needs to read a large shuffle data size). Also, queries can reference multiple IMR instances, which further increases the probability of concurrent materialization of the same IMR instance.

Thinking about a potential solution option (if it makes sense): for queries using AQE, can introducing TableCacheQueryStage into the physical plan once per unique IMR instance help? IMR instances can be compared for equivalence before their TableCacheQueryStage instances are created by AdaptiveSparkPlanExec, so that TableCacheQueryStage materializes each unique IMR instance once.
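The "materialize once per unique IMR instance" idea from the comment above can be sketched with a memoized future per canonical plan. This is a plain-Scala sketch under stated assumptions: `canonicalKey` stands in for comparing IMR plans for equivalence, and `materializeOnce` is a hypothetical name, not Spark's API.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val jobsSubmitted = new AtomicInteger(0)
// One in-flight materialization future per canonicalized cached plan.
val inFlight = new ConcurrentHashMap[String, Future[Unit]]()

def materializeOnce(canonicalKey: String): Future[Unit] =
  // computeIfAbsent is atomic: concurrent callers with the same key
  // get the same future, so exactly one job is submitted per unique plan.
  inFlight.computeIfAbsent(canonicalKey, _ => Future {
    jobsSubmitted.incrementAndGet() // the single materialization "job"
  })

// Two TableCacheQueryStage-like callers referencing the same cached plan
// share one future instead of each submitting a job.
val f1 = materializeOnce("imr-plan-1")
val f2 = materializeOnce("imr-plan-1")
Await.result(Future.sequence(Seq(f1, f2)), 10.seconds)
assert(jobsSubmitted.get == 1) // one job despite two callers
```

Keying on a canonicalized plan rather than object identity is what would let equivalent-but-distinct IMR instances share one materialization, per the comparison step the comment proposes.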
[jira] [Comment Edited] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773050#comment-17773050 ] Eren Avsarogullari edited comment on SPARK-45443 at 10/8/23 8:40 PM: - Hi [~ulysses], Firstly, thanks for reply. For above sample query, if TableCacheQueryStage flow is disabled, IMR materialization will be triggered by ShuffleQueryStage (introduced by Sort' s Exchange node). Both ShuffleQueryStage nodes will also need to materialize same IMR instance in this case so i believe same issue may also occur in previous flow. TableCacheQueryStage materializes IMR eagerly as different from previous flow. Can this increase probability of concurrent IMR materialization for same IMR instance? I think this behavior is not visible when IMR cached data size is low. However, replicated IMR materialization can be expensive and can introduce potential regression when IMR cached data size is high (e.g: observing this behavior when IMR needs to read high shuffle data size). Also, the queries can have multiple IMR instances by referencing multiple replicated IMR instances, this can also increase probability of concurrent IMR materialization for same IMR instance. Thinking on potential solutions options (if makes sense): For queries using AQE, can introducing TableCacheQueryStage into PhysicalPlan once per unique IMR instance help? IMR instances can be compared if they are equivalent before TableCacheQueryStage is created by AdaptiveSparkPlanExec and TableCacheQueryStage can materialize unique IMR instance once. was (Author: erenavsarogullari): Hi [~ulysses], Firstly, thanks for reply. For above sample query, if TableCacheQueryStage flow is disabled, IMR materialization will be triggered by ShuffleQueryStage (introduced by Sort' s Exchange node). Both ShuffleQueryStage nodes will also need to materialize same IMR instance in this case so i believe same issue may also occur in previous flow. 
TableCacheQueryStage materializes IMR eagerly as different from previous flow. Can this increase probability of concurrent IMR materialization for same IMR instance? I think this behavior is not visible when IMR cached data size is low. However, replicated IMR materialization can be expensive and can introduce potential regression when IMR cached data size is high (e.g: observing this behavior when IMR needs to read high shuffle data size). Also, the queries can have multiple IMR instances by referencing multiple replicated IMR instances, this can also increase probability of concurrent IMR materialization for same IMR instance. Thinking on potential solutions options if makes sense: For queries using AQE, can introducing TableCacheQueryStage into PhysicalPlan once per unique IMR instance help? IMR instances can be compared if they are equivalent before TableCacheQueryStage is created by AdaptiveSparkPlanExec and TableCacheQueryStage can materialize unique IMR instance once. > Revisit TableCacheQueryStage to avoid replicated InMemoryRelation > materialization > - > > Key: SPARK-45443 > URL: https://issues.apache.org/jira/browse/SPARK-45443 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: IMR Materialization - Stage 2.png, IMR Materialization - > Stage 3.png > > > TableCacheQueryStage is created per InMemoryTableScanExec by > AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output > (cached RDD) to provide runtime stats in order to apply AQE optimizations > into remaining physical plan stages. TableCacheQueryStage materializes > InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage > instance. 
For example, if there are 2 TableCacheQueryStage instances > referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s > materialization takes longer, following logic will return false > (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR > materialization. This behavior can be more visible when cached RDD size is > high. > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] > Would like to get community feedback. Thanks in advance. > cc [~ulysses] [~cloud_fan] > *Sample Query to simulate the problem:* > // Both join legs uses same IMR instance > {code:java} > import spark.implicits._ > val arr = (1 to 12).map { i => { > val index = i % 5 > (index, s"Employee_$index", s"Department_$index") > } > } > val df = arr.toDF("id", "name", "department") > .filter('id >= 0) > .sort("id") > .groupBy('id, 'name, 'department) >
[jira] [Comment Edited] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773050#comment-17773050 ] Eren Avsarogullari edited comment on SPARK-45443 at 10/8/23 8:33 PM: - Hi [~ulysses], Firstly, thanks for reply. For above sample query, if TableCacheQueryStage flow is disabled, IMR materialization will be triggered by ShuffleQueryStage (introduced by Sort' s Exchange node). Both ShuffleQueryStage nodes will also need to materialize same IMR instance in this case so i believe same issue may also occur in previous flow. TableCacheQueryStage materializes IMR eagerly as different from previous flow. Can this increase probability of concurrent IMR materialization for same IMR instance? I think this behavior is not visible when IMR cached data size is low. However, replicated IMR materialization can be expensive and can introduce potential regression when IMR cached data size is high (e.g: observing this behavior when IMR needs to read high shuffle data size). Also, the queries can have multiple IMR instances by referencing multiple replicated IMR instances, this can also increase probability of concurrent IMR materialization for same IMR instance. Thinking on potential solutions options if makes sense: For queries using AQE, can introducing TableCacheQueryStage into PhysicalPlan once per unique IMR instance help? IMR instances can be compared if they are equivalent before TableCacheQueryStage is created by AdaptiveSparkPlanExec and TableCacheQueryStage can materialize unique IMR instance once. was (Author: erenavsarogullari): Hi [~ulysses], Firstly, thanks for reply. For above sample query, if TableCacheQueryStage flow is disabled, IMR materialization will be triggered by ShuffleQueryStage (introduced by Sort' s Exchange node). Both ShuffleQueryStage nodes will also need to materialize same IMR instance in this case so i believe same issue may also occur in previous flow. 
TableCacheQueryStage materializes IMR eagerly as different from previous flow. Can this increase probability of concurrent IMR materialization for same IMR instance? I think this behavior is not visible when IMR cached data size is low. However, replicated IMR materialization can be expensive and introduce regression when IMR cached data size is high (e.g: observing this behavior when IMR needs to read high shuffle data size). Also, the queries can have multiple IMR instances by referencing multiple replicated IMR instances, this can also increase probability of concurrent IMR materialization for same IMR instance. Thinking on potential solutions options if makes sense: For queries using AQE, can introducing TableCacheQueryStage into PhysicalPlan once per unique IMR instance help? IMR instances can be compared if they are equivalent before TableCacheQueryStage is created by AdaptiveSparkPlanExec and TableCacheQueryStage can materialize unique IMR instance once. > Revisit TableCacheQueryStage to avoid replicated InMemoryRelation > materialization > - > > Key: SPARK-45443 > URL: https://issues.apache.org/jira/browse/SPARK-45443 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: IMR Materialization - Stage 2.png, IMR Materialization - > Stage 3.png > > > TableCacheQueryStage is created per InMemoryTableScanExec by > AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output > (cached RDD) to provide runtime stats in order to apply AQE optimizations > into remaining physical plan stages. TableCacheQueryStage materializes > InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage > instance. 
For example, if there are 2 TableCacheQueryStage instances > referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s > materialization takes longer, following logic will return false > (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR > materialization. This behavior can be more visible when cached RDD size is > high. > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] > Would like to get community feedback. Thanks in advance. > cc [~ulysses] [~cloud_fan] > *Sample Query to simulate the problem:* > // Both join legs uses same IMR instance > {code:java} > import spark.implicits._ > val arr = (1 to 12).map { i => { > val index = i % 5 > (index, s"Employee_$index", s"Department_$index") > } > } > val df = arr.toDF("id", "name", "department") > .filter('id >= 0) > .sort("id") > .groupBy('id, 'name, 'department) > .count().as("count") >
[jira] [Comment Edited] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773050#comment-17773050 ] Eren Avsarogullari edited comment on SPARK-45443 at 10/8/23 8:29 PM: - Hi [~ulysses], Firstly, thanks for reply. For above sample query, if TableCacheQueryStage flow is disabled, IMR materialization will be triggered by ShuffleQueryStage (introduced by Sort' s Exchange node). Both ShuffleQueryStage nodes will also need to materialize same IMR instance in this case so i believe same issue may also occur in previous flow. TableCacheQueryStage materializes IMR eagerly as different from previous flow. Can this increase probability of concurrent IMR materialization for same IMR instance? I think this behavior is not visible when IMR cached data size is low. However, replicated IMR materialization can be expensive and introduce regression when IMR cached data size is high (e.g: observing this behavior when IMR needs to read high shuffle data size). Also, the queries can have multiple IMR instances by referencing multiple replicated IMR instances, this can also increase probability of concurrent IMR materialization for same IMR instance. Thinking on potential solutions options if makes sense: For queries using AQE, can introducing TableCacheQueryStage into PhysicalPlan once per unique IMR instance help? IMR instances can be compared if they are equivalent before TableCacheQueryStage is created by AdaptiveSparkPlanExec and TableCacheQueryStage can materialize unique IMR instance once. was (Author: erenavsarogullari): Hi [~ulysses], Firstly, thanks for reply. For above sample query, if TableCacheQueryStage flow is disabled, IMR materialization will be triggered by ShuffleQueryStage (introduced by Sort' s Exchange node). Both ShuffleQueryStage nodes will also need to materialize same IMR instance in this case so i believe same issue may also occur in previous flow. 
TableCacheQueryStage materializes IMR eagerly as different from previous flow. Can this increase probability of concurrent IMR materialization for same IMR instance? I think this behavior is not visible when IMR cached data size is low. However, replicated IMR materialization can be expensive and introduce regression when IMR cached data size is high (e.g: observing this behavior when IMR needs to read high shuffle data size). Also, the queries can have multiple IMR instances by referencing multiple replicated IMR instances, this can also increase probability of concurrent IMR materialization for same IMR instance. Thinking on potential solutions options if makes sense: For queries using AQE, can introducing TableCacheQueryStage into PhysicalPlan once per unique IMR instance help? IMR instances can be compared if they are equivalent before TableCacheQueryStage is created by AdaptiveSparkPlanExec. > Revisit TableCacheQueryStage to avoid replicated InMemoryRelation > materialization > - > > Key: SPARK-45443 > URL: https://issues.apache.org/jira/browse/SPARK-45443 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: IMR Materialization - Stage 2.png, IMR Materialization - > Stage 3.png > > > TableCacheQueryStage is created per InMemoryTableScanExec by > AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output > (cached RDD) to provide runtime stats in order to apply AQE optimizations > into remaining physical plan stages. TableCacheQueryStage materializes > InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage > instance. For example, if there are 2 TableCacheQueryStage instances > referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s > materialization takes longer, following logic will return false > (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR > materialization. 
This behavior can be more visible when cached RDD size is > high. > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] > Would like to get community feedback. Thanks in advance. > cc [~ulysses] [~cloud_fan] > *Sample Query to simulate the problem:* > // Both join legs uses same IMR instance > {code:java} > import spark.implicits._ > val arr = (1 to 12).map { i => { > val index = i % 5 > (index, s"Employee_$index", s"Department_$index") > } > } > val df = arr.toDF("id", "name", "department") > .filter('id >= 0) > .sort("id") > .groupBy('id, 'name, 'department) > .count().as("count") > df.persist() > val df2 = df.sort("count").filter('count <= 2) > val df3 =
[jira] [Comment Edited] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773050#comment-17773050 ] Eren Avsarogullari edited comment on SPARK-45443 at 10/8/23 8:27 PM:
Hi [~ulysses], firstly, thanks for the reply. For the sample query above, if the TableCacheQueryStage flow is disabled, IMR materialization is triggered by ShuffleQueryStage (introduced by Sort's Exchange node). In that case both ShuffleQueryStage nodes would also need to materialize the same IMR instance, so I believe the same issue may occur in the previous flow as well. Unlike the previous flow, TableCacheQueryStage materializes the IMR eagerly. Can this increase the probability of concurrent materialization of the same IMR instance? This behavior is not visible when the IMR cached data size is small. However, replicated IMR materialization can be expensive and introduce a regression when the cached data size is large (e.g., we observe this behavior when the IMR needs to read a large amount of shuffle data). Also, a query can reference multiple replicated IMR instances, which further increases the probability of concurrent materialization of the same IMR instance. A potential solution, if it makes sense: for queries using AQE, could introducing TableCacheQueryStage into the physical plan once per unique IMR instance help? IMR instances could be compared for equivalence before TableCacheQueryStage is created by AdaptiveSparkPlanExec.
> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
> -
>
> Key: SPARK-45443
> URL: https://issues.apache.org/jira/browse/SPARK-45443
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Attachments: IMR Materialization - Stage 2.png, IMR Materialization - Stage 3.png
>
> TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (cached RDD) to provide runtime stats for applying AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false), and this may cause replicated IMR materialization. This behavior is more visible when the cached RDD size is large.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> Would like to get community feedback. Thanks in advance.
> cc [~ulysses] [~cloud_fan]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i =>
>   val index = i % 5
>   (index, s"Employee_$index", s"Department_$index")
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show()
> {code}
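The "once per unique IMR instance" idea proposed above can be sketched outside of Spark as a shared materialization future keyed by a canonical representation of the cached plan. This is a minimal illustration of the deduplication idea only; all names here (SharedMaterialization, canonicalPlanKey) are hypothetical and not Spark APIs:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch: key in-flight materializations by a canonical form of the
// cached plan, so equivalent IMR instances share a single materialization job.
object SharedMaterialization {
  private val inFlight = new ConcurrentHashMap[String, Future[Long]]()
  val jobsSubmitted = new AtomicInteger(0)

  def materialize(canonicalPlanKey: String): Future[Long] =
    // computeIfAbsent is atomic, so only the first caller creates the future
    inFlight.computeIfAbsent(canonicalPlanKey, _ => Future {
      jobsSubmitted.incrementAndGet() // stands in for submitting a Spark job
      42L                             // stands in for the computed runtime stats
    })
}

object Demo extends App {
  // Two "stages" referencing the same IMR share one future, hence one job.
  val f1 = SharedMaterialization.materialize("imr#1")
  val f2 = SharedMaterialization.materialize("imr#1")
  Await.result(f1, 5.seconds)
  Await.result(f2, 5.seconds)
  println(SharedMaterialization.jobsSubmitted.get) // prints 1
}
```

In Spark terms, the key could come from comparing canonicalized IMR plans before AdaptiveSparkPlanExec creates each TableCacheQueryStage, so equivalent instances map to one stage.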
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-45443: --- Description: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (cached RDD) to provide runtime stats for applying AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false), and this may cause replicated IMR materialization. This behavior is more visible when the cached RDD size is large. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]
*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._
val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()
val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
df4.show()
{code}
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3)
{code}
*Stages DAGs materializing the same IMR instance:* !IMR Materialization - Stage 2.png|width=303,height=134! !IMR Materialization - Stage 3.png|width=303,height=134!
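The check-then-act hazard on isMaterialized described above can be modeled deterministically outside of Spark. The sketch below (all names hypothetical, not Spark code) uses a latch to force both "stages" past the materialization check before either finishes, producing two materializations of the same logical cache:

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of the race: both stages observe materialized == false
// before either finishes, so each submits its own materialization job.
object RacyCache {
  @volatile var materialized = false
  val materializations = new AtomicInteger(0)
  private val bothPastCheck = new CountDownLatch(2)

  def materializeIfNeeded(): Unit =
    if (!materialized) {                 // the check...
      bothPastCheck.countDown()
      bothPastCheck.await()              // ...both threads reach this point first
      materializations.incrementAndGet() // ...then both act (replicated work)
      materialized = true
    }
}

object RaceDemo extends App {
  val t1 = new Thread(() => RacyCache.materializeIfNeeded())
  val t2 = new Thread(() => RacyCache.materializeIfNeeded())
  t1.start(); t2.start()
  t1.join(); t2.join()
  println(RacyCache.materializations.get) // prints 2: replicated materialization
}
```

The latch only makes the interleaving reproducible; in the real plan the same window opens whenever the second TableCacheQueryStage checks isMaterialized while the first one's job is still running.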
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-45443: --- Description: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached RDD) to provide runtime stats in order to apply AQE optimizations onto remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s materialization takes longer, following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. This behavior can be more visible when cached RDD size is high. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] Would like to get community feedback. Thanks in advance. 
cc [~ulysses] [~cloud_fan] *Sample Query to simulate the problem:* // Both join legs uses same IMR instance {code:java} import spark.implicits._ val arr = (1 to 12).map { i => { val index = i % 5 (index, s"Employee_$index", s"Department_$index") } } val df = arr.toDF("id", "name", "department") .filter('id >= 0) .sort("id") .groupBy('id, 'name, 'department) .count().as("count") df.persist() val df2 = df.sort("count").filter('count <= 2) val df3 = df.sort("count").filter('count >= 3) val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter") df4.show() {code} *Physical Plan:* {code:java} == Physical Plan == AdaptiveSparkPlan (31) +- == Final Plan == CollectLimit (21) +- * Project (20) +- * SortMergeJoin FullOuter (19) :- * Sort (10) : +- * Filter (9) : +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5) : +- InMemoryTableScan (1) : +- InMemoryRelation (2) : +- AdaptiveSparkPlan (7) : +- HashAggregate (6) : +- Exchange (5) : +- HashAggregate (4) : +- LocalTableScan (3) +- * Sort (18) +- * Filter (17) +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5) +- InMemoryTableScan (11) +- InMemoryRelation (12) +- AdaptiveSparkPlan (15) +- HashAggregate (14) +- Exchange (13) +- HashAggregate (4) +- LocalTableScan (3) {code} *Stages DAGs materializing the same IMR instance:* !IMR Materialization - Stage 2.png|width=303,height=134! !IMR Materialization - Stage 3.png|width=303,height=134! was: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached RDD) to provide runtime stats to apply AQE optimizations onto remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage instance. 
For example, if there are 2 TableCacheQueryStage instances referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s materialization takes longer, following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. This behavior can be more visible when cached RDD size is high. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] Would like to get community feedback. Thanks in advance. cc [~ulysses] [~cloud_fan] *Sample Query to simulate the problem:* // Both join legs uses same IMR instance {code:java} import spark.implicits._ val arr = (1 to 12).map { i => { val index = i % 5 (index, s"Employee_$index", s"Department_$index") } } val df = arr.toDF("id", "name", "department") .filter('id >= 0) .sort("id") .groupBy('id, 'name, 'department) .count().as("count") df.persist() val df2 = df.sort("count").filter('count <= 2) val df3 = df.sort("count").filter('count >= 3) val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter") df4.show() {code} *Physical Plan:* {code:java} == Physical Plan == AdaptiveSparkPlan (31) +- == Final Plan == CollectLimit (21) +- * Project (20) +- * SortMergeJoin FullOuter (19) :- * Sort (10) : +- * Filter (9) : +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5) : +-
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-45443: --- Description: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached RDD) to provide runtime stats to apply AQE optimizations onto remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s materialization takes longer, following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. This behavior can be more visible when cached RDD size is high. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] Would like to get community feedback. Thanks in advance. 
cc [~ulysses] [~cloud_fan] *Sample Query to simulate the problem:* // Both join legs uses same IMR instance {code:java} import spark.implicits._ val arr = (1 to 12).map { i => { val index = i % 5 (index, s"Employee_$index", s"Department_$index") } } val df = arr.toDF("id", "name", "department") .filter('id >= 0) .sort("id") .groupBy('id, 'name, 'department) .count().as("count") df.persist() val df2 = df.sort("count").filter('count <= 2) val df3 = df.sort("count").filter('count >= 3) val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter") df4.show() {code} *Physical Plan:* {code:java} == Physical Plan == AdaptiveSparkPlan (31) +- == Final Plan == CollectLimit (21) +- * Project (20) +- * SortMergeJoin FullOuter (19) :- * Sort (10) : +- * Filter (9) : +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5) : +- InMemoryTableScan (1) : +- InMemoryRelation (2) : +- AdaptiveSparkPlan (7) : +- HashAggregate (6) : +- Exchange (5) : +- HashAggregate (4) : +- LocalTableScan (3) +- * Sort (18) +- * Filter (17) +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5) +- InMemoryTableScan (11) +- InMemoryRelation (12) +- AdaptiveSparkPlan (15) +- HashAggregate (14) +- Exchange (13) +- HashAggregate (4) +- LocalTableScan (3) {code} *Stages DAGs materializing the same IMR instance:* !IMR Materialization - Stage 2.png|width=303,height=134! !IMR Materialization - Stage 3.png|width=303,height=134! was: TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output (cached RDD) to provide runtime stats to apply AQE optimizations onto remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage instance. 
For example, if there are 2 TableCacheQueryStage instances referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s materialization takes longer, following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] Would like to get community feedback. Thanks in advance. cc [~ulysses] [~cloud_fan] *Sample Query to simulate the problem:* // Both join legs uses same IMR instance {code:java} import spark.implicits._ val arr = (1 to 12).map { i => { val index = i % 5 (index, s"Employee_$index", s"Department_$index") } } val df = arr.toDF("id", "name", "department") .filter('id >= 0) .sort("id") .groupBy('id, 'name, 'department) .count().as("count") df.persist() val df2 = df.sort("count").filter('count <= 2) val df3 = df.sort("count").filter('count >= 3) val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter") df4.show() {code} *Physical Plan:* {code:java} == Physical Plan == AdaptiveSparkPlan (31) +- == Final Plan == CollectLimit (21) +- * Project (20) +- * SortMergeJoin FullOuter (19) :- * Sort (10) : +- * Filter (9) : +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5) : +- InMemoryTableScan (1) : +- InMemoryRelation (2)
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eren Avsarogullari updated SPARK-45443:
---
Description:

TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (the cached RDD) to provide runtime stats for AQE optimizations on the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing the same IMR (InMemoryRelation) instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following check will return false (inMemoryTableScan.isMaterialized => false), and this may cause replicated IMR materialization:
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
df4.show()
{code}

*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3)
{code}

*Stages DAGs materializing the same IMR instance:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!
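The race described above, where two stages both observe isMaterialized == false while the first materialization job is still running and therefore both submit a job, can be sketched without Spark. The sketch below is a simplified, hypothetical model: SharedCache, materializationCount, and the latch-based timing are illustrative stand-ins, not Spark APIs.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the shared cached RDD (the IMR instance).
class SharedCache {
  val materializationCount = new AtomicInteger(0) // how many "jobs" were submitted
  @volatile var materialized = false
  def isMaterialized: Boolean = materialized
}

val cache = new SharedCache
val bothChecked = new CountDownLatch(2) // forces both checks to happen before either job finishes

// Two "TableCacheQueryStage"-like workers referencing the same cache.
def runStage(): Thread = {
  val t = new Thread(() => {
    if (!cache.isMaterialized) {              // both workers observe false here
      bothChecked.countDown()
      bothChecked.await(1, TimeUnit.SECONDS)  // simulate: first job still in flight
      cache.materializationCount.incrementAndGet() // duplicate job submitted
      cache.materialized = true
    }
  })
  t.start()
  t
}

val threads = Seq(runStage(), runStage())
threads.foreach(_.join())

// Both workers passed the isMaterialized check before either finished,
// so the shared cache was materialized twice instead of once.
println(s"materializations = ${cache.materializationCount.get}")
```

A fix along the lines discussed in this ticket would make the check-and-submit step atomic per shared IMR instance (e.g. a single shared future), so the second stage awaits the first materialization instead of resubmitting it.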
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eren Avsarogullari updated SPARK-45443:
---
Attachment: IMR Materialization - Stage 3.png
            IMR Materialization - Stage 2.png

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
>
>                 Key: SPARK-45443
>                 URL: https://issues.apache.org/jira/browse/SPARK-45443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Eren Avsarogullari
>            Priority: Major
>         Attachments: IMR Materialization - Stage 2.png, IMR Materialization - Stage 3.png

--
This message was sent by Atlassian Jira (v8.20.10#820010)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eren Avsarogullari updated SPARK-45443:
---
Summary: Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization (was: Revisit TableCacheQueryStage to avoid replicated IMR materialization)
[jira] [Created] (SPARK-45443) Revisit TableCacheQueryStage to avoid replicated IMR materialization
Eren Avsarogullari created SPARK-45443: -- Summary: Revisit TableCacheQueryStage to avoid replicated IMR materialization Key: SPARK-45443 URL: https://issues.apache.org/jira/browse/SPARK-45443 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.0 Reporter: Eren Avsarogullari TableCacheQueryStage is created per InMemoryTableScanExec by AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output (cached RDD) to provide runtime stats for applying AQE optimizations to the remaining physical plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage instance. For example, if there are 2 TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and the first InMemoryTableScanExec's materialization takes longer, the following logic will return false (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR materialization. [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281] *Sample Query to simulate the problem:* // Both join legs use the same IMR instance {code:java} import spark.implicits._ val arr = (1 to 12).map { i => { val index = i % 5 (index, s"Employee_$index", s"Department_$index") } } val df = arr.toDF("id", "name", "department") .filter('id >= 0) .sort("id") .groupBy('id, 'name, 'department) .count().as("count") df.persist() val df2 = df.sort("count").filter('count <= 2) val df3 = df.sort("count").filter('count >= 3) val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter") df4.show() {code} *Physical Plan:* {code:java} == Physical Plan == AdaptiveSparkPlan (31) +- == Final Plan == CollectLimit (21) +- * Project (20) +- * SortMergeJoin FullOuter (19) :- * Sort (10) : +- * Filter (9) : +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5) : +- InMemoryTableScan (1) : +- InMemoryRelation (2) : +- AdaptiveSparkPlan (7) : +- HashAggregate (6) : 
+- Exchange (5) : +- HashAggregate (4) : +- LocalTableScan (3) +- * Sort (18) +- * Filter (17) +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5) +- InMemoryTableScan (11) +- InMemoryRelation (12) +- AdaptiveSparkPlan (15) +- HashAggregate (14) +- Exchange (13) +- HashAggregate (4) +- LocalTableScan (3) {code}
[jira] [Updated] (SPARK-41214) SQL metrics are missing from Spark UI when AQE for Cached DataFrame is enabled
[ https://issues.apache.org/jira/browse/SPARK-41214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-41214: --- Summary: SQL metrics are missing from Spark UI when AQE for Cached DataFrame is enabled (was: SubPlan metrics are missed when AQE is enabled under InMemoryRelation) > SQL metrics are missing from Spark UI when AQE for Cached DataFrame is enabled > -- > > Key: SPARK-41214 > URL: https://issues.apache.org/jira/browse/SPARK-41214 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png, > DAG when AQE=ON and AQECachedDFSupport=ON without fix.png > > > *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE > optimizations under InMemoryRelation(IMR) nodes. The following sample query has an > IMR node on both BroadcastHashJoin legs. However, > when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, > the following data is missing because the final sub-plans (under IMR) are not > submitted to the UI. > {code:java} > - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missing, such > as Exchange and HashAggregate on the left BHJ leg and HashAggregate on the right BHJ > leg, > - WSCG blocks are missing on the left BHJ leg, > - AQEShuffleRead node is missing on the left BHJ leg. 
{code} > *Sample to reproduce:* > {code:java} > val spark = SparkSession > .builder() > .config("spark.sql.adaptive.enabled", "true") > .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", > "true") > .master("local[*]") > .getOrCreate() > import spark.implicits._ > // Create 1st DF > val arr = Seq( > (1, "Employee_1", "Department_1"), > (2, "Employee_2", "Department_2")) > val df = arr.toDF("id", "name", "department") > .filter('id < 3) > .groupBy('name) > .count() > df.cache() > // Create 2nd DF > val arr2 = Seq((1, "Employee_1", "Department_1")) > val df2 = arr2.toDF("id", "name", "department") > .filter('id > 0) > .groupBy('name) > .count() > df2.cache() > // Trigger query execution > val df3 = df.join(df2, "name") > df3.show() {code} > *DAG before fix:* > *!DAG when AQE=ON and AQECachedDFSupport=ON without > fix.png|width=33,height=86!* > *DAG after fix:* > *!DAG when AQE=ON and AQECachedDFSupport=ON with fix.png|width=33,height=82!*
[jira] [Updated] (SPARK-41214) SubPlan metrics are missed when AQE is enabled under InMemoryRelation
[ https://issues.apache.org/jira/browse/SPARK-41214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-41214: --- Summary: SubPlan metrics are missed when AQE is enabled under InMemoryRelation (was: Sub-plan metrics under InMemoryRelation are missed when AQE Cached DataFrame Support is enabled) > SubPlan metrics are missed when AQE is enabled under InMemoryRelation > - > > Key: SPARK-41214 > URL: https://issues.apache.org/jira/browse/SPARK-41214 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png, > DAG when AQE=ON and AQECachedDFSupport=ON without fix.png > > > *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE > optimizations under InMemoryRelation(IMR) nodes. Following sample query has > IMR node on both BroadcastHashJoin legs. However, > when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, > following datas are missed due to lack of final sub-plans (under IMR) > submissions (into UI). > {code:java} > - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missed such > as Exchange and HashAggregate on left BHJ leg and HashAggregate on right BHJ > leg, > - WSCG blocks are missed on left BHJ leg, > - AQEShuffleRead node is missed on left BHJ leg. 
{code} > *Sample to reproduce:* > {code:java} > val spark = SparkSession > .builder() > .config("spark.sql.adaptive.enabled", "true") > .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", > "true") > .master("local[*]") > .getOrCreate() > import spark.implicits._ > // Create 1th DF > val arr = Seq( > (1, "Employee_1", "Department_1"), > (2, "Employee_2", "Department_2")) > val df = arr.toDF("id", "name", "department") > .filter('id < 3) > .groupBy('name) > .count() > df.cache() > // Create 2th DF > val arr2 = Seq((1, "Employee_1", "Department_1")) > val df2 = arr2.toDF("id", "name", "department") > .filter('id > 0) > .groupBy('name) > .count() > df2.cache() > // Trigger query execution > val df3 = df.join(df2, "name") > df3.show() {code} > *DAG before fix:* > *!DAG when AQE=ON and AQECachedDFSupport=ON without > fix.png|width=33,height=86!* > *DAG after fix:* > *!DAG when AQE=ON and AQECachedDFSupport=ON with fix.png|width=33,height=82!* -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41214) Sub-plan metrics under InMemoryRelation are missed when AQE Cached DataFrame Support is enabled
[ https://issues.apache.org/jira/browse/SPARK-41214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-41214: --- Description: *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE optimizations under InMemoryRelation(IMR) nodes. Following sample query has IMR node on both BroadcastHashJoin legs. However, when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, following datas are missed due to lack of final sub-plans (under IMR) submissions (into UI). {code:java} - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missed such as Exchange and HashAggregate on left BHJ leg and HashAggregate on right BHJ leg, - WSCG blocks are missed on left BHJ leg, - AQEShuffleRead node is missed on left BHJ leg. {code} *Sample to reproduce:* {code:java} val spark = SparkSession .builder() .config("spark.sql.adaptive.enabled", "true") .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true") .master("local[*]") .getOrCreate() import spark.implicits._ // Create 1th DF val arr = Seq( (1, "Employee_1", "Department_1"), (2, "Employee_2", "Department_2")) val df = arr.toDF("id", "name", "department") .filter('id < 3) .groupBy('name) .count() df.cache() // Create 2th DF val arr2 = Seq((1, "Employee_1", "Department_1")) val df2 = arr2.toDF("id", "name", "department") .filter('id > 0) .groupBy('name) .count() df2.cache() // Trigger query execution val df3 = df.join(df2, "name") df3.show() {code} *DAG before fix:* *!DAG when AQE=ON and AQECachedDFSupport=ON without fix.png|width=33,height=86!* *DAG after fix:* *!DAG when AQE=ON and AQECachedDFSupport=ON with fix.png|width=33,height=82!* was: *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE optimizations under InMemoryRelation(IMR) nodes. Following sample query has IMR node on both BroadcastHashJoin legs. 
However, when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, following datas are missed due to lack of final sub-plans (under IMR) submissions (into UI). {code:java} - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missed such as Exchange and HashAggregate on left BHJ leg and HashAggregate on right BHJ leg, - WSCG blocks are missed on left BHJ leg, - AQEShuffleRead node is missed on left BHJ leg. {code} *Sample to reproduce:* {code:java} val spark = SparkSession .builder() .config("spark.sql.adaptive.enabled", "true") .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true") .master("local[*]") .getOrCreate() import spark.implicits._ // Create 1th DF val arr = Seq( (1, "Employee_1", "Department_1"), (2, "Employee_2", "Department_2")) val df = arr.toDF("id", "name", "department") .filter('id < 3) .groupBy('name) .count() df.cache() // Create 2th DF val arr2 = Seq((1, "Employee_1", "Department_1")) val df2 = arr2.toDF("id", "name", "department") .filter('id > 0) .groupBy('name) .count() df2.cache() // Trigger query execution val df3 = df.join(df2, "name") df3.show() {code} > Sub-plan metrics under InMemoryRelation are missed when AQE Cached DataFrame > Support is enabled > --- > > Key: SPARK-41214 > URL: https://issues.apache.org/jira/browse/SPARK-41214 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png, > DAG when AQE=ON and AQECachedDFSupport=ON without fix.png > > > *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE > optimizations under InMemoryRelation(IMR) nodes. Following sample query has > IMR node on both BroadcastHashJoin legs. However, > when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, > following datas are missed due to lack of final sub-plans (under IMR) > submissions (into UI). 
> {code:java} > - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missed such > as Exchange and HashAggregate on left BHJ leg and HashAggregate on right BHJ > leg, > - WSCG blocks are missed on left BHJ leg, > - AQEShuffleRead node is missed on left BHJ leg. {code} > *Sample to reproduce:* > {code:java} > val spark = SparkSession > .builder() > .config("spark.sql.adaptive.enabled", "true") > .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", > "true") > .master("local[*]") > .getOrCreate() > import spark.implicits._ > // Create 1th DF > val arr = Seq( > (1, "Employee_1", "Department_1"), > (2, "Employee_2", "Department_2")) > val df = arr.toDF("id", "name", "department") > .filter('id < 3) > .groupBy('name) >
[jira] [Created] (SPARK-41214) Sub-plan metrics under InMemoryRelation are missed when AQE Cached DataFrame Support is enabled
Eren Avsarogullari created SPARK-41214: -- Summary: Sub-plan metrics under InMemoryRelation are missed when AQE Cached DataFrame Support is enabled Key: SPARK-41214 URL: https://issues.apache.org/jira/browse/SPARK-41214 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.0 Reporter: Eren Avsarogullari Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png, DAG when AQE=ON and AQECachedDFSupport=ON without fix.png *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE optimizations under InMemoryRelation(IMR) nodes. The following sample query has an IMR node on both BroadcastHashJoin legs. However, when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, the following data is missing because the final sub-plans (under IMR) are not submitted to the UI. {code:java} - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missing, such as Exchange and HashAggregate on the left BHJ leg and HashAggregate on the right BHJ leg, - WSCG blocks are missing on the left BHJ leg, - AQEShuffleRead node is missing on the left BHJ leg. {code} *Sample to reproduce:* {code:java} val spark = SparkSession .builder() .config("spark.sql.adaptive.enabled", "true") .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true") .master("local[*]") .getOrCreate() import spark.implicits._ // Create 1st DF val arr = Seq( (1, "Employee_1", "Department_1"), (2, "Employee_2", "Department_2")) val df = arr.toDF("id", "name", "department") .filter('id < 3) .groupBy('name) .count() df.cache() // Create 2nd DF val arr2 = Seq((1, "Employee_1", "Department_1")) val df2 = arr2.toDF("id", "name", "department") .filter('id > 0) .groupBy('name) .count() df2.cache() // Trigger query execution val df3 = df.join(df2, "name") df3.show() {code}
[jira] [Updated] (SPARK-41214) Sub-plan metrics under InMemoryRelation are missed when AQE Cached DataFrame Support is enabled
[ https://issues.apache.org/jira/browse/SPARK-41214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-41214: --- Attachment: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png DAG when AQE=ON and AQECachedDFSupport=ON without fix.png > Sub-plan metrics under InMemoryRelation are missed when AQE Cached DataFrame > Support is enabled > --- > > Key: SPARK-41214 > URL: https://issues.apache.org/jira/browse/SPARK-41214 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png, > DAG when AQE=ON and AQECachedDFSupport=ON without fix.png > > > *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE > optimizations under InMemoryRelation(IMR) nodes. Following sample query has > IMR node on both BroadcastHashJoin legs. However, > when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true, > following datas are missed due to lack of final sub-plans (under IMR) > submissions (into UI). > {code:java} > - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missed such > as Exchange and HashAggregate on left BHJ leg and HashAggregate on right BHJ > leg, > - WSCG blocks are missed on left BHJ leg, > - AQEShuffleRead node is missed on left BHJ leg. 
{code} > *Sample to reproduce:* > {code:java} > val spark = SparkSession > .builder() > .config("spark.sql.adaptive.enabled", "true") > .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", > "true") > .master("local[*]") > .getOrCreate() > import spark.implicits._ > // Create 1st DF > val arr = Seq( > (1, "Employee_1", "Department_1"), > (2, "Employee_2", "Department_2")) > val df = arr.toDF("id", "name", "department") > .filter('id < 3) > .groupBy('name) > .count() > df.cache() > // Create 2nd DF > val arr2 = Seq((1, "Employee_1", "Department_1")) > val df2 = arr2.toDF("id", "name", "department") > .filter('id > 0) > .groupBy('name) > .count() > df2.cache() > // Trigger query execution > val df3 = df.join(df2, "name") > df3.show() {code}
[jira] [Updated] (SPARK-38222) Expose Node Description attribute in SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-38222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-38222: --- Attachment: Spark_SQL_REST_Result_with-nodeDesc > Expose Node Description attribute in SQL Rest API > - > > Key: SPARK-38222 > URL: https://issues.apache.org/jira/browse/SPARK-38222 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 3.2.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: Spark_SQL_REST_Result_with-nodeDesc > > > Currently, SQL public Rest API does not expose node description and it is > useful to have nodeDesc attribute at query level to have more details such as: > {code:java} > - Join Operators(BHJ, SMJ, SHJ) => when correlating join operator with join > type and which leg is built for BHJ. > - HashAggregate => aggregated keys and agg functions > - List can be extended for other physical operators.{code} > *Current Sample Json Result:* > {code:java} > { > "nodeId" : 14, > "nodeName" : "BroadcastHashJoin", > "wholeStageCodegenId" : 3, > "stageIds" : [ 5 ], > "metrics" : [ { > "name" : "number of output rows", > "value" : { > "amount" : "2" > } > } > }, > ... > { > "nodeId" : 8, > "nodeName" : "HashAggregate", > "wholeStageCodegenId" : 4, > "stageIds" : [ 8 ], > "metrics" : [ { > "name" : "spill size", > "value" : { > "amount" : "0.0" > } > } > } {code} > *New* {*}Sample Json Result{*}{*}:{*} > {code:java} > { > "nodeId" : 14, > "nodeName" : "BroadcastHashJoin", > "nodeDesc" : "BroadcastHashJoin [id#4], [id#24], Inner, BuildLeft, false", > "wholeStageCodegenId" : 3, > "stageIds" : [ 5 ], > "metrics" : [ { > "name" : "number of output rows", > "value" : { > "amount" : "2" > } > } > }, > ... 
> { > "nodeId" : 8, > "nodeName" : "HashAggregate", > "nodeDesc" : "HashAggregate(keys=[name#5, age#6, salary#18], > functions=[avg(cast(age#6 as bigint)), avg(salary#18)])", > "wholeStageCodegenId" : 4, > "stageIds" : [ 8 ], > "metrics" : [ { > "name" : "spill size", > "value" : { > "amount" : "0.0" > } > } > } {code}
[jira] [Created] (SPARK-38222) Expose nodeDesc attribute in SQL Rest API
Eren Avsarogullari created SPARK-38222: -- Summary: Expose nodeDesc attribute in SQL Rest API Key: SPARK-38222 URL: https://issues.apache.org/jira/browse/SPARK-38222 Project: Spark Issue Type: Task Components: SQL Affects Versions: 3.2.0 Reporter: Eren Avsarogullari Currently, the public SQL Rest API does not expose the node description, and it would be useful to have a nodeDesc attribute at the query level to provide more details, such as: {code:java} - Join Operators(BHJ, SMJ, SHJ) => when correlating the join operator with the join type and which leg is built for BHJ. - HashAggregate => aggregated keys and agg functions - List can be extended for other physical operators.{code} *Current Sample Json Result:* {code:java} { "nodeId" : 14, "nodeName" : "BroadcastHashJoin", "wholeStageCodegenId" : 3, "stageIds" : [ 5 ], "metrics" : [ { "name" : "number of output rows", "value" : { "amount" : "2" } } }, ... { "nodeId" : 8, "nodeName" : "HashAggregate", "wholeStageCodegenId" : 4, "stageIds" : [ 8 ], "metrics" : [ { "name" : "spill size", "value" : { "amount" : "0.0" } } } {code} *New Sample Json Result:* {code:java} { "nodeId" : 14, "nodeName" : "BroadcastHashJoin", "nodeDesc" : "BroadcastHashJoin [id#4], [id#24], Inner, BuildLeft, false", "wholeStageCodegenId" : 3, "stageIds" : [ 5 ], "metrics" : [ { "name" : "number of output rows", "value" : { "amount" : "2" } } }, ... { "nodeId" : 8, "nodeName" : "HashAggregate", "nodeDesc" : "HashAggregate(keys=[name#5, age#6, salary#18], functions=[avg(cast(age#6 as bigint)), avg(salary#18)])", "wholeStageCodegenId" : 4, "stageIds" : [ 8 ], "metrics" : [ { "name" : "spill size", "value" : { "amount" : "0.0" } } } {code}
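Since the proposal only adds one string attribute per node, a REST client can start consuming it without any schema migration. Below is a hedged Scala sketch of extracting the proposed nodeDesc field from a response fragment; the payload is copied from the sample JSON above, and the regex-based extraction is only to keep the example dependency-free (a real client would use a proper JSON library):

```scala
// Sketch of consuming the proposed `nodeDesc` attribute from an SQL REST API
// node entry. This is not an official client; it only illustrates what the
// new field would let a caller recover (join type, build side, etc.).
object NodeDescClientSketch extends App {
  val sampleNode =
    """{ "nodeId" : 14,
      |  "nodeName" : "BroadcastHashJoin",
      |  "nodeDesc" : "BroadcastHashJoin [id#4], [id#24], Inner, BuildLeft, false",
      |  "wholeStageCodegenId" : 3 }""".stripMargin

  // Naive extraction of the nodeDesc string value.
  val NodeDesc = """"nodeDesc"\s*:\s*"([^"]*)"""".r
  val desc: Option[String] = NodeDesc.findFirstMatchIn(sampleNode).map(_.group(1))

  // With nodeDesc exposed, the client can correlate the operator with its
  // join type and build side, which nodeName alone does not provide.
  desc.foreach { d =>
    val buildSide = if (d.contains("BuildLeft")) "left" else "right"
    println(s"operator: ${d.takeWhile(_ != ' ')}, build side: $buildSide")
  }
  assert(desc.exists(_.startsWith("BroadcastHashJoin")))
}
```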
[jira] [Updated] (SPARK-38222) Expose Node Description attribute in SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-38222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-38222: --- Summary: Expose Node Description attribute in SQL Rest API (was: Expose nodeDesc attribute in SQL Rest API) > Expose Node Description attribute in SQL Rest API > - > > Key: SPARK-38222 > URL: https://issues.apache.org/jira/browse/SPARK-38222 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 3.2.0 >Reporter: Eren Avsarogullari >Priority: Major > > Currently, SQL public Rest API does not expose node description and it is > useful to have nodeDesc attribute at query level to have more details such as: > {code:java} > - Join Operators(BHJ, SMJ, SHJ) => when correlating join operator with join > type and which leg is built for BHJ. > - HashAggregate => aggregated keys and agg functions > - List can be extended for other physical operators.{code} > *Current Sample Json Result:* > {code:java} > { > "nodeId" : 14, > "nodeName" : "BroadcastHashJoin", > "wholeStageCodegenId" : 3, > "stageIds" : [ 5 ], > "metrics" : [ { > "name" : "number of output rows", > "value" : { > "amount" : "2" > } > } > }, > ... > { > "nodeId" : 8, > "nodeName" : "HashAggregate", > "wholeStageCodegenId" : 4, > "stageIds" : [ 8 ], > "metrics" : [ { > "name" : "spill size", > "value" : { > "amount" : "0.0" > } > } > } {code} > *New* {*}Sample Json Result{*}{*}:{*} > {code:java} > { > "nodeId" : 14, > "nodeName" : "BroadcastHashJoin", > "nodeDesc" : "BroadcastHashJoin [id#4], [id#24], Inner, BuildLeft, false", > "wholeStageCodegenId" : 3, > "stageIds" : [ 5 ], > "metrics" : [ { > "name" : "number of output rows", > "value" : { > "amount" : "2" > } > } > }, > ... 
> { > "nodeId" : 8, > "nodeName" : "HashAggregate", > "nodeDesc" : "HashAggregate(keys=[name#5, age#6, salary#18], > functions=[avg(cast(age#6 as bigint)), avg(salary#18)])", > "wholeStageCodegenId" : 4, > "stageIds" : [ 8 ], > "metrics" : [ { > "name" : "spill size", > "value" : { > "amount" : "0.0" > } > } > } {code}
[jira] [Updated] (SPARK-37710) Add detailed log message for java.io.IOException occurring on Kryo flow
[ https://issues.apache.org/jira/browse/SPARK-37710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-37710: --- Summary: Add detailed log message for java.io.IOException occurring on Kryo flow (was: Add detailed error message for java.io.IOException occurring on Kryo flow) > Add detailed log message for java.io.IOException occurring on Kryo flow > --- > > Key: SPARK-37710 > URL: https://issues.apache.org/jira/browse/SPARK-37710 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Eren Avsarogullari >Priority: Major > > *Input/output error* usually points to environmental issues such as disk > read/write failures due to disk corruption, network access failures, etc. This > PR aims to add a detailed error message to catch these kinds of > environmental cases occurring on the problematic BlockManager and log them with > *BlockManager hostname, blockId and blockPath* details. > The following stack trace occurred on disk corruption: > {code:java} > com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output > error > Serialization trace: > buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) > at com.esotericsoftware.kryo.io.Input.require(Input.java:196) > at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) > at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381) > at > 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302) > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296) > at > org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) > at > org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) > at > org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569) > at > org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877) > at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163) > ... > Caused by: java.io.IOException: Input/output error > at java.io.FileInputStream.readBytes(Native Method) > at java.io.FileInputStream.read(FileInputStream.java:255) > at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) > at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) > at java.io.BufferedInputStream.read(BufferedInputStream.java:345) > at > net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269) > at > net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280) > at > net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243) > at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:164) > ... 
87 more {code} > *Proposed Error Message:* > {code:java} > java.io.IOException: Input/output error. BlockManagerId(driver, localhost, > 49455, None) - blockId: test_my-block-id - blockDiskPath: > /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pmgq/T/blockmgr-12dba181-771e-4ff9-a2bc-fa3ce6dbabfa/11/test_my-block-id > {code}
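The enrichment itself can be sketched as a generic wrapper around a block read. The helper name and signature below are hypothetical, not Spark's internal BlockManager API; the point is only that the IOException is rethrown with BlockManager, blockId and on-disk path appended, matching the proposed message format (the disk path used here is illustrative):

```scala
import java.io.IOException

object BlockReadErrorContextSketch extends App {
  // Hypothetical helper: run a block read and, on java.io.IOException,
  // rethrow with BlockManager host, blockId and blockDiskPath attached.
  def readBlockWithContext[T](blockManagerId: String, blockId: String,
                              blockDiskPath: String)(read: => T): T = {
    try read
    catch {
      case e: IOException =>
        throw new IOException(
          s"${e.getMessage}. $blockManagerId - blockId: $blockId" +
            s" - blockDiskPath: $blockDiskPath", e)
    }
  }

  // Example: a failing read now reports where it failed.
  val msg =
    try {
      readBlockWithContext("BlockManagerId(driver, localhost, 49455, None)",
          "test_my-block-id", "/tmp/blockmgr-xyz/11/test_my-block-id") {
        throw new IOException("Input/output error")
      }
    } catch { case e: IOException => e.getMessage }

  println(msg)
  assert(msg.startsWith("Input/output error. BlockManagerId"))
  assert(msg.contains("blockId: test_my-block-id"))
}
```

Wrapping (rather than replacing) the original exception preserves the Kryo stack trace as the cause while surfacing the environmental context at the top of the log line.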
[jira] [Updated] (SPARK-37710) Add detailed error message for java.io.IOException occurring on Kryo flow
[ https://issues.apache.org/jira/browse/SPARK-37710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-37710: --- Description: *Input/output error* usually points environmental issues such as disk read/write failures due to disk corruption, network access failures etc. This PR aims to be added detailed error message to catch this kind of environmental cases occurring on problematic BlockManager and logs with *BlockManager hostname, blockId and blockPath* details. Following stack-trace occurred on disk corruption: {code:java} com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error Serialization trace: buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296) at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569) at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163) ... Caused by: java.io.IOException: Input/output error at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:255) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269) at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280) at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) at com.esotericsoftware.kryo.io.Input.fill(Input.java:164) ... 87 more {code} *Proposed Error Message:* {code:java} java.io.IOException: Input/output error. BlockManagerId(driver, localhost, 49455, None) - blockId: test_my-block-id - blockDiskPath: /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pmgq/T/blockmgr-12dba181-771e-4ff9-a2bc-fa3ce6dbabfa/11/test_my-block-id {code} was: *Input/output error* usually points environmental issues such as disk read/write failures due to disk corruption, network access failures etc. This PR aims to be added clear message to catch this kind of environmental cases occurring on BlockManager and logs with {*}BlockManager hostname, blockId and blockPath{*}. 
Following stack-trace occurred on disk corruption: {code:java} com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error Serialization trace: buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at
[jira] [Updated] (SPARK-37710) Add detailed error message for java.io.IOException occurring on Kryo flow
[ https://issues.apache.org/jira/browse/SPARK-37710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-37710: --- Summary: Add detailed error message for java.io.IOException occurring on Kryo flow (was: Add clear error message for java.io.IOException: Input/output error) > Add detailed error message for java.io.IOException occurring on Kryo flow > - > > Key: SPARK-37710 > URL: https://issues.apache.org/jira/browse/SPARK-37710 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Eren Avsarogullari >Priority: Major > > *Input/output error* usually points environmental issues such as disk > read/write failures due to disk corruption, network access failures etc. This > PR aims to be added clear message to catch this kind of environmental cases > occurring on BlockManager and logs with {*}BlockManager hostname, blockId and > blockPath{*}. > Following stack-trace occurred on disk corruption: > {code:java} > com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output > error > Serialization trace: > buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) > at com.esotericsoftware.kryo.io.Input.require(Input.java:196) > at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) > at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381) > at > 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302) > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296) > at > org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) > at > org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) > at > org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569) > at > org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877) > at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163) > ... > Caused by: java.io.IOException: Input/output error > at java.io.FileInputStream.readBytes(Native Method) > at java.io.FileInputStream.read(FileInputStream.java:255) > at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) > at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) > at java.io.BufferedInputStream.read(BufferedInputStream.java:345) > at > net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269) > at > net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280) > at > net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243) > at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:164) > ... 
87 more {code} > *Proposed Error Message:* > {code:java} > java.io.IOException: Input/output error usually occurs due to environmental > problems (e.g: disk corruption, network failure etc) so please check env > status if healthy. BlockManagerId(driver, localhost, 54937, None) - > blockName: test_my-block-id - blockDiskPath: > /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pmgq/T/blockmgr-e86d8f67-a993-407f-ad3b-3cfb667b4ad4/11/test_my-block-id > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
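The proposed error message above can be sketched as a small wrapper that enriches the low-level {{IOException}} with BlockManager context before re-throwing. This is an illustrative sketch, not the actual Spark patch: the class and method names below are hypothetical, and only the message layout follows the ticket's proposed format.

```java
import java.io.IOException;

// Hypothetical sketch: enrich a low-level "Input/output error" with the
// BlockManager id, block id and on-disk block path, so that logs point at
// the problematic host/disk. Names are illustrative, not Spark's real API.
public class BlockIoErrorContext {

    // Builds a message in the layout proposed in the ticket.
    static String enrich(IOException cause, String blockManagerId,
                         String blockId, String blockDiskPath) {
        return cause.getMessage()
            + ". " + blockManagerId
            + " - blockId: " + blockId
            + " - blockDiskPath: " + blockDiskPath;
    }

    // Re-throws with the enriched message while preserving the original cause,
    // so the full Kryo/LZ4 stack-trace stays available.
    static IOException wrap(IOException cause, String blockManagerId,
                            String blockId, String blockDiskPath) {
        return new IOException(
            enrich(cause, blockManagerId, blockId, blockDiskPath), cause);
    }

    public static void main(String[] args) {
        IOException raw = new IOException("Input/output error");
        IOException wrapped = wrap(raw,
            "BlockManagerId(driver, localhost, 49455, None)",
            "test_my-block-id",
            "/tmp/blockmgr-example/11/test_my-block-id");
        System.out.println(wrapped.getMessage());
    }
}
```

Keeping the original exception as the cause is the important design point here: the environmental hint is added for operators scanning logs, while the native stack-trace remains intact for debugging.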
[jira] [Updated] (SPARK-37710) Add clear error message for java.io.IOException: Input/output error
[ https://issues.apache.org/jira/browse/SPARK-37710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-37710: --- Description: *Input/output error* usually points environmental issues such as disk read/write failures due to disk corruption, network access failures etc. This PR aims to be added clear message to catch this kind of environmental cases occurring on BlockManager and logs with {*}BlockManager hostname, blockId and blockPath{*}. Following stack-trace occurred on disk corruption: {code:java} com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error Serialization trace: buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296) at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569) at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163) ... Caused by: java.io.IOException: Input/output error at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:255) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269) at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280) at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) at com.esotericsoftware.kryo.io.Input.fill(Input.java:164) ... 87 more {code} *Proposed Error Message:* {code:java} java.io.IOException: Input/output error usually occurs due to environmental problems (e.g: disk corruption, network failure etc) so please check env status if healthy. BlockManagerId(driver, localhost, 54937, None) - blockName: test_my-block-id - blockDiskPath: /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pmgq/T/blockmgr-e86d8f67-a993-407f-ad3b-3cfb667b4ad4/11/test_my-block-id {code} was: *Input/output error* usually points environmental issues such as disk read/write failures due to disk corruption, network access failures etc. 
This PR aims to be added clear message to catch this kind of environmental cases occurring on BlockManager and logs with {*}BlockManager hostname, blockId and blockPath{*}. Following stack-trace occurred on disk corruption: {code:java} com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error Serialization trace: buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) at
[jira] [Updated] (SPARK-37710) Add clear error message for java.io.IOException: Input/output error
[ https://issues.apache.org/jira/browse/SPARK-37710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-37710: --- Description: *Input/output error* usually points environmental issues such as disk read/write failures due to disk corruption, network access failures etc. This PR aims to be added clear message to catch this kind of environmental cases occurring on BlockManager and logs with {*}BlockManager hostname, blockId and blockPath{*}. Following stack-trace occurred on disk corruption: {code:java} com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error Serialization trace: buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296) at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569) at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163) ... Caused by: java.io.IOException: Input/output error at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:255) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269) at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280) at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) at com.esotericsoftware.kryo.io.Input.fill(Input.java:164) ... 87 more {code} *Sample Failure Message:* {code:java} java.io.IOException: Input/output error usually occurs due to environmental problems (e.g: disk corruption, network failure etc) so please check env status if healthy. BlockManagerId(driver, localhost, 54937, None) - blockName: test_my-block-id - blockDiskPath: /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pmgq/T/blockmgr-e86d8f67-a993-407f-ad3b-3cfb667b4ad4/11/test_my-block-id {code} was: *Input/output error* usually points environmental issues such as disk read/write failures due to disk corruption, network access failures etc. 
This PR aims to be added clear message to catch this kind of environmental cases occurring on BlockManager and logs with {*}BlockManager hostname, blockId and blockPath{*}. Following stack-trace occurred on disk corruption: {code:java} com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error Serialization trace: buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) at
[jira] [Updated] (SPARK-37710) Add clear error message for java.io.IOException: Input/output error
[ https://issues.apache.org/jira/browse/SPARK-37710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-37710: --- Summary: Add clear error message for java.io.IOException: Input/output error (was: Log clear message for Input/output error) > Add clear error message for java.io.IOException: Input/output error > --- > > Key: SPARK-37710 > URL: https://issues.apache.org/jira/browse/SPARK-37710 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.1.1 >Reporter: Eren Avsarogullari >Priority: Major > > *Input/output error* usually points environmental issues such as disk > read/write failures due to disk corruption, network access failures etc. This > PR aims to be added clear message to catch this kind of environmental cases > occurring on BlockManager and logs with {*}BlockManager hostname, blockId and > blockPath{*}. > Following stack-trace occurred on disk corruption: > {code:java} > com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output > error > Serialization trace: > buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) > at com.esotericsoftware.kryo.io.Input.require(Input.java:196) > at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) > at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) > at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381) > at > com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302) > 
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296) > at > org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) > at > org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) > at > org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569) > at > org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877) > at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163) > ... > Caused by: java.io.IOException: Input/output error > at java.io.FileInputStream.readBytes(Native Method) > at java.io.FileInputStream.read(FileInputStream.java:255) > at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) > at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) > at java.io.BufferedInputStream.read(BufferedInputStream.java:345) > at > net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269) > at > net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280) > at > net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243) > at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) > at com.esotericsoftware.kryo.io.Input.fill(Input.java:164) > ... 
87 more {code} > *Sample Failure Message:* > {code:java} > java.io.IOException: Input/output error usually occurs due to environmental > problems (e.g: disk corruption, network failure etc) so please check env > status if healthy. BlockManagerId(driver, localhost, 54937, None) - > blockName: test_my-block-id - blockDiskPath: > /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pmgq/T/blockmgr-e86d8f67-a993-407f-ad3b-3cfb667b4ad4/11/test_my-block-id > {code}
[jira] [Updated] (SPARK-37710) Log clear message for Input/output error
[ https://issues.apache.org/jira/browse/SPARK-37710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-37710: --- Description: *Input/output error* usually points environmental issues such as disk read/write failures due to disk corruption, network access failures etc. This PR aims to be added clear message to catch this kind of environmental cases occurring on BlockManager and logs with {*}BlockManager hostname, blockId and blockPath{*}. Following stack-trace occurred on disk corruption: {code:java} com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error Serialization trace: buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296) at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569) at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163) ... Caused by: java.io.IOException: Input/output error at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:255) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269) at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280) at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) at com.esotericsoftware.kryo.io.Input.fill(Input.java:164) ... 87 more {code} *Sample Failure Message:* {code:java} java.io.IOException: Input/output error usually occurs due to environmental problems (e.g: disk corruption, network failure etc) so please check env status if healthy. BlockManagerId(driver, localhost, 54937, None) - blockName: test_my-block-id - blockDiskPath: /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pmgq/T/blockmgr-e86d8f67-a993-407f-ad3b-3cfb667b4ad4/11/test_my-block-id {code} was: *Input/output error* usually points environmental issues such as disk read/write failures due to disk corruption, network access failures etc. 
This PR aims to be added clear message to catch this kind of environmental cases occurring on BlockManager and logs with {*}BlockManager hostname, blockId and blockPath{*}. Following stack-trace occurred on disk corruption: {code:java} com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error Serialization trace: buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) at
[jira] [Created] (SPARK-37710) Log clear message for Input/output error
Eren Avsarogullari created SPARK-37710: -- Summary: Log clear message for Input/output error Key: SPARK-37710 URL: https://issues.apache.org/jira/browse/SPARK-37710 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.1.1 Reporter: Eren Avsarogullari *Input/output error* usually points environmental issues such as disk read/write failures due to disk corruption, network access failures etc. This PR aims to be added clear message to catch this kind of environmental cases occurring on BlockManager and logs with {*}BlockManager hostname, blockId and blockPath{*}. Following stack-trace occurred on disk corruption: {code:java} com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error Serialization trace: buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch) at com.esotericsoftware.kryo.io.Input.fill(Input.java:166) at com.esotericsoftware.kryo.io.Input.require(Input.java:196) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346) at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816) at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299) at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569) at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163) ... Caused by: java.io.IOException: Input/output error at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:255) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269) at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280) at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243) at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157) at com.esotericsoftware.kryo.io.Input.fill(Input.java:164) ... 87 more {code} *Sample Failure Message:* {code:java} java.io.IOException: Input/output error usually occurs due to environmental problems (e.g: disk corruption, network failure etc) so please check env status if healthy. 
BlockManagerId(driver, localhost, 54937, None) - blockName: test_my-block-id - blockDiskPath: /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pmgq/T/blockmgr-e86d8f67-a993-407f-ad3b-3cfb667b4ad4/11/test_my-block-id {code}
[jira] [Updated] (SPARK-32548) Add Application attemptId support to SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-32548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-32548: --- Description: Currently, Spark public REST APIs support the application attemptId, except for the SQL API. This causes a *no such app: application_X* error when the application has an *{{attemptId}}*. Please find the existing REST endpoints and the endpoints requiring {{attemptId}} support below. {code:java} // Existing Rest Endpoints applications/{appId}/sql applications/{appId}/sql/{executionId} // Rest Endpoints requiring support applications/{appId}/{attemptId}/sql applications/{appId}/{attemptId}/sql/{executionId}{code} was: Currently, Spark Public Rest APIs support Application attemptId except SQL API. This causes *no such app: application_X* issue when the application has *{{attemptId}}*. Please find existing and supported Rest endpoints with {{attemptId}}. {{}} {code:java} {code} {{// Existing Rest Endpoints}} {{applications/\{appId}/sql }} {{applications/\{appId}/sql/\{executionId}}} {{// Rest Endpoints required support applications/\{appId}/\{attemptId}/sql applications/\{appId}/\{attemptId}/sql/\{executionId}}} {{}} > Add Application attemptId support to SQL Rest API > - > > Key: SPARK-32548 > URL: https://issues.apache.org/jira/browse/SPARK-32548 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > > Currently, Spark public REST APIs support the application attemptId, except for the SQL > API. This causes a *no such app: application_X* error when the application has > an *{{attemptId}}*. > Please find the existing REST endpoints and the endpoints requiring {{attemptId}} support below. 
> {code:java} > // Existing Rest Endpoints > applications/{appId}/sql > applications/{appId}/sql/{executionId} > // Rest Endpoints requiring support > applications/{appId}/{attemptId}/sql > applications/{appId}/{attemptId}/sql/{executionId}{code}
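The endpoint patterns above can be sketched as a small path builder that appends the {{attemptId}} segment only when the application has one. This is an illustrative sketch under stated assumptions: the class and method names are hypothetical, and the base URL and port are only examples of a history-server address.

```java
// Hypothetical sketch: build the SQL REST endpoint, inserting the optional
// attemptId path segment only when the application ran with an attempt id
// (e.g. YARN cluster mode). Names and the base URL are illustrative.
public class SqlRestEndpoint {

    static String sqlEndpoint(String baseUrl, String appId, String attemptId) {
        String path = baseUrl + "/applications/" + appId;
        if (attemptId != null && !attemptId.isEmpty()) {
            // Only applications with an attempt id get the extra segment;
            // omitting it for such apps yields "no such app: application_X".
            path += "/" + attemptId;
        }
        return path + "/sql";
    }

    public static void main(String[] args) {
        // With an attempt id: .../applications/<appId>/<attemptId>/sql
        System.out.println(sqlEndpoint("http://localhost:18080/api/v1",
                                       "application_1234_0001", "1"));
        // Without one: .../applications/<appId>/sql
        System.out.println(sqlEndpoint("http://localhost:18080/api/v1",
                                       "application_1234_0001", null));
    }
}
```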
[jira] [Created] (SPARK-32548) Add Application attemptId support to SQL Rest API
Eren Avsarogullari created SPARK-32548: -- Summary: Add Application attemptId support to SQL Rest API Key: SPARK-32548 URL: https://issues.apache.org/jira/browse/SPARK-32548 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.0 Reporter: Eren Avsarogullari Currently, Spark public REST APIs support the application attemptId, except for the SQL API. This causes a *no such app: application_X* error when the application has an *{{attemptId}}*. Please find the existing REST endpoints and the endpoints requiring {{attemptId}} support below. {code:java} // Existing Rest Endpoints applications/{appId}/sql applications/{appId}/sql/{executionId} // Rest Endpoints requiring support applications/{appId}/{attemptId}/sql applications/{appId}/{attemptId}/sql/{executionId}{code}
[jira] [Updated] (SPARK-32026) Add PrometheusServletSuite
[ https://issues.apache.org/jira/browse/SPARK-32026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-32026: --- Description: This Jira aims to add _PrometheusServletSuite_. *Note:* This Jira was created to propose a _Prometheus_ driver metric format change to be consistent with the executor metric format. However, currently, _PrometheusServlet_ follows _Spark 3.0 JMX Sink + Prometheus JMX Exporter_ format so this Jira coverage has been converted to just _PrometheusServletSuite_ in the light of discussion on https://github.com/apache/spark/pull/28865. was: Spark 3.0 introduces native Prometheus Sink for both driver and executor metrics. However, they need consistency on format (e.g: `applicationId`). Currently, driver covers `applicationId` in metric name. If this can extract as executor metric format, this can also support consistency and help to query. *Driver* {code:java} metrics_local_1592242896665_driver_BlockManager_memory_memUsed_MB_Value{type="gauges"} 0{code} *Executor* {code:java} metrics_executor_memoryUsed_bytes{application_id="local-1592242896665", application_name="apache-spark-fundamentals", executor_id="driver"} 24356{code} *Proposed Driver Format* {code:java} metrics_driver_BlockManager_memory_memUsed_MB_Value{application_id="local-1592242896665", type="gauges"} 0{code} > Add PrometheusServletSuite > -- > > Key: SPARK-32026 > URL: https://issues.apache.org/jira/browse/SPARK-32026 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Eren Avsarogullari >Priority: Major > > This Jira aims to add _PrometheusServletSuite_. > *Note:* This Jira was created to propose a _Prometheus_ driver metric format > change to be consistent with the executor metric format. 
However, currently, > _PrometheusServlet_ follows _Spark 3.0 JMX Sink + Prometheus JMX Exporter_ > format so this Jira coverage has been converted to just > _PrometheusServletSuite_ in the light of discussion on > https://github.com/apache/spark/pull/28865.
[jira] [Updated] (SPARK-32026) Add PrometheusServletSuite
[ https://issues.apache.org/jira/browse/SPARK-32026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-32026: --- Summary: Add PrometheusServletSuite (was: Add PrometheusServlet Unit Test coverage) > Add PrometheusServletSuite > -- > > Key: SPARK-32026 > URL: https://issues.apache.org/jira/browse/SPARK-32026 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Eren Avsarogullari >Priority: Major > > Spark 3.0 introduces native Prometheus Sink for both driver and executor > metrics. However, they need consistency on format (e.g: `applicationId`). > Currently, driver covers `applicationId` in metric name. If this can extract > as executor metric format, this can also support consistency and help to > query. > *Driver* > {code:java} > metrics_local_1592242896665_driver_BlockManager_memory_memUsed_MB_Value{type="gauges"} > 0{code} > *Executor* > {code:java} > metrics_executor_memoryUsed_bytes{application_id="local-1592242896665", > application_name="apache-spark-fundamentals", executor_id="driver"} > 24356{code} > *Proposed Driver Format* > {code:java} > metrics_driver_BlockManager_memory_memUsed_MB_Value{application_id="local-1592242896665", > type="gauges"} 0{code}
[jira] [Commented] (SPARK-32026) Add PrometheusServlet Unit Test coverage
[ https://issues.apache.org/jira/browse/SPARK-32026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17148211#comment-17148211 ] Eren Avsarogullari commented on SPARK-32026: Currently, _PrometheusServlet_ follows Spark 3.0 JMX Sink + Prometheus JMX Exporter format so for this Jira coverage, just _PrometheusServletSuite_ is added in the light of discussion on https://github.com/apache/spark/pull/28865. > Add PrometheusServlet Unit Test coverage > > > Key: SPARK-32026 > URL: https://issues.apache.org/jira/browse/SPARK-32026 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Eren Avsarogullari >Priority: Major > > Spark 3.0 introduces native Prometheus Sink for both driver and executor > metrics. However, they need consistency on format (e.g: `applicationId`). > Currently, driver covers `applicationId` in metric name. If this can extract > as executor metric format, this can also support consistency and help to > query. > *Driver* > {code:java} > metrics_local_1592242896665_driver_BlockManager_memory_memUsed_MB_Value{type="gauges"} > 0{code} > *Executor* > {code:java} > metrics_executor_memoryUsed_bytes{application_id="local-1592242896665", > application_name="apache-spark-fundamentals", executor_id="driver"} > 24356{code} > *Proposed Driver Format* > {code:java} > metrics_driver_BlockManager_memory_memUsed_MB_Value{application_id="local-1592242896665", > type="gauges"} 0{code}
[jira] [Updated] (SPARK-32026) Add PrometheusServlet Unit Test coverage
[ https://issues.apache.org/jira/browse/SPARK-32026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-32026: --- Summary: Add PrometheusServlet Unit Test coverage (was: Support consistency on Prometheus driver and executor metrics format) > Add PrometheusServlet Unit Test coverage > > > Key: SPARK-32026 > URL: https://issues.apache.org/jira/browse/SPARK-32026 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Eren Avsarogullari >Priority: Major > > Spark 3.0 introduces native Prometheus Sink for both driver and executor > metrics. However, they need consistency on format (e.g: `applicationId`). > Currently, driver covers `applicationId` in metric name. If this can extract > as executor metric format, this can also support consistency and help to > query. > *Driver* > {code:java} > metrics_local_1592242896665_driver_BlockManager_memory_memUsed_MB_Value{type="gauges"} > 0{code} > *Executor* > {code:java} > metrics_executor_memoryUsed_bytes{application_id="local-1592242896665", > application_name="apache-spark-fundamentals", executor_id="driver"} > 24356{code} > *Proposed Driver Format* > {code:java} > metrics_driver_BlockManager_memory_memUsed_MB_Value{application_id="local-1592242896665", > type="gauges"} 0{code}
[jira] [Created] (SPARK-32026) Support consistency on Prometheus driver and executor metrics format
Eren Avsarogullari created SPARK-32026: -- Summary: Support consistency on Prometheus driver and executor metrics format Key: SPARK-32026 URL: https://issues.apache.org/jira/browse/SPARK-32026 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 3.0.1 Reporter: Eren Avsarogullari Spark 3.0 introduces a native Prometheus Sink for both driver and executor metrics. However, they need format consistency (e.g. `applicationId`). Currently, the driver embeds `applicationId` in the metric name. If it is extracted into a label, as in the executor metric format, the two formats become consistent and easier to query. *Driver* {code:java} metrics_local_1592242896665_driver_BlockManager_memory_memUsed_MB_Value{type="gauges"} 0{code} *Executor* {code:java} metrics_executor_memoryUsed_bytes{application_id="local-1592242896665", application_name="apache-spark-fundamentals", executor_id="driver"} 24356{code} *Proposed Driver Format* {code:java} metrics_driver_BlockManager_memory_memUsed_MB_Value{application_id="local-1592242896665", type="gauges"} 0{code}
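The proposed change amounts to moving the application id out of the driver metric name and into an `application_id` label. A rough sketch of that rewrite (illustrative only; the helper and regex are assumptions, not PrometheusServlet code):

```python
import re

def relabel_driver_metric(line):
    """Move the application id embedded in a driver metric name into an
    application_id label, mirroring the executor format. Assumes the shape
    metrics_<appId>_driver_<name>{<labels>} <value>, with the dash in the
    app id flattened to an underscore."""
    m = re.match(r'^metrics_([a-z]+_\d+)_driver_(\S+?)\{(.*)\} (.*)$', line)
    if m is None:
        return line  # leave unrecognized lines untouched
    app_id, name, labels, value = m.groups()
    app_id = app_id.replace("_", "-", 1)  # restore e.g. local-1592242896665
    return 'metrics_driver_%s{application_id="%s", %s} %s' % (
        name, app_id, labels, value)

current = ('metrics_local_1592242896665_driver_BlockManager_memory_'
           'memUsed_MB_Value{type="gauges"} 0')
print(relabel_driver_metric(current))
```

On the sample driver line from the issue, this produces exactly the *Proposed Driver Format* shown above, which lets Prometheus queries filter both driver and executor series by the same `application_id` label.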
[jira] [Updated] (SPARK-31440) Improve SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-31440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31440: --- Attachment: improved_version_May1th.json > Improve SQL Rest API > > > Key: SPARK-31440 > URL: https://issues.apache.org/jira/browse/SPARK-31440 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: current_version.json, improved_version.json, > improved_version_May1th.json > > > SQL Rest API exposes query execution metrics as Public API. This Jira aims to > apply following improvements on SQL Rest API by aligning Spark-UI. > *Proposed Improvements:* > 1- Support Physical Operations and group metrics per physical operation by > aligning Spark UI. > 2- Support *wholeStageCodegenId* for Physical Operations > 3- *nodeId* can be useful for grouping metrics and sorting physical > operations (according to execution order) to differentiate same operators (if > used multiple times during the same query execution) and their metrics. > 4- Filter *empty* metrics by aligning with Spark UI - SQL Tab. Currently, > Spark UI does not show empty metrics. > 5- Remove line breakers(*\n*) from *metricValue*. > 6- *planDescription* can be *optional* Http parameter to avoid network cost > where there is specially complex jobs creating big-plans. > 7- *metrics* attribute needs to be exposed at the bottom order as > *metricDetails*. Specially, this can be useful for the user where > *metricDetails* array size is high. > 8- Reverse order on *metricDetails* aims to match with Spark UI by supporting > Physical Operators' execution order. 
> *Attachments:* > Please find both *current* and *improved* versions of the results as > attached for following SQL Rest Endpoint: > {code:java} > curl -X GET > http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true{code}
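The curl call above, extended with the proposed optional parameters, can be sketched as a URL builder (hypothetical helper; `details` exists today, while `planDescription` is the optional switch this issue proposes):

```python
from urllib.parse import urlencode

def sql_execution_url(base, app_id, execution_id,
                      details=False, plan_description=True):
    """Build the SQL Rest endpoint URL: details toggles per-operator
    metric details, planDescription switches off the potentially large
    physical plan text to save network cost."""
    path = "%s/api/v1/applications/%s/sql/%s" % (base, app_id, execution_id)
    query = urlencode({"details": str(details).lower(),
                       "planDescription": str(plan_description).lower()})
    return "%s?%s" % (path, query)

print(sql_execution_url("http://localhost:4040", "app-123", 0,
                        details=True, plan_description=False))
```

Disabling `planDescription` matters most for complex jobs whose physical plans are large, which is the motivation given in improvement 6 above.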
[jira] [Commented] (SPARK-31566) Add SQL Rest API Documentation
[ https://issues.apache.org/jira/browse/SPARK-31566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095948#comment-17095948 ] Eren Avsarogullari commented on SPARK-31566: Hi Pablo, There is an ongoing PR on this: https://github.com/apache/spark/pull/28354 Also, it will be updated in the light of [https://github.com/apache/spark/pull/28208] Hope these help. Thanks > Add SQL Rest API Documentation > -- > > Key: SPARK-31566 > URL: https://issues.apache.org/jira/browse/SPARK-31566 > Project: Spark > Issue Type: Documentation > Components: Documentation, SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > > SQL Rest API exposes query execution metrics as Public API. Its documentation > will be useful for end-users. > {code:java} > /applications/[app-id]/sql > 1- A list of all queries for a given application. > 2- ?details=[true|false (default)] lists metric details in addition to > queries details. > 3- ?offset=[offset]&length=[len] lists queries in the given range.{code} > {code:java} > /applications/[app-id]/sql/[execution-id] > 1- Details for the given query. > 2- ?details=[true|false (default)] lists metric details in addition to given > query details.{code}
[jira] [Updated] (SPARK-31566) Add SQL Rest API Documentation
[ https://issues.apache.org/jira/browse/SPARK-31566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31566: --- Description: SQL Rest API exposes query execution metrics as Public API. Its documentation will be useful for end-users. {code:java} /applications/[app-id]/sql 1- A list of all queries for a given application. 2- ?details=[true|false (default)] lists metric details in addition to queries details. 3- ?offset=[offset]&length=[len] lists queries in the given range.{code} {code:java} /applications/[app-id]/sql/[execution-id] 1- Details for the given query. 2- ?details=[true|false (default)] lists metric details in addition to given query details.{code} was: SQL Rest API exposes query execution metrics as Public API. Its documentation will be useful for end-users. Also, this is a follow-up jira with https://issues.apache.org/jira/browse/SPARK-31440 {code:java} /applications/[app-id]/sql 1- A list of all queries for a given application. 2- ?details=[true|false (default)] lists metric details in addition to queries details. 3- ?details=true&planDescription=[true (default)|false] enables/disables Physical planDescription on demand when Physical Plan size is high. 4- ?offset=[offset]&length=[len] lists queries in the given range.{code} {code:java} /applications/[app-id]/sql/[execution-id] 1- ?details=[true|false (default)] lists metric details in addition to given query details. 2- ?details=true&planDescription=[true (default)|false] enables/disables Physical planDescription on demand for the given query when Physical Plan size is high.{code} > Add SQL Rest API Documentation > -- > > Key: SPARK-31566 > URL: https://issues.apache.org/jira/browse/SPARK-31566 > Project: Spark > Issue Type: Documentation > Components: Documentation, SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > > SQL Rest API exposes query execution metrics as Public API. Its documentation > will be useful for end-users.
> {code:java} > /applications/[app-id]/sql > 1- A list of all queries for a given application. > 2- ?details=[true|false (default)] lists metric details in addition to > queries details. > 3- ?offset=[offset]&length=[len] lists queries in the given range.{code} > {code:java} > /applications/[app-id]/sql/[execution-id] > 1- Details for the given query. > 2- ?details=[true|false (default)] lists metric details in addition to given > query details.{code}
[jira] [Updated] (SPARK-31566) Add SQL Rest API Documentation
[ https://issues.apache.org/jira/browse/SPARK-31566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31566: --- Description: SQL Rest API exposes query execution metrics as Public API. Its documentation will be useful for end-users. Also, this is a follow-up jira with https://issues.apache.org/jira/browse/SPARK-31440 {code:java} /applications/[app-id]/sql 1- A list of all queries for a given application. 2- ?details=[true|false (default)] lists metric details in addition to queries details. 3- ?details=true&planDescription=[true (default)|false] enables/disables Physical planDescription on demand when Physical Plan size is high. 4- ?offset=[offset]&length=[len] lists queries in the given range.{code} {code:java} /applications/[app-id]/sql/[execution-id] 1- ?details=[true|false (default)] lists metric details in addition to given query details. 2- ?details=true&planDescription=[true (default)|false] enables/disables Physical planDescription on demand for the given query when Physical Plan size is high.{code} was: SQL Rest API exposes query execution metrics as Public API. Its documentation will be useful for end-users. Also, this is a follow-up jira with https://issues.apache.org/jira/browse/SPARK-31440 {code:java} /applications/[app-id]/sql 1- A list of all queries for a given application. 2- ?details=[true|false (default)] lists metric details in addition to queries details. 3- ?details=true&planDescription=[true (default)|false] enables/disables Physical planDescription on demand when Physical Plan size is high. 4- ?offset=[offset]&length=[len] lists queries in the given range.{code} {code:java} /applications/[app-id]/sql/[execution-id] 1- ?details=[true|false (default)] lists metric details in addition to given query details.
2- ?details=true&planDescription=[true (default)|false] enables/disables Physical planDescription on demand for the given query when Physical Plan size is high.{code} > Add SQL Rest API Documentation > -- > > Key: SPARK-31566 > URL: https://issues.apache.org/jira/browse/SPARK-31566 > Project: Spark > Issue Type: Documentation > Components: Documentation, SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > > SQL Rest API exposes query execution metrics as Public API. Its documentation > will be useful for end-users. Also, this is a follow-up jira with > https://issues.apache.org/jira/browse/SPARK-31440 > {code:java} > /applications/[app-id]/sql > 1- A list of all queries for a given application. > 2- ?details=[true|false (default)] lists metric details in addition to > queries details. > 3- ?details=true&planDescription=[true (default)|false] > enables/disables Physical planDescription on demand when Physical Plan size > is high. > 4- ?offset=[offset]&length=[len] lists queries in the given range.{code} > {code:java} > /applications/[app-id]/sql/[execution-id] > 1- ?details=[true|false (default)] lists metric details in addition to given > query details. > 2- ?details=true&planDescription=[true (default)|false] enables/disables > Physical planDescription on demand for the given query when Physical Plan > size is high.{code}
[jira] [Created] (SPARK-31566) Add SQL Rest API Documentation
Eren Avsarogullari created SPARK-31566: -- Summary: Add SQL Rest API Documentation Key: SPARK-31566 URL: https://issues.apache.org/jira/browse/SPARK-31566 Project: Spark Issue Type: Documentation Components: Documentation, SQL Affects Versions: 3.1.0 Reporter: Eren Avsarogullari SQL Rest API exposes query execution metrics as Public API. Its documentation will be useful for end-users. Also, this is a follow-up jira with https://issues.apache.org/jira/browse/SPARK-31440 {code:java} /applications/[app-id]/sql 1- A list of all queries for a given application. 2- ?details=[true|false (default)] lists metric details in addition to queries details. 3- ?details=true&planDescription=[true (default)|false] enables/disables Physical planDescription on demand when Physical Plan size is high. 4- ?offset=[offset]&length=[len] lists queries in the given range.{code} {code:java} /applications/[app-id]/sql/[execution-id] 1- ?details=[true|false (default)] lists metric details in addition to given query details. 2- ?details=true&planDescription=[true (default)|false] enables/disables Physical planDescription on demand for the given query when Physical Plan size is high.{code}
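The offset/length pagination documented for the list endpoint is an ordinary slice over the query executions, which can be illustrated with a tiny helper (hypothetical sketch mirroring the documented behaviour, not Spark source code):

```python
def paginate(queries, offset=0, length=None):
    """Return the slice of query executions starting at offset, at most
    length entries, mirroring the ?offset=[offset]&length=[len] semantics
    of /applications/[app-id]/sql."""
    end = None if length is None else offset + length
    return queries[offset:end]

executions = list(range(10))  # stand-in for 10 query executions
print(paginate(executions, offset=3, length=4))
```

As with list slicing, a range running past the end simply returns the remaining entries rather than failing.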
[jira] [Updated] (SPARK-31440) Improve SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-31440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31440: --- Attachment: (was: improved_version.json) > Improve SQL Rest API > > > Key: SPARK-31440 > URL: https://issues.apache.org/jira/browse/SPARK-31440 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: current_version.json, improved_version.json > > > SQL Rest API exposes query execution metrics as Public API. This Jira aims to > apply following improvements on SQL Rest API by aligning Spark-UI. > *Proposed Improvements:* > 1- Support Physical Operations and group metrics per physical operation by > aligning Spark UI. > 2- Support *wholeStageCodegenId* for Physical Operations > 3- *nodeId* can be useful for grouping metrics and sorting physical > operations (according to execution order) to differentiate same operators (if > used multiple times during the same query execution) and their metrics. > 4- Filter *empty* metrics by aligning with Spark UI - SQL Tab. Currently, > Spark UI does not show empty metrics. > 5- Remove line breakers(*\n*) from *metricValue*. > 6- *planDescription* can be *optional* Http parameter to avoid network cost > where there is specially complex jobs creating big-plans. > 7- *metrics* attribute needs to be exposed at the bottom order as > *metricDetails*. Specially, this can be useful for the user where > *metricDetails* array size is high. > 8- Reverse order on *metricDetails* aims to match with Spark UI by supporting > Physical Operators' execution order. 
> *Attachments:* > Please find both *current* and *improved* versions of the results as > attached for following SQL Rest Endpoint: > {code:java} > curl -X GET > http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true{code}
[jira] [Updated] (SPARK-31440) Improve SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-31440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31440: --- Attachment: improved_version.json > Improve SQL Rest API > > > Key: SPARK-31440 > URL: https://issues.apache.org/jira/browse/SPARK-31440 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: current_version.json, improved_version.json > > > SQL Rest API exposes query execution metrics as Public API. This Jira aims to > apply following improvements on SQL Rest API by aligning Spark-UI. > *Proposed Improvements:* > 1- Support Physical Operations and group metrics per physical operation by > aligning Spark UI. > 2- Support *wholeStageCodegenId* for Physical Operations > 3- *nodeId* can be useful for grouping metrics and sorting physical > operations (according to execution order) to differentiate same operators (if > used multiple times during the same query execution) and their metrics. > 4- Filter *empty* metrics by aligning with Spark UI - SQL Tab. Currently, > Spark UI does not show empty metrics. > 5- Remove line breakers(*\n*) from *metricValue*. > 6- *planDescription* can be *optional* Http parameter to avoid network cost > where there is specially complex jobs creating big-plans. > 7- *metrics* attribute needs to be exposed at the bottom order as > *metricDetails*. Specially, this can be useful for the user where > *metricDetails* array size is high. > 8- Reverse order on *metricDetails* aims to match with Spark UI by supporting > Physical Operators' execution order. 
> *Attachments:* > Please find both *current* and *improved* versions of the results as > attached for following SQL Rest Endpoint: > {code:java} > curl -X GET > http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true{code}
[jira] [Updated] (SPARK-31440) Improve SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-31440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31440: --- Description: SQL Rest API exposes query execution metrics as Public API. This Jira aims to apply the following improvements on the SQL Rest API by aligning it with the Spark UI. *Proposed Improvements:* 1- Support Physical Operations and group metrics per physical operation by aligning with Spark UI. 2- Support *wholeStageCodegenId* for Physical Operations. 3- *nodeId* can be useful for grouping metrics and sorting physical operations (according to execution order) to differentiate the same operators (if used multiple times during the same query execution) and their metrics. 4- Filter *empty* metrics by aligning with Spark UI - SQL Tab. Currently, Spark UI does not show empty metrics. 5- Remove line breaks (*\n*) from *metricValue*. 6- *planDescription* can be an *optional* Http parameter to avoid network cost, especially for complex jobs creating big plans. 7- The *metrics* attribute needs to be exposed at the bottom as *metricDetails*. Especially, this can be useful when the *metricDetails* array size is high. 8- Reversing the order of *metricDetails* aims to match Spark UI by following the Physical Operators' execution order. *Attachments:* Please find both *current* and *improved* versions of the results attached for the following SQL Rest Endpoint: {code:java} curl -X GET http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true{code} was: SQL Rest API exposes query execution metrics as Public API. This Jira aims to apply following improvements on SQL Rest API by aligning Spark-UI. *Proposed Improvements:* 1- Support Physical Operations and group metrics per operation by aligning Spark UI. 2- *nodeId* can be useful for grouping metrics as well as for sorting and to differentiate same operators and their metrics.
3- Filter *blank* metrics by aligning with Spark UI - SQL Tab 4- Remove *\n* from *metricValue(s)* 5- *planDescription* can be optional Http parameter to avoid network cost (specially for complex jobs creating big-plans). 6- *metrics* attribute needs to be exposed at the bottom order as *metricDetails*. This order matches with Spark UI by highlighting with execution order. *Attachments:* Please find both *current* and *improved* versions of the results as attached for following SQL Rest Endpoint: {code:java} curl -X GET http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true{code} > Improve SQL Rest API > > > Key: SPARK-31440 > URL: https://issues.apache.org/jira/browse/SPARK-31440 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: current_version.json, improved_version.json > > > SQL Rest API exposes query execution metrics as Public API. This Jira aims to > apply following improvements on SQL Rest API by aligning Spark-UI. > *Proposed Improvements:* > 1- Support Physical Operations and group metrics per physical operation by > aligning Spark UI. > 2- Support *wholeStageCodegenId* for Physical Operations > 3- *nodeId* can be useful for grouping metrics and sorting physical > operations (according to execution order) to differentiate same operators (if > used multiple times during the same query execution) and their metrics. > 4- Filter *empty* metrics by aligning with Spark UI - SQL Tab. Currently, > Spark UI does not show empty metrics. > 5- Remove line breakers(*\n*) from *metricValue*. > 6- *planDescription* can be *optional* Http parameter to avoid network cost > where there is specially complex jobs creating big-plans. > 7- *metrics* attribute needs to be exposed at the bottom order as > *metricDetails*. Specially, this can be useful for the user where > *metricDetails* array size is high. 
> 8- Reverse order on *metricDetails* aims to match with Spark UI by supporting > Physical Operators' execution order. > *Attachments:* > Please find both *current* and *improved* versions of the results as > attached for following SQL Rest Endpoint: > {code:java} > curl -X GET > http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true{code}
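Improvements 1-4 above amount to regrouping a flat metric list per physical operation. A minimal sketch of that transformation (the sample data and helper are illustrative assumptions, not the actual SQL Rest API response shape):

```python
# Flat metrics roughly as the current API returns them (hypothetical sample).
metrics = [
    {"nodeId": 2, "nodeName": "Filter", "wholeStageCodegenId": 1,
     "name": "number of output rows", "value": "100"},
    {"nodeId": 1, "nodeName": "HashAggregate", "wholeStageCodegenId": 1,
     "name": "number of output rows", "value": "5"},
    {"nodeId": 1, "nodeName": "HashAggregate", "wholeStageCodegenId": 1,
     "name": "spill size", "value": ""},  # empty metric, to be filtered out
]

def group_by_operation(flat_metrics):
    """Group metrics per physical operation keyed by nodeId, keep
    wholeStageCodegenId, drop empty metric values, and sort by nodeId
    (improvements 1-4 above)."""
    nodes = {}
    for m in flat_metrics:
        if not m["value"]:  # 4- filter empty metrics, as the Spark UI does
            continue
        node = nodes.setdefault(m["nodeId"], {
            "nodeId": m["nodeId"],
            "nodeName": m["nodeName"],
            "wholeStageCodegenId": m["wholeStageCodegenId"],  # 2-
            "metrics": [],
        })
        node["metrics"].append({"name": m["name"], "value": m["value"]})
    # 3- nodeId gives a stable sort reflecting operator execution order
    return [nodes[node_id] for node_id in sorted(nodes)]

for node in group_by_operation(metrics):
    print(node["nodeId"], node["nodeName"], node["metrics"])
```

Keying on *nodeId* rather than the operator name is what differentiates the same operator used twice in one query, which is the point of improvement 3.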
[jira] [Updated] (SPARK-31440) Improve SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-31440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31440: --- Description: SQL Rest API exposes query execution metrics as Public API. This Jira aims to apply following improvements on SQL Rest API by aligning Spark-UI. *Proposed Improvements:* 1- Support Physical Operations and group metrics per operation by aligning Spark UI. 2- *nodeId* can be useful for grouping metrics as well as for sorting and to differentiate same operators and their metrics. 3- Filter *blank* metrics by aligning with Spark UI - SQL Tab 4- Remove *\n* from *metricValue(s)* 5- *planDescription* can be optional Http parameter to avoid network cost (specially for complex jobs creating big-plans). 6- *metrics* attribute needs to be exposed at the bottom order as *metricDetails*. This order matches with Spark UI by highlighting with execution order. *Attachments:* Please find both *current* and *improved* versions of the results as attached for following SQL Rest Endpoint: {code:java} curl -X GET http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true{code} was: SQL Rest API exposes query execution metrics as Public API. This Jira aims to apply following improvements on SQL Rest API by aligning Spark-UI. *Proposed Improvements:* 1- Support Physical Operations and group metrics per operation by aligning Spark UI. 2- *nodeId* can be useful for grouping metrics as well as for sorting and to differentiate same operators and their metrics. 3- Filter *blank* metrics by aligning with Spark UI - SQL Tab 4- Remove *\n* from *metricValue(s)* 5- *planDescription* can be optional Http parameter to avoid network cost (specially for complex jobs creating big-plans). 6- *metrics* attribute needs to be exposed at the bottom order as *metricDetails*. This order matches with Spark UI by highlighting with execution order. 
*Attachments:* Please find both *current* and *improved* versions of results as attached. > Improve SQL Rest API > > > Key: SPARK-31440 > URL: https://issues.apache.org/jira/browse/SPARK-31440 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: current_version.json, improved_version.json > > > SQL Rest API exposes query execution metrics as Public API. This Jira aims to > apply following improvements on SQL Rest API by aligning Spark-UI. > *Proposed Improvements:* > 1- Support Physical Operations and group metrics per operation by aligning > Spark UI. > 2- *nodeId* can be useful for grouping metrics as well as for sorting and to > differentiate same operators and their metrics. > 3- Filter *blank* metrics by aligning with Spark UI - SQL Tab > 4- Remove *\n* from *metricValue(s)* > 5- *planDescription* can be optional Http parameter to avoid network cost > (specially for complex jobs creating big-plans). > 6- *metrics* attribute needs to be exposed at the bottom order as > *metricDetails*. This order matches with Spark UI by highlighting with > execution order. > *Attachments:* > Please find both *current* and *improved* versions of the results as > attached for following SQL Rest Endpoint: > {code:java} > curl -X GET > http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true{code}
[jira] [Updated] (SPARK-31440) Improve SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-31440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31440: --- Attachment: current_version.json > Improve SQL Rest API > > > Key: SPARK-31440 > URL: https://issues.apache.org/jira/browse/SPARK-31440 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: current_version.json, improved_version.json > > > SQL Rest API exposes query execution metrics as Public API. This Jira aims to > apply following improvements on SQL Rest API by aligning Spark-UI. > *Proposed Improvements:* > 1- Support Physical Operations and group metrics per operation by aligning > Spark UI. > 2- *nodeId* can be useful for grouping metrics as well as for sorting and to > differentiate same operators and their metrics. > 3- Filter *blank* metrics by aligning with Spark UI - SQL Tab > 4- Remove *\n* from *metricValue(s)* > 5- *planDescription* can be optional Http parameter to avoid network cost > (specially for complex jobs creating big-plans). > 6- *metrics* attribute needs to be exposed at the bottom order as > *metricDetails*. This order matches with Spark UI by highlighting with > execution order. > *Attachments:* > Please find both *current* and *improved* versions of results as attached. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31440) Improve SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-31440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31440: --- Attachment: improved_version.json > Improve SQL Rest API > > > Key: SPARK-31440 > URL: https://issues.apache.org/jira/browse/SPARK-31440 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > Attachments: current_version.json, improved_version.json > > > SQL Rest API exposes query execution metrics as Public API. This Jira aims to > apply following improvements on SQL Rest API by aligning Spark-UI. > *Proposed Improvements:* > 1- Support Physical Operations and group metrics per operation by aligning > Spark UI. > 2- *nodeId* can be useful for grouping metrics as well as for sorting and to > differentiate same operators and their metrics. > 3- Filter *blank* metrics by aligning with Spark UI - SQL Tab > 4- Remove *\n* from *metricValue(s)* > 5- *planDescription* can be optional Http parameter to avoid network cost > (specially for complex jobs creating big-plans). > 6- *metrics* attribute needs to be exposed at the bottom order as > *metricDetails*. This order matches with Spark UI by highlighting with > execution order. > *Attachments:* > Please find both *current* and *improved* versions of results as attached. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31440) Improve SQL Rest API
[ https://issues.apache.org/jira/browse/SPARK-31440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-31440: --- Description: SQL Rest API exposes query execution metrics as Public API. This Jira aims to apply following improvements on SQL Rest API by aligning Spark-UI. *Proposed Improvements:* 1- Support Physical Operations and group metrics per operation by aligning Spark UI. 2- *nodeId* can be useful for grouping metrics as well as for sorting and to differentiate same operators and their metrics. 3- Filter *blank* metrics by aligning with Spark UI - SQL Tab 4- Remove *\n* from *metricValue(s)* 5- *planDescription* can be optional Http parameter to avoid network cost (specially for complex jobs creating big-plans). 6- *metrics* attribute needs to be exposed at the bottom order as *metricDetails*. This order matches with Spark UI by highlighting with execution order. *Attachments:* Please find both *current* and *improved* versions of results as attached. was: SQL Rest API exposes query execution metrics as Public API. This Jira aims to apply following improvements on SQL Rest API by aligning Spark-UI. *Proposed Improvements:* 1- Support Physical Operations and group metrics per operation by aligning Spark UI. 2- `nodeId` can be useful for grouping metrics as well as for sorting and to differentiate same operators and their metrics. 3- Filter `blank` metrics by aligning with Spark UI - SQL Tab 4- Remove `\n` from `metricValue` 5- `planDescription` can be optional Http parameter to avoid network cost (specially for complex jobs creating big-plans). 6- `metrics` attribute needs to be exposed at the bottom order as `metricDetails`. This order matches with Spark UI by highlighting with execution order. *Attachments:* Please find both *current* and *improved* versions of results as attached. 
> Improve SQL Rest API > > > Key: SPARK-31440 > URL: https://issues.apache.org/jira/browse/SPARK-31440 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0 >Reporter: Eren Avsarogullari >Priority: Major > > SQL Rest API exposes query execution metrics as Public API. This Jira aims to > apply following improvements on SQL Rest API by aligning Spark-UI. > *Proposed Improvements:* > 1- Support Physical Operations and group metrics per operation by aligning > Spark UI. > 2- *nodeId* can be useful for grouping metrics as well as for sorting and to > differentiate same operators and their metrics. > 3- Filter *blank* metrics by aligning with Spark UI - SQL Tab > 4- Remove *\n* from *metricValue(s)* > 5- *planDescription* can be optional Http parameter to avoid network cost > (specially for complex jobs creating big-plans). > 6- *metrics* attribute needs to be exposed at the bottom order as > *metricDetails*. This order matches with Spark UI by highlighting with > execution order. > *Attachments:* > Please find both *current* and *improved* versions of results as attached. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31440) Improve SQL Rest API
Eren Avsarogullari created SPARK-31440: -- Summary: Improve SQL Rest API Key: SPARK-31440 URL: https://issues.apache.org/jira/browse/SPARK-31440 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.0 Reporter: Eren Avsarogullari The SQL Rest API exposes query execution metrics as a public API. This Jira aims to apply the following improvements to the SQL Rest API by aligning it with the Spark UI. *Proposed Improvements:* 1- Support physical operations and group metrics per operation, aligning with the Spark UI. 2- `nodeId` can be useful for grouping metrics, as well as for sorting and for differentiating identical operators and their metrics. 3- Filter `blank` metrics, aligning with the Spark UI - SQL Tab. 4- Remove `\n` from `metricValue`. 5- `planDescription` can be an optional HTTP parameter to avoid network cost (especially for complex jobs creating big plans). 6- The `metrics` attribute needs to be exposed at the bottom, as `metricDetails`. This order matches the Spark UI, which highlights execution order. *Attachments:* Please find both *current* and *improved* versions of the results attached. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
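The shape of proposed improvements 1-4 can be sketched as below. This is a minimal illustration, not Spark's actual implementation; the `NodeMetric` case class and the `groupMetricsByNode` helper are hypothetical names invented for this sketch.

```scala
// Hypothetical flat metric rows, roughly as the current API exposes them.
case class NodeMetric(nodeId: Long, nodeName: String, metricName: String, metricValue: String)

// Group metrics per physical operation (improvement 1), keyed and sorted by
// nodeId (improvement 2), dropping blank metrics (improvement 3) and
// stripping embedded newlines from metric values (improvement 4).
def groupMetricsByNode(rows: Seq[NodeMetric]): Seq[(Long, String, Map[String, String])] =
  rows
    .filter(_.metricValue.trim.nonEmpty)                                    // 3- drop blank metrics
    .map(m => m.copy(metricValue = m.metricValue.replace("\n", " ").trim))  // 4- remove \n
    .groupBy(m => (m.nodeId, m.nodeName))                                   // 1- per-operation grouping
    .toSeq
    .sortBy { case ((nodeId, _), _) => nodeId }                             // 2- sort by nodeId
    .map { case ((nodeId, nodeName), ms) =>
      (nodeId, nodeName, ms.map(m => m.metricName -> m.metricValue).toMap)
    }
```

Grouping by the `(nodeId, nodeName)` pair rather than the operator name alone is what lets two occurrences of the same operator keep separate metric sets, as improvement 2 intends.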
[jira] [Updated] (SPARK-19964) SparkSubmitSuite fails due to Timeout
[ https://issues.apache.org/jira/browse/SPARK-19964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-19964: --- Attachment: SparkSubmitSuite_Stacktrace > SparkSubmitSuite fails due to Timeout > - > > Key: SPARK-19964 > URL: https://issues.apache.org/jira/browse/SPARK-19964 > Project: Spark > Issue Type: Bug > Components: Deploy, Tests >Affects Versions: 2.2.0 >Reporter: Eren Avsarogullari > Labels: flaky-test > Attachments: SparkSubmitSuite_Stacktrace > > > The following test case has been failed due to TestFailedDueToTimeoutException > *Test Suite:* SparkSubmitSuite > *Test Case:* includes jars passed in through --packages > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74413/testReport/ > *Stacktrace is also attached.* -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19964) SparkSubmitSuite fails due to Timeout
Eren Avsarogullari created SPARK-19964: -- Summary: SparkSubmitSuite fails due to Timeout Key: SPARK-19964 URL: https://issues.apache.org/jira/browse/SPARK-19964 Project: Spark Issue Type: Bug Components: Deploy, Tests Affects Versions: 2.2.0 Reporter: Eren Avsarogullari The following test case has failed due to TestFailedDueToTimeoutException: *Test Suite:* SparkSubmitSuite *Test Case:* includes jars passed in through --packages https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74413/testReport/ *Stacktrace is also attached.* -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19662) Add Fair Scheduler Unit Test coverage for different build cases
[ https://issues.apache.org/jira/browse/SPARK-19662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-19662: --- Description: Fair Scheduler can be built via one of the following options: - By setting a {{spark.scheduler.allocation.file}} property - By setting {{fairscheduler.xml}} into classpath, These options are checked in order and fair-scheduler is built via first found one. If invalid path is found, {{FileNotFoundException}} will be expected. Related PR aims unit test coverage of these use cases and a minor documentation change has been added for second option({{fairscheduler.xml}} into classpath) to inform the user. was: Fair Scheduler can be built via one of the following options: - By setting a `spark.scheduler.allocation.file` property - By setting `fairscheduler.xml` into classpath, These options are checked in order and fair-scheduler is built via first found one. If invalid path is found, FileNotFoundException will be expected. Related PR aims unit test coverage of these use cases and a minor documentation change has been added for second option(`fairscheduler.xml` into classpath) to inform the user. > Add Fair Scheduler Unit Test coverage for different build cases > --- > > Key: SPARK-19662 > URL: https://issues.apache.org/jira/browse/SPARK-19662 > Project: Spark > Issue Type: Test > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Eren Avsarogullari >Priority: Minor > > Fair Scheduler can be built via one of the following options: > - By setting a {{spark.scheduler.allocation.file}} property > - By setting {{fairscheduler.xml}} into classpath, > These options are checked in order and fair-scheduler is built via first > found one. If invalid path is found, {{FileNotFoundException}} will be > expected. > Related PR aims unit test coverage of these use cases and a minor > documentation change has been added for second option({{fairscheduler.xml}} > into classpath) to inform the user. 
[jira] [Created] (SPARK-19662) Add Fair Scheduler Unit Test coverage for different build cases
Eren Avsarogullari created SPARK-19662: -- Summary: Add Fair Scheduler Unit Test coverage for different build cases Key: SPARK-19662 URL: https://issues.apache.org/jira/browse/SPARK-19662 Project: Spark Issue Type: Test Components: Scheduler Affects Versions: 2.1.0 Reporter: Eren Avsarogullari Priority: Minor Fair Scheduler can be built via one of the following options: - By setting the `spark.scheduler.allocation.file` property - By putting `fairscheduler.xml` on the classpath. These options are checked in order, and the fair scheduler is built from the first one found. If the configured path is invalid, a FileNotFoundException is thrown. The related PR adds unit test coverage for these use cases, and a minor documentation change has been added for the second option (`fairscheduler.xml` on the classpath) to inform the user. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
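The two build options and their ordering can be sketched as follows. This is a simplified stand-in for the lookup performed by Spark's FairSchedulableBuilder, not the actual code; the `schedulerAllocationStream` helper and its config-map parameter are invented for illustration.

```scala
import java.io.{FileInputStream, InputStream}

// Lookup order described in the issue: an explicitly set
// spark.scheduler.allocation.file property wins; otherwise fall back to a
// fairscheduler.xml resource on the classpath. An invalid explicit path
// surfaces as a java.io.FileNotFoundException.
def schedulerAllocationStream(conf: Map[String, String]): Option[InputStream] =
  conf.get("spark.scheduler.allocation.file") match {
    case Some(path) => Some(new FileInputStream(path)) // throws FileNotFoundException for a bad path
    case None       => Option(getClass.getClassLoader.getResourceAsStream("fairscheduler.xml"))
  }
```

The unit tests the issue asks for would exercise exactly these three branches: a valid property, an invalid property (expecting the exception), and the classpath fallback.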
[jira] [Commented] (SPARK-19567) Support some Schedulable variables immutability and access
[ https://issues.apache.org/jira/browse/SPARK-19567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862983#comment-15862983 ] Eren Avsarogullari commented on SPARK-19567: Hi [~srowen], Thanks for quick response. I prepared the following patch for this and it is related with https://github.com/apache/spark/pull/15604 https://github.com/erenavsarogullari/spark/commit/d158b789c98923d5989f9fd50fd7fd3a4f1fc1ff Also, i totally agree about public APIs and suggested patch is at the scheduler implementation level(private[spark] Schedulable Entities and TaskSchedulerImpl) so not the part of public APIs. > Support some Schedulable variables immutability and access > -- > > Key: SPARK-19567 > URL: https://issues.apache.org/jira/browse/SPARK-19567 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Eren Avsarogullari >Priority: Minor > > Support some Schedulable variables immutability and access > Some Schedulable variables need refactoring for immutability and access > modifiers as follows: > - from vars to vals(if there is no requirement): This is important to support > immutability as much as possible. > Sample => Pool: weight, minShare, priority, name and > taskSetSchedulingAlgorithm. > - access modifiers: Specially, vars access needs to be restricted from other > parts of codebase to prevent potential side effects. Sample: > Sample => TaskSetManager: tasksSuccessful, totalResultSize, calculatedTasks > etc... -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19567) Support some Schedulable variables immutability and access
Eren Avsarogullari created SPARK-19567: -- Summary: Support some Schedulable variables immutability and access Key: SPARK-19567 URL: https://issues.apache.org/jira/browse/SPARK-19567 Project: Spark Issue Type: Improvement Components: Scheduler Affects Versions: 2.1.0 Reporter: Eren Avsarogullari Priority: Minor Support some Schedulable variables immutability and access Some Schedulable variables need refactoring for immutability and access modifiers as follows: - from vars to vals (where mutation is not required): This is important to support immutability as much as possible. Sample => Pool: weight, minShare, priority, name and taskSetSchedulingAlgorithm. - access modifiers: Especially, var access needs to be restricted from other parts of the codebase to prevent potential side effects. Sample => TaskSetManager: tasksSuccessful, totalResultSize, calculatedTasks etc. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19466) Improve Fair Scheduler Logging
[ https://issues.apache.org/jira/browse/SPARK-19466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-19466: --- Description: Fair Scheduler Logging for the following cases can be useful for the user. 1- If *valid* spark.scheduler.allocation.file property is set, user can be informed so user can aware which scheduler file is processed when SparkContext initializes. 2- If *invalid* spark.scheduler.allocation.file property is set, currently, the following stacktrace is shown to user. In addition to this, more meaningful message can be shown to user by emphasizing the problem at building level of fair scheduler and covering other potential issues at this level. {code:xml} Exception in thread "main" java.io.FileNotFoundException: INVALID_FILE (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.(FileInputStream.java:138) at java.io.FileInputStream.(FileInputStream.java:93) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:76) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:75) {code} 3- If spark.scheduler.allocation.file property is not set and *default* fair scheduler file(fairscheduler.xml) is found in classpath, it will be loaded but currently, user is not informed so logging can be useful. 4- If spark.scheduler.allocation.file property is not set and default fair scheduler file does not exist, currently, user is not informed so logging can be useful. was: Fair Scheduler Logging for the following cases can be useful for the user. 1- If valid spark.scheduler.allocation.file property is set, user can be informed so user can aware which scheduler file is processed when SparkContext initializes. 2- If invalid spark.scheduler.allocation.file is set, currently, the following stacktrace is shown to user. 
In addition to this, more meaningful message can be shown to user by emphasizing the problem at building level of fair scheduler and covering other potential issues at this level. Exception in thread "main" java.io.FileNotFoundException: INVALID_FILE (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.(FileInputStream.java:138) at java.io.FileInputStream.(FileInputStream.java:93) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:76) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:75) 3- If spark.scheduler.allocation.file is not set and default fair scheduler file(fairscheduler.xml) is found in classpath, it will be loaded but currently, user is not informed so logging can be useful. 4- If spark.scheduler.allocation.file is not set and default fair scheduler file does not exist, currently, user is not informed so logging can be useful. > Improve Fair Scheduler Logging > -- > > Key: SPARK-19466 > URL: https://issues.apache.org/jira/browse/SPARK-19466 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 2.2.0 >Reporter: Eren Avsarogullari > > Fair Scheduler Logging for the following cases can be useful for the user. > 1- If *valid* spark.scheduler.allocation.file property is set, user can be > informed so user can aware which scheduler file is processed when > SparkContext initializes. > 2- If *invalid* spark.scheduler.allocation.file property is set, currently, > the following stacktrace is shown to user. In addition to this, more > meaningful message can be shown to user by emphasizing the problem at > building level of fair scheduler and covering other potential issues at this > level. 
> {code:xml} > Exception in thread "main" java.io.FileNotFoundException: INVALID_FILE (No > such file or directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at java.io.FileInputStream.(FileInputStream.java:138) > at java.io.FileInputStream.(FileInputStream.java:93) > at > org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:76) > at > org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:75) > {code} > 3- If spark.scheduler.allocation.file property is not set and *default* fair > scheduler file(fairscheduler.xml) is found in classpath, it will be loaded > but currently, user is not informed so logging can be useful. > 4-
> If spark.scheduler.allocation.file property is not set and default fair scheduler file does not exist, currently, user is not informed so logging can be useful.
[jira] [Created] (SPARK-19466) Improve Fair Scheduler Logging
Eren Avsarogullari created SPARK-19466: -- Summary: Improve Fair Scheduler Logging Key: SPARK-19466 URL: https://issues.apache.org/jira/browse/SPARK-19466 Project: Spark Issue Type: Improvement Components: Scheduler Affects Versions: 2.2.0 Reporter: Eren Avsarogullari Fair Scheduler logging for the following cases can be useful for the user. 1- If a valid spark.scheduler.allocation.file property is set, the user can be informed of which scheduler file is processed when SparkContext initializes. 2- If an invalid spark.scheduler.allocation.file is set, currently the following stacktrace is shown to the user. In addition, a more meaningful message can be shown by emphasizing that the problem occurs while building the fair scheduler, and by covering other potential issues at this level. Exception in thread "main" java.io.FileNotFoundException: INVALID_FILE (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.<init>(FileInputStream.java:138) at java.io.FileInputStream.<init>(FileInputStream.java:93) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:76) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:75) 3- If spark.scheduler.allocation.file is not set and the default fair scheduler file (fairscheduler.xml) is found on the classpath, it will be loaded, but currently the user is not informed, so logging can be useful. 4- If spark.scheduler.allocation.file is not set and the default fair scheduler file does not exist, currently the user is not informed, so logging can be useful. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18066) Add Pool usage policies test coverage for FIFO & FAIR Schedulers
[ https://issues.apache.org/jira/browse/SPARK-18066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-18066: --- Summary: Add Pool usage policies test coverage for FIFO & FAIR Schedulers (was: Add Pool usage policies test coverage to FIFO & FAIR Schedulers) > Add Pool usage policies test coverage for FIFO & FAIR Schedulers > > > Key: SPARK-18066 > URL: https://issues.apache.org/jira/browse/SPARK-18066 > Project: Spark > Issue Type: Test > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Eren Avsarogullari >Priority: Minor > > The following Pool usage cases need to have Unit test coverage : > - FIFO Scheduler just uses *root pool* so even if *spark.scheduler.pool* > property is set, related pool is not created and *TaskSetManagers* are added > to root pool. > - FAIR Scheduler uses default pool when *spark.scheduler.pool* property is > not set. This can be happened when Properties object is null or empty(*new > Properties()*) or points default pool(*spark.scheduler.pool*=_default_). > - FAIR Scheduler creates a new pool with default values when > *spark.scheduler.pool* property points _non-existent_ pool. This can be > happened when scheduler allocation file is not set or it does not contain > related pool. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18066) Add Pool usage policies test coverage to FIFO & FAIR Schedulers
[ https://issues.apache.org/jira/browse/SPARK-18066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-18066: --- Description: The following Pool usage cases need to have Unit test coverage : - FIFO Scheduler just uses *root pool* so even if *spark.scheduler.pool* property is set, related pool is not created and *TaskSetManagers* are added to root pool. - FAIR Scheduler uses default pool when *spark.scheduler.pool* property is not set. This can be happened when Properties object is null or empty(*new Properties()*) or points default pool(*spark.scheduler.pool*=_default_). - FAIR Scheduler creates a new pool with default values when *spark.scheduler.pool* property points _non-existent_ pool. This can be happened when scheduler allocation file is not set or it does not contain related pool. was: The following Pool usage cases need to have Unit test coverage : - FIFO Scheduler just uses *root pool* so even if {code:java}spark.scheduler.pool{code} property is set, related pool is not created and {code:java}TaskSetManagers{code} are added to root pool. - FAIR Scheduler uses default pool when spark.scheduler.pool property is not set. This can be happened when Properties object is null or empty(new Properties()) or points default pool(spark.scheduler.pool=default). - FAIR Scheduler creates a new pool with default values when spark.scheduler.pool property points non-existent pool. This can be happened when scheduler allocation file is not set or it does not contain related pool. 
> Add Pool usage policies test coverage to FIFO & FAIR Schedulers > --- > > Key: SPARK-18066 > URL: https://issues.apache.org/jira/browse/SPARK-18066 > Project: Spark > Issue Type: Test > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Eren Avsarogullari >Priority: Minor > > The following Pool usage cases need to have Unit test coverage : > - FIFO Scheduler just uses *root pool* so even if *spark.scheduler.pool* > property is set, related pool is not created and *TaskSetManagers* are added > to root pool. > - FAIR Scheduler uses default pool when *spark.scheduler.pool* property is > not set. This can be happened when Properties object is null or empty(*new > Properties()*) or points default pool(*spark.scheduler.pool*=_default_). > - FAIR Scheduler creates a new pool with default values when > *spark.scheduler.pool* property points _non-existent_ pool. This can be > happened when scheduler allocation file is not set or it does not contain > related pool. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18066) Add Pool usage policies test coverage to FIFO & FAIR Schedulers
[ https://issues.apache.org/jira/browse/SPARK-18066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-18066: --- Description: The following Pool usage cases need to have Unit test coverage : - FIFO Scheduler just uses *root pool* so even if {code:java}spark.scheduler.pool{code} property is set, related pool is not created and {code:java}TaskSetManagers{code} are added to root pool. - FAIR Scheduler uses default pool when spark.scheduler.pool property is not set. This can be happened when Properties object is null or empty(new Properties()) or points default pool(spark.scheduler.pool=default). - FAIR Scheduler creates a new pool with default values when spark.scheduler.pool property points non-existent pool. This can be happened when scheduler allocation file is not set or it does not contain related pool. was: The following Pool usage cases need to have Unit test coverage : - FIFO Scheduler just uses root pool so even if spark.scheduler.pool property is set, related pool is not created and TaskSetManagers are added to root pool. - FAIR Scheduler uses default pool when spark.scheduler.pool property is not set. This can be happened when Properties object is null or empty(new Properties()) or points default pool(spark.scheduler.pool=default). - FAIR Scheduler creates a new pool with default values when spark.scheduler.pool property points non-existent pool. This can be happened when scheduler allocation file is not set or it does not contain related pool. 
> Add Pool usage policies test coverage to FIFO & FAIR Schedulers > --- > > Key: SPARK-18066 > URL: https://issues.apache.org/jira/browse/SPARK-18066 > Project: Spark > Issue Type: Test > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Eren Avsarogullari >Priority: Minor > > The following Pool usage cases need to have Unit test coverage : > - FIFO Scheduler just uses *root pool* so even if > {code:java}spark.scheduler.pool{code} property is set, related pool is not > created and {code:java}TaskSetManagers{code} are added to root pool. > - FAIR Scheduler uses default pool when spark.scheduler.pool property is not > set. This can be happened when Properties object is null or empty(new > Properties()) or points default pool(spark.scheduler.pool=default). > - FAIR Scheduler creates a new pool with default values when > spark.scheduler.pool property points non-existent pool. This can be happened > when scheduler allocation file is not set or it does not contain related pool. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18066) Add Pool usage policies test coverage to FIFO & FAIR Schedulers
Eren Avsarogullari created SPARK-18066: -- Summary: Add Pool usage policies test coverage to FIFO & FAIR Schedulers Key: SPARK-18066 URL: https://issues.apache.org/jira/browse/SPARK-18066 Project: Spark Issue Type: Test Components: Scheduler Affects Versions: 2.1.0 Reporter: Eren Avsarogullari Priority: Minor The following Pool usage cases need unit test coverage: - FIFO Scheduler only uses the root pool, so even if the spark.scheduler.pool property is set, the related pool is not created and TaskSetManagers are added to the root pool. - FAIR Scheduler uses the default pool when the spark.scheduler.pool property is not set. This can happen when the Properties object is null, empty (new Properties()), or points to the default pool (spark.scheduler.pool=default). - FAIR Scheduler creates a new pool with default values when the spark.scheduler.pool property points to a non-existent pool. This can happen when the scheduler allocation file is not set or does not contain the related pool. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
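The FAIR-scheduler fallback behaviour these tests target can be sketched like this. The `Pool` case class, the `pools` registry, and the `poolFor` helper are illustrative stand-ins, not Spark's actual internals.

```scala
import scala.collection.mutable

// Illustrative stand-in for a scheduler pool with default values.
case class Pool(name: String, minShare: Int = 0, weight: Int = 1)

// Registry seeded with the default pool, mirroring FAIR scheduler startup.
val pools = mutable.Map("default" -> Pool("default"))

// Behaviour under test: a missing spark.scheduler.pool property falls back to
// the default pool; a non-existent pool name creates a new pool with defaults.
def poolFor(properties: Map[String, String]): Pool = {
  val name = properties.getOrElse("spark.scheduler.pool", "default")
  pools.getOrElseUpdate(name, Pool(name)) // create with default values if absent
}
```

A unit test for the third case would then assert both that the returned pool carries default `minShare`/`weight` values and that it was registered, as in the suite the issue proposes.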
[jira] [Updated] (SPARK-17759) Avoid adding duplicate schedulables
[ https://issues.apache.org/jira/browse/SPARK-17759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17759: --- Summary: Avoid adding duplicate schedulables (was: FairSchedulableBuilder should avoid to create duplicate fair scheduler-pools.) > Avoid adding duplicate schedulables > --- > > Key: SPARK-17759 > URL: https://issues.apache.org/jira/browse/SPARK-17759 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.0.0 >Reporter: Eren Avsarogullari > Attachments: duplicate_pools.png, duplicate_pools2.png > > > If _spark.scheduler.allocation.file_ has duplicate pools, all of them are > created when _SparkContext_ is initialized but just one of them is used and > the other ones look redundant. This causes _redundant pool_ creation and > needs to be fixed. *Also solution should cover all schedulables(Pool and > TaskSetManager)* > *Code to Reproduce* : > {code:java} > val conf = new > SparkConf().setAppName("spark-fairscheduler").setMaster("local") > conf.set("spark.scheduler.mode", "FAIR") > conf.set("spark.scheduler.allocation.file", > "src/main/resources/fairscheduler-duplicate-pools.xml") > val sc = new SparkContext(conf) > {code} > *fairscheduler-duplicate-pools.xml* : > The following sample just shows two default and duplicate_pool1 but this can > also be thought for N default and/or other duplicate pools. > {code:xml} > > > 0 > 1 > FIFO > > > 0 > 1 > FIFO > > > 1 > 1 > FAIR > > > 2 > 2 > FAIR > > > {code} > *Debug Screenshot* : > This means Pool.schedulableQueue(ConcurrentLinkedQueue[Schedulable]) has 4 > pools as default, default, duplicate_pool1, duplicate_pool1 but > Pool.schedulableNameToSchedulable(ConcurrentHashMap[String, Schedulable]) has > default and duplicate_pool1 due to pool name as key so one of default and > duplicate_pool1 look as redundant and live in Pool.schedulableQueue. 
> Please have a look at the *attached screenshots*.
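The debug observation above (4 entries in the queue, only 2 in the map) follows directly from the two collection types involved; a minimal standalone sketch, using plain strings in place of Schedulable:

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

// Mimic Pool.schedulableQueue and Pool.schedulableNameToSchedulable when the
// allocation file declares each pool name twice.
val schedulableQueue = new ConcurrentLinkedQueue[String]()
val schedulableNameToSchedulable = new ConcurrentHashMap[String, String]()

for (name <- Seq("default", "default", "duplicate_pool1", "duplicate_pool1")) {
  schedulableQueue.add(name)                   // the queue keeps duplicates
  schedulableNameToSchedulable.put(name, name) // the map overwrites by pool name
}

assert(schedulableQueue.size == 4)             // default x2, duplicate_pool1 x2
assert(schedulableNameToSchedulable.size == 2) // keyed by pool name
```

This is why one copy of each duplicated pool lives only in the queue and is never reachable by name.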
[jira] [Updated] (SPARK-17759) FairSchedulableBuilder should avoid to create duplicate fair scheduler-pools.
[ https://issues.apache.org/jira/browse/SPARK-17759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17759: --- Description: If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed. *The solution should also cover all schedulables (Pool and TaskSetManager).*
*Code to Reproduce* :
{code:java}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
val sc = new SparkContext(conf)
{code}
*fairscheduler-duplicate-pools.xml* :
The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
{code:xml}
<?xml version="1.0"?>
<allocations>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
</allocations>
{code}
*Debug Screenshot* :
This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
Please have a look at the *attached screenshots*.
was: If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
*Code to Reproduce* :
{code:java}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
val sc = new SparkContext(conf)
{code}
*fairscheduler-duplicate-pools.xml* :
The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
{code:xml}
<?xml version="1.0"?>
<allocations>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
</allocations>
{code}
*Debug Screenshot* :
This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
Please have a look at the *attached screenshots*.
> FairSchedulableBuilder should avoid to create duplicate fair scheduler-pools.
> -
>
> Key: SPARK-17759
> URL: https://issues.apache.org/jira/browse/SPARK-17759
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.0.0
> Reporter: Eren Avsarogullari
> Attachments: duplicate_pools.png, duplicate_pools2.png
>
> If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
> *The solution should also cover all schedulables (Pool and TaskSetManager).*
> *Code to Reproduce* :
> {code:java}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-duplicate-pools.xml* :
> The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
> {code:xml}
> <?xml version="1.0"?>
> <allocations>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
> </allocations>
> {code}
> *Debug Screenshot* :
> This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
> Please have a look at the *attached screenshots*.
[jira] [Updated] (SPARK-17894) Ensure uniqueness of TaskSetManager name
[ https://issues.apache.org/jira/browse/SPARK-17894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17894: --- Description: TaskSetManager should have a unique name to avoid adding duplicate ones to *Pool* via *SchedulableBuilder*. This problem surfaced with https://issues.apache.org/jira/browse/SPARK-17759; please find the discussion at https://github.com/apache/spark/pull/15326
*Proposal* :
There is a 1:1 relationship between a stage attempt and a TaskSetManager, so taskSet.id, which covers both stageId and stageAttemptId, can be used for the TaskSetManager name as well.
*Current TaskSetManager Name* :
{code:java}
var name = "TaskSet_" + taskSet.stageId.toString
{code}
*Sample* : TaskSet_0
*Proposed TaskSetManager Name* :
{code:java}
var name = "TaskSet_" + taskSet.id  // taskSet.id = stageId + "." + stageAttemptId
{code}
*Sample* : TaskSet_0.0
cc [~kayousterhout] [~markhamstra]
was: TaskSetManager should have a unique name to avoid adding duplicate ones to *Pool* via *SchedulableBuilder*. This problem surfaced with https://issues.apache.org/jira/browse/SPARK-17759; please find the discussion at https://github.com/apache/spark/pull/15326 There is a 1:1 relationship between a stage attempt and a TaskSetManager, so taskSet.id, which covers both stageId and stageAttemptId, can be used for the TaskSetManager name as well. *Current TaskSetManager Name* : {code:java} var name = "TaskSet_" + taskSet.stageId.toString{code} *Sample* : TaskSet_0 *Proposed TaskSetManager Name* : {code:java} var name = "TaskSet_" + taskSet.id  // taskSet.id = stageId + "." + stageAttemptId {code} *Sample* : TaskSet_0.0 cc [~kayousterhout] [~markhamstra]
> Ensure uniqueness of TaskSetManager name
>
> Key: SPARK-17894
> URL: https://issues.apache.org/jira/browse/SPARK-17894
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
>
> TaskSetManager should have a unique name to avoid adding duplicate ones to *Pool* via *SchedulableBuilder*.
This problem surfaced with https://issues.apache.org/jira/browse/SPARK-17759; please find the discussion at https://github.com/apache/spark/pull/15326
> *Proposal* :
> There is a 1:1 relationship between a stage attempt and a TaskSetManager, so taskSet.id, which covers both stageId and stageAttemptId, can be used for the TaskSetManager name as well.
> *Current TaskSetManager Name* :
> {code:java}
> var name = "TaskSet_" + taskSet.stageId.toString
> {code}
> *Sample* : TaskSet_0
> *Proposed TaskSetManager Name* :
> {code:java}
> var name = "TaskSet_" + taskSet.id  // taskSet.id = stageId + "." + stageAttemptId
> {code}
> *Sample* : TaskSet_0.0
> cc [~kayousterhout] [~markhamstra]
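The current vs. proposed naming can be illustrated with a small stand-in for TaskSet (a hypothetical case class for this sketch, not Spark's actual TaskSet):

```scala
// Minimal stand-in: a stage attempt is identified by stageId + stageAttemptId.
case class TaskSet(stageId: Int, stageAttemptId: Int) {
  def id: String = s"$stageId.$stageAttemptId"
}

val taskSet = TaskSet(stageId = 0, stageAttemptId = 0)

val currentName  = "TaskSet_" + taskSet.stageId.toString // not unique across attempts
val proposedName = "TaskSet_" + taskSet.id               // unique per stage attempt

assert(currentName == "TaskSet_0")
assert(proposedName == "TaskSet_0.0")
// A retried attempt of the same stage gets a distinct name under the proposal:
assert("TaskSet_" + TaskSet(0, 1).id == "TaskSet_0.1")
```

Under the current scheme, attempts 0 and 1 of stage 0 would both be named TaskSet_0, which is exactly the duplicate-schedulable problem described above.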
[jira] [Updated] (SPARK-17894) Ensure uniqueness of TaskSetManager name
[ https://issues.apache.org/jira/browse/SPARK-17894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17894: --- Summary: Ensure uniqueness of TaskSetManager name (was: Uniqueness of TaskSetManager name)
> Ensure uniqueness of TaskSetManager name
>
> Key: SPARK-17894
> URL: https://issues.apache.org/jira/browse/SPARK-17894
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
>
> TaskSetManager should have a unique name to avoid adding duplicate ones to *Pool* via *SchedulableBuilder*. This problem surfaced with https://issues.apache.org/jira/browse/SPARK-17759; please find the discussion at https://github.com/apache/spark/pull/15326
> There is a 1:1 relationship between a stage attempt and a TaskSetManager, so taskSet.id, which covers both stageId and stageAttemptId, can be used for the TaskSetManager name as well.
> *Current TaskSetManager Name* :
> {code:java}
> var name = "TaskSet_" + taskSet.stageId.toString
> {code}
> *Sample* : TaskSet_0
> *Proposed TaskSetManager Name* :
> {code:java}
> var name = "TaskSet_" + taskSet.id  // taskSet.id = stageId + "." + stageAttemptId
> {code}
> *Sample* : TaskSet_0.0
> cc [~kayousterhout] [~markhamstra]
[jira] [Updated] (SPARK-17894) Uniqueness of TaskSetManager name
[ https://issues.apache.org/jira/browse/SPARK-17894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17894: --- Description: TaskSetManager should have a unique name to avoid adding duplicate ones to *Pool* via *SchedulableBuilder*. This problem surfaced with https://issues.apache.org/jira/browse/SPARK-17759; please find the discussion at https://github.com/apache/spark/pull/15326
There is a 1:1 relationship between a stage attempt and a TaskSetManager, so taskSet.id, which covers both stageId and stageAttemptId, can be used for the TaskSetManager name as well.
*Current TaskSetManager Name* :
{code:java}
var name = "TaskSet_" + taskSet.stageId.toString
{code}
*Sample* : TaskSet_0
*Proposed TaskSetManager Name* :
{code:java}
var name = "TaskSet_" + taskSet.id  // taskSet.id = stageId + "." + stageAttemptId
{code}
*Sample* : TaskSet_0.0
cc [~kayousterhout] [~markhamstra]
was: TaskSetManager should have a unique name to avoid adding duplicate ones to *Pool* via *SchedulableBuilder*. This problem surfaced with https://issues.apache.org/jira/browse/SPARK-17759; please find the discussion at https://github.com/apache/spark/pull/15326 There is a 1:1 relationship between a stage attempt and a TaskSetManager, so taskSet.id, which covers both stageId and stageAttemptId, can be used for the TaskSetManager name as well. What do you think about the proposed TaskSetManager name? *Current TaskSetManager Name* : {code:java} var name = "TaskSet_" + taskSet.stageId.toString{code} *Sample* : TaskSet_0 *Proposed TaskSetManager Name* : {code:java} var name = "TaskSet_" + taskSet.id  // taskSet.id = stageId + "." + stageAttemptId {code} *Sample* : TaskSet_0.0
> Uniqueness of TaskSetManager name
> -
>
> Key: SPARK-17894
> URL: https://issues.apache.org/jira/browse/SPARK-17894
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
>
> TaskSetManager should have a unique name to avoid adding duplicate ones to *Pool* via *SchedulableBuilder*.
This problem surfaced with https://issues.apache.org/jira/browse/SPARK-17759; please find the discussion at https://github.com/apache/spark/pull/15326
> There is a 1:1 relationship between a stage attempt and a TaskSetManager, so taskSet.id, which covers both stageId and stageAttemptId, can be used for the TaskSetManager name as well.
> *Current TaskSetManager Name* :
> {code:java}
> var name = "TaskSet_" + taskSet.stageId.toString
> {code}
> *Sample* : TaskSet_0
> *Proposed TaskSetManager Name* :
> {code:java}
> var name = "TaskSet_" + taskSet.id  // taskSet.id = stageId + "." + stageAttemptId
> {code}
> *Sample* : TaskSet_0.0
> cc [~kayousterhout] [~markhamstra]
[jira] [Created] (SPARK-17894) Uniqueness of TaskSetManager name
Eren Avsarogullari created SPARK-17894: -- Summary: Uniqueness of TaskSetManager name Key: SPARK-17894 URL: https://issues.apache.org/jira/browse/SPARK-17894 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.1.0 Reporter: Eren Avsarogullari
TaskSetManager should have a unique name to avoid adding duplicate ones to *Pool* via *SchedulableBuilder*. This problem surfaced with https://issues.apache.org/jira/browse/SPARK-17759; please find the discussion at https://github.com/apache/spark/pull/15326
There is a 1:1 relationship between a stage attempt and a TaskSetManager, so taskSet.id, which covers both stageId and stageAttemptId, can be used for the TaskSetManager name as well. What do you think about the proposed TaskSetManager name?
*Current TaskSetManager Name* :
{code:java}
var name = "TaskSet_" + taskSet.stageId.toString
{code}
*Sample* : TaskSet_0
*Proposed TaskSetManager Name* :
{code:java}
var name = "TaskSet_" + taskSet.id  // taskSet.id = stageId + "." + stageAttemptId
{code}
*Sample* : TaskSet_0.0
[jira] [Updated] (SPARK-17759) FairSchedulableBuilder should avoid to create duplicate fair scheduler-pools.
[ https://issues.apache.org/jira/browse/SPARK-17759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17759: --- Description: If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
*Code to Reproduce* :
{code:java}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
val sc = new SparkContext(conf)
{code}
*fairscheduler-duplicate-pools.xml* :
The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
{code:xml}
<?xml version="1.0"?>
<allocations>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
</allocations>
{code}
*Debug Screenshot* :
This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
Please have a look at the *attached screenshots*.
was: If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
*Code to Reproduce* :
{code:java}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
val sc = new SparkContext(conf)
{code}
*fairscheduler-duplicate-pools.xml* :
The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
{code:xml}
<?xml version="1.0"?>
<allocations>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
</allocations>
{code}
*Debug Screenshot* :
This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
Please have a look at the *attached screenshots*.
> FairSchedulableBuilder should avoid to create duplicate fair scheduler-pools.
> -
>
> Key: SPARK-17759
> URL: https://issues.apache.org/jira/browse/SPARK-17759
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
> Attachments: duplicate_pools.png, duplicate_pools2.png
>
> If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
> *Code to Reproduce* :
> {code:java}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-duplicate-pools.xml* :
> The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
> {code:xml}
> <?xml version="1.0"?>
> <allocations>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FIFO</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
> </allocations>
> {code}
> *Debug Screenshot* :
> This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
> Please have a look at the *attached screenshots*.
[jira] [Updated] (SPARK-17759) FairSchedulableBuilder should avoid to create duplicate fair scheduler-pools.
[ https://issues.apache.org/jira/browse/SPARK-17759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17759: --- Summary: FairSchedulableBuilder should avoid to create duplicate fair scheduler-pools. (was: SchedulableBuilder should avoid to create duplicate fair scheduler-pools.)
> FairSchedulableBuilder should avoid to create duplicate fair scheduler-pools.
> -
>
> Key: SPARK-17759
> URL: https://issues.apache.org/jira/browse/SPARK-17759
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
> Attachments: duplicate_pools.png, duplicate_pools2.png
>
> If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
> *Code to Reproduce* :
> {code:java}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-duplicate-pools.xml* :
> The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
> {code:xml}
> <?xml version="1.0"?>
> <allocations>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
> </allocations>
> {code}
> *Debug Screenshot* :
> This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
> Please have a look at the *attached screenshots*.
[jira] [Updated] (SPARK-17759) SchedulableBuilder should avoid to create duplicate fair scheduler-pools.
[ https://issues.apache.org/jira/browse/SPARK-17759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17759: --- Attachment: duplicate_pools2.png
> SchedulableBuilder should avoid to create duplicate fair scheduler-pools.
> -
>
> Key: SPARK-17759
> URL: https://issues.apache.org/jira/browse/SPARK-17759
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
> Attachments: duplicate_pools.png, duplicate_pools2.png
>
> If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
> *Code to Reproduce* :
> {code:java}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-duplicate-pools.xml* :
> The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
> {code:xml}
> <?xml version="1.0"?>
> <allocations>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
> </allocations>
> {code}
> *Debug Screenshot* :
> This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
> Please have a look at the *attached screenshots*.
[jira] [Updated] (SPARK-17759) SchedulableBuilder should avoid to create duplicate fair scheduler-pools.
[ https://issues.apache.org/jira/browse/SPARK-17759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17759: --- Description: If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
*Code to Reproduce* :
{code:java}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
val sc = new SparkContext(conf)
{code}
*fairscheduler-duplicate-pools.xml* :
The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
{code:xml}
<?xml version="1.0"?>
<allocations>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
</allocations>
{code}
*Debug Screenshot* :
This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
Please have a look at the *attached screenshots*.
was: If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
*Code to Reproduce* :
{code:java}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
val sc = new SparkContext(conf)
{code}
*fairscheduler-duplicate-pools.xml* :
The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
{code:xml}
<?xml version="1.0"?>
<allocations>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
</allocations>
{code}
*Debug Screenshot* :
This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
> SchedulableBuilder should avoid to create duplicate fair scheduler-pools.
> -
>
> Key: SPARK-17759
> URL: https://issues.apache.org/jira/browse/SPARK-17759
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
> Attachments: duplicate_pools.png, duplicate_pools2.png
>
> If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
> *Code to Reproduce* :
> {code:java}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-duplicate-pools.xml* :
> The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
> {code:xml}
> <?xml version="1.0"?>
> <allocations>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
> </allocations>
> {code}
> *Debug Screenshot* :
> This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
> Please have a look at the *attached screenshots*.
[jira] [Updated] (SPARK-17759) SchedulableBuilder should avoid to create duplicate fair scheduler-pools.
[ https://issues.apache.org/jira/browse/SPARK-17759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17759: --- Attachment: duplicate_pools.png
> SchedulableBuilder should avoid to create duplicate fair scheduler-pools.
> -
>
> Key: SPARK-17759
> URL: https://issues.apache.org/jira/browse/SPARK-17759
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
> Attachments: duplicate_pools.png
>
> If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
> *Code to Reproduce* :
> {code:java}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-duplicate-pools.xml* :
> The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
> {code:xml}
> <?xml version="1.0"?>
> <allocations>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
>   <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
> </allocations>
> {code}
> *Debug Screenshot* :
> This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
[jira] [Created] (SPARK-17759) SchedulableBuilder should avoid to create duplicate fair scheduler-pools.
Eren Avsarogullari created SPARK-17759: -- Summary: SchedulableBuilder should avoid to create duplicate fair scheduler-pools. Key: SPARK-17759 URL: https://issues.apache.org/jira/browse/SPARK-17759 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.1.0 Reporter: Eren Avsarogullari
If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
*Code to Reproduce* :
{code:scala}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
val sc = new SparkContext(conf)
{code}
*fairscheduler-duplicate-pools.xml* :
The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
{code:xml}
<?xml version="1.0"?>
<allocations>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
</allocations>
{code}
*Debug Screenshot* :
This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
[jira] [Updated] (SPARK-17759) SchedulableBuilder should avoid to create duplicate fair scheduler-pools.
[ https://issues.apache.org/jira/browse/SPARK-17759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17759: --- Description: If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
*Code to Reproduce* :
{code:java}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
val sc = new SparkContext(conf)
{code}
*fairscheduler-duplicate-pools.xml* :
The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
{code:xml}
<?xml version="1.0"?>
<allocations>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
</allocations>
{code}
*Debug Screenshot* :
This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
was: If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
*Code to Reproduce* :
{code:scala}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
val sc = new SparkContext(conf)
{code}
*fairscheduler-duplicate-pools.xml* :
The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
{code:xml}
<?xml version="1.0"?>
<allocations>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="default"><minShare>0</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>1</minShare><weight>1</weight><schedulingMode>FAIR</schedulingMode></pool>
  <pool name="duplicate_pool1"><minShare>2</minShare><weight>2</weight><schedulingMode>FAIR</schedulingMode></pool>
</allocations>
{code}
*Debug Screenshot* :
This means Pool.schedulableQueue (ConcurrentLinkedQueue[Schedulable]) has 4 pools: default, default, duplicate_pool1, duplicate_pool1, but Pool.schedulableNameToSchedulable (ConcurrentHashMap[String, Schedulable]) has only default and duplicate_pool1 because the pool name is the key, so one copy each of default and duplicate_pool1 is redundant and lives only in Pool.schedulableQueue.
> SchedulableBuilder should avoid to create duplicate fair scheduler-pools.
> -
>
> Key: SPARK-17759
> URL: https://issues.apache.org/jira/browse/SPARK-17759
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
>
> If _spark.scheduler.allocation.file_ has duplicate pools, all of them are created when _SparkContext_ is initialized, but only one of them is used and the others are redundant. This causes _redundant pool_ creation and needs to be fixed.
> *Code to Reproduce* :
> {code:java}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-duplicate-pools.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-duplicate-pools.xml* :
> The following sample shows only two default and duplicate_pool1 pools, but the same applies to N default and/or other duplicate pools.
> {code:xml} > > > 0 > 1 > FAIR > > > 0 > 1 > FAIR > > > 1 > 1 > FAIR > > > 2 > 2 > FAIR > > > {code} > *Debug Screenshot* : > This means Pool.schedulableQueue(ConcurrentLinkedQueue[Schedulable]) has 4 > pools as default, default, duplicate_pool1, duplicate_pool1 but > Pool.schedulableNameToSchedulable(ConcurrentHashMap[String, Schedulable]) has > default and duplicate_pool1 due to pool name as key so one of default and > duplicate_pool1 look as redundant and live in Pool.schedulableQueue. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
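The queue-versus-map mismatch described above can be modeled with the same two collection types Pool uses. The sketch below is a minimal standalone model, not the actual Spark classes: FakePool and build are illustrative names, and it assumes pools are registered by appending to the queue and putting into the name-keyed map, as FairSchedulableBuilder does.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

object DuplicatePoolSketch {
  // Minimal stand-in for Spark's Schedulable; only the name matters here.
  final case class FakePool(name: String, minShare: Int, weight: Int)

  // Registers every parsed pool and returns (queue size, map size).
  def build(parsedPools: Seq[FakePool]): (Int, Int) = {
    val schedulableQueue = new ConcurrentLinkedQueue[FakePool]()
    val schedulableNameToSchedulable = new ConcurrentHashMap[String, FakePool]()
    parsedPools.foreach { p =>
      schedulableQueue.add(p)                     // duplicates accumulate here
      schedulableNameToSchedulable.put(p.name, p) // duplicates overwrite here
    }
    (schedulableQueue.size, schedulableNameToSchedulable.size)
  }

  def main(args: Array[String]): Unit = {
    val pools = Seq(
      FakePool("default", 0, 1), FakePool("default", 0, 1),
      FakePool("duplicate_pool1", 1, 1), FakePool("duplicate_pool1", 2, 2))
    // The queue keeps all 4 entries while the map keeps one per name.
    println(build(pools))
  }
}
```

With the four pools from the sample file, build returns (4, 2): the queue retains every duplicate while the map deduplicates by name, which is exactly the schedulableQueue/schedulableNameToSchedulable mismatch seen in the debug screenshot.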
[jira] [Updated] (SPARK-17663) SchedulableBuilder should handle invalid data access via scheduler.allocation.file
[ https://issues.apache.org/jira/browse/SPARK-17663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17663:
---
Description:
If spark.scheduler.allocation.file has invalid minShare and/or weight values, these cause:
- NumberFormatException due to the toInt function
- SparkContext cannot be initialized.
- No meaningful error message is shown to the user.

In a nutshell, this functionality can be made more robust by selecting one of the following flows:
*1-* Currently, if schedulingMode has an invalid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.
*2-* A meaningful error message can be shown to the user for all invalid cases.

*Code to Reproduce* :
{code}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
val sc = new SparkContext(conf)
{code}

*fairscheduler-invalid-data.xml* :
{code}
<allocations>
  <pool name="pool1"> <!-- illustrative pool name -->
    <schedulingMode>FIFO</schedulingMode>
    <weight>invalid_weight</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
{code}

*Stacktrace* :
{code}
Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
{code}

was:
If spark.scheduler.allocation.file has invalid *minShare* and/or *weight*, they cause NumberFormatException due to the function *toInt* and SparkContext cannot be initialized. Currently, if schedulingMode does not have a valid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.

*Code to Reproduce* :
{code}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
val sc = new SparkContext(conf)
{code}

*fairscheduler-invalid-data.xml* :
{code}
<allocations>
  <pool name="pool1"> <!-- illustrative pool name -->
    <schedulingMode>FIFO</schedulingMode>
    <weight>invalid_weight</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
{code}

*Stacktrace* :
{code}
Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
{code}

> SchedulableBuilder should handle invalid data access via scheduler.allocation.file
> --
>
> Key: SPARK-17663
> URL: https://issues.apache.org/jira/browse/SPARK-17663
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
>
> If spark.scheduler.allocation.file has invalid minShare and/or weight values, these cause:
> - NumberFormatException due to the toInt function
> - SparkContext cannot be initialized.
> - No meaningful error message is shown to the user.
> In a nutshell, this functionality can be made more robust by selecting one of the following flows:
> *1-* Currently, if schedulingMode has an invalid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.
> *2-* A meaningful error message can be shown to the user for all invalid cases.
> *Code to Reproduce* :
> {code}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
>
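The warn-and-default flow of option *1-* could be prototyped as below. This is a hedged sketch, not the actual SchedulableBuilder code; parseIntOrDefault is an illustrative helper name, and it assumes both minShare and weight would go through the same parsing path.

```scala
object SafePoolParam {
  // Option 1 sketch: instead of letting toInt throw NumberFormatException
  // (which aborts SparkContext initialization), log a warning and fall back
  // to the documented default, mirroring the existing schedulingMode -> FIFO behavior.
  def parseIntOrDefault(field: String, raw: String, default: Int): Int =
    try raw.trim.toInt
    catch {
      case _: NumberFormatException =>
        // Spark would use logWarning here; println keeps the sketch standalone.
        println(s"Invalid value '$raw' for $field, using the default: $default")
        default
    }

  def main(args: Array[String]): Unit = {
    println(parseIntOrDefault("minShare", "2", 0))            // valid value is kept
    println(parseIntOrDefault("weight", "invalid_weight", 1)) // invalid value falls back to the default
  }
}
```

With the fairscheduler-invalid-data.xml sample above, weight would then fall back to 1 with a warning instead of failing SparkContext initialization, just as an invalid schedulingMode already falls back to FIFO.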
[jira] [Updated] (SPARK-17663) SchedulableBuilder should handle invalid data access via scheduler.allocation.file
[ https://issues.apache.org/jira/browse/SPARK-17663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17663:
---
Affects Version/s: (was: 2.0.1)
2.1.0

> SchedulableBuilder should handle invalid data access via scheduler.allocation.file
> --
>
> Key: SPARK-17663
> URL: https://issues.apache.org/jira/browse/SPARK-17663
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Eren Avsarogullari
>
> If spark.scheduler.allocation.file has invalid *minShare* and/or *weight*, they cause NumberFormatException due to the function *toInt* and SparkContext cannot be initialized. Currently, if schedulingMode does not have a valid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.
> *Code to Reproduce* :
> {code}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-invalid-data.xml* :
> {code}
> <allocations>
>   <pool name="pool1"> <!-- illustrative pool name -->
>     <schedulingMode>FIFO</schedulingMode>
>     <weight>invalid_weight</weight>
>     <minShare>2</minShare>
>   </pool>
> </allocations>
> {code}
> *Stacktrace* :
> {code}
> Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
> at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:580)
> at java.lang.Integer.parseInt(Integer.java:615)
> at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
> at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
> at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
> at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
> {code}
[jira] [Updated] (SPARK-17663) SchedulableBuilder should handle invalid data access via scheduler.allocation.file
[ https://issues.apache.org/jira/browse/SPARK-17663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17663:
---
Description:
If spark.scheduler.allocation.file has invalid *minShare* and/or *weight*, they cause NumberFormatException due to the function *toInt* and SparkContext cannot be initialized. Currently, if schedulingMode does not have a valid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.

*Code to Reproduce* :
{code}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
val sc = new SparkContext(conf)
{code}

*fairscheduler-invalid-data.xml* :
{code}
<allocations>
  <pool name="pool1"> <!-- illustrative pool name -->
    <schedulingMode>FIFO</schedulingMode>
    <weight>invalid_weight</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
{code}

*Stacktrace* :
{code}
Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
{code}

was:
If spark.scheduler.allocation.file has invalid minShare and/or weight, they cause NumberFormatException due to the function toInt and SparkContext cannot be initialized. Currently, if schedulingMode does not have a valid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.

*Code to Reproduce* :
{code}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
val sc = new SparkContext(conf)
{code}

*fairscheduler-invalid-data.xml* :
{code}
<allocations>
  <pool name="pool1"> <!-- illustrative pool name -->
    <schedulingMode>FIFO</schedulingMode>
    <weight>invalid_weight</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
{code}

*Stacktrace* :
{code}
Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
{code}

> SchedulableBuilder should handle invalid data access via scheduler.allocation.file
> --
>
> Key: SPARK-17663
> URL: https://issues.apache.org/jira/browse/SPARK-17663
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.0.1
> Reporter: Eren Avsarogullari
>
> If spark.scheduler.allocation.file has invalid *minShare* and/or *weight*, they cause NumberFormatException due to the function *toInt* and SparkContext cannot be initialized. Currently, if schedulingMode does not have a valid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.
> *Code to Reproduce* :
> {code}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-invalid-data.xml* :
> {code}
> <allocations>
>   <pool name="pool1"> <!-- illustrative pool name -->
>     <schedulingMode>FIFO</schedulingMode>
>     <weight>invalid_weight</weight>
>     <minShare>2</minShare>
>   </pool>
> </allocations>
> {code}
> *Stacktrace* :
> {code}
> Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
> at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at
[jira] [Updated] (SPARK-17663) SchedulableBuilder should handle invalid data access via scheduler.allocation.file
[ https://issues.apache.org/jira/browse/SPARK-17663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eren Avsarogullari updated SPARK-17663:
---
Description:
If spark.scheduler.allocation.file has invalid minShare and/or weight, they cause NumberFormatException due to the function toInt and SparkContext cannot be initialized. Currently, if schedulingMode does not have a valid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.

*Code to Reproduce* :
{code}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
val sc = new SparkContext(conf)
{code}

*fairscheduler-invalid-data.xml* :
{code}
<allocations>
  <pool name="pool1"> <!-- illustrative pool name -->
    <schedulingMode>FIFO</schedulingMode>
    <weight>invalid_weight</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
{code}

*Stacktrace* :
{code}
Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
{code}

was:
If spark.scheduler.allocation.file has invalid minShare and/or weight, they cause NumberFormatException due to the function toInt and SparkContext cannot be initialized. Currently, if schedulingMode does not have a valid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.

*Reproduce Code* :
{code}
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
val sc = new SparkContext(conf)
{code}

*fairscheduler-invalid-data.xml* :
{code}
<allocations>
  <pool name="pool1"> <!-- illustrative pool name -->
    <schedulingMode>FIFO</schedulingMode>
    <weight>invalid_weight</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
{code}

*Stacktrace* :
{code}
Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
    at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
{code}

> SchedulableBuilder should handle invalid data access via scheduler.allocation.file
> --
>
> Key: SPARK-17663
> URL: https://issues.apache.org/jira/browse/SPARK-17663
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.0.1
> Reporter: Eren Avsarogullari
>
> If spark.scheduler.allocation.file has invalid minShare and/or weight, they cause NumberFormatException due to the function toInt and SparkContext cannot be initialized. Currently, if schedulingMode does not have a valid value, a warning message is logged and the default value is set as FIFO. The same pattern can be used for minShare (default: 0) and weight (default: 1) as well.
> *Code to Reproduce* :
> {code}
> val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
> conf.set("spark.scheduler.mode", "FAIR")
> conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
> val sc = new SparkContext(conf)
> {code}
> *fairscheduler-invalid-data.xml* :
> {code}
> <allocations>
>   <pool name="pool1"> <!-- illustrative pool name -->
>     <schedulingMode>FIFO</schedulingMode>
>     <weight>invalid_weight</weight>
>     <minShare>2</minShare>
>   </pool>
> </allocations>
> {code}
> *Stacktrace* :
> {code}
> Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
> at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at