[jira] [Updated] (SPARK-48838) foreachBatch can cause IllegalStateException if used in a wrong way
[ https://issues.apache.org/jira/browse/SPARK-48838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chungmin Lee updated SPARK-48838:
---------------------------------
    Description:

When foreachBatch is used with a stateful query, it can cause an IllegalStateException if the function doesn't fully iterate the passed batch. The following code demonstrates the issue:

{code:scala}
import org.apache.spark.sql.DataFrame

spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .dropDuplicates()
  .writeStream
  .foreachBatch((batch: DataFrame, id: Long) => batch.limit(1).show())
  .start()
{code}

Error:

{noformat}
24/07/09 00:21:03 ERROR Executor: Exception in task 1.0 in stage 17.0 (TID 239)
java.lang.IllegalStateException: Error reading delta file file:/tmp/temporary-db172114-81ec-490b-a667-93047609fcf1/state/0/2/2.delta of HDFSStateStoreProvider[id = (op=0,part=2),dir = file:/tmp/temporary-db172114-81ec-490b-a667-93047609fcf1/state/0/2]: file:/tmp/temporary-db172114-81ec-490b-a667-93047609fcf1/state/0/2/2.delta does not exist
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:461)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$4(HDFSBackedStateStoreProvider.scala:417)
{noformat}

This happens because every record must be processed for state to be computed correctly. Specifically, if the output iterator isn't fully exhausted, StateStore.commit() is never called, and StateStore.abort() is eventually called by the TaskCompletionListener registered in mapPartitionsWithStateStore.

There are also some related but minor issues:
# If the function doesn't iterate the batch at all, the state store is not updated at all.
# If the function iterates the batch multiple times, stateful operators run multiple times over the same data. This can be a problem if the user's arbitrary stateful operators are non-deterministic.

Here is an example of the second point:

{code:scala}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming._

case class Data(timestamp: java.sql.Timestamp, value: Long)

def func(key: Long, values: Iterator[Data], state: GroupState[Int]): Iterator[String] = {
  val counter = new scala.util.Random().nextInt()
  println(s"counter=$counter")
  val len = values.toSeq.length
  if (state.exists) {
    state.update(state.get + len)
  } else {
    state.update(len)
  }
  Seq(s"key=$key,count=${state.get},counter=$counter").toIterator
}

def outputDF(df: Dataset[String], df_id: Long) = {
  println(df_id)
  println(df.collect().toSeq)
  println(df.collect().toSeq) // the state store is overwritten here
}

val q = spark.readStream.format("rate").option("rowsPerSecond", "10").load().as[Data]
  .groupByKey(_.value)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(func)
  .writeStream
  .foreachBatch((ds: Dataset[String], id: Long) => outputDF(ds, id))
  .outputMode("update")
  .start()
{code}

It would be helpful if this were explained in [the documentation for foreachBatch|https://spark.apache.org/docs/3.5.1/structured-streaming-programming-guide.html#foreachbatch].
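A defensive pattern that sidesteps all three pitfalls (a sketch, not from the ticket, shown in PySpark; the same persist-then-count approach applies to the Scala code above): cache the micro-batch and force one full pass over it before any partial or repeated action, so the stateful upstream plan is consumed exactly once and the state store can commit.

{code:python}
# Sketch of a defensive foreachBatch function (assumption: the cached
# partitions stay resident for the duration of the function).
def handle_batch(batch_df, batch_id):
    batch_df.persist()        # cache so later actions reuse the computed rows
    batch_df.count()          # full pass: every partition iterator is drained,
                              # so the stateful operator can commit its state
    batch_df.limit(1).show()  # now served from the cache, not the stateful plan
    batch_df.unpersist()

query = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", "10")
    .load()
    .dropDuplicates()
    .writeStream
    .foreachBatch(handle_batch)
    .start())
{code}

The programming guide already recommends the same batchDF.persist()/batchDF.unpersist() bracketing when a foreachBatch function writes the batch to multiple sinks; the caveat described in this ticket is another reason to do so.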
[jira] [Created] (SPARK-48838) foreachBatch can cause IllegalStateException if used in a wrong way
Chungmin Lee created SPARK-48838:
---------------------------------
             Summary: foreachBatch can cause IllegalStateException if used in a wrong way
                 Key: SPARK-48838
                 URL: https://issues.apache.org/jira/browse/SPARK-48838
             Project: Spark
          Issue Type: Documentation
          Components: Structured Streaming
    Affects Versions: 3.5.1
            Reporter: Chungmin Lee
[jira] [Updated] (SPARK-48838) foreachBatch can cause IllegalStateException if used in a wrong way
[ https://issues.apache.org/jira/browse/SPARK-48838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chungmin Lee updated SPARK-48838:
---------------------------------
    Priority: Minor  (was: Major)
[jira] [Commented] (SPARK-45854) spark.catalog.listTables fails with ParseException after upgrading to Spark 3.4.1 from 3.3.1
[ https://issues.apache.org/jira/browse/SPARK-45854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825515#comment-17825515 ]

Chungmin Lee commented on SPARK-45854:
--------------------------------------
This is probably a bug in ShowTablesExec (isTempView always returns false if the catalog is not V2SessionCatalog), but it could be intended behavior. Either way, you can set spark.sql.legacy.useV1Command to true to work around the issue.

> spark.catalog.listTables fails with ParseException after upgrading to Spark 3.4.1 from 3.3.1
> ---------------------------------------------------------------------------------------------
>
> Key: SPARK-45854
> URL: https://issues.apache.org/jira/browse/SPARK-45854
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Spark Core, Spark Submit
> Affects Versions: 3.4.0, 3.4.1
> Reporter: Andrej Zachar
> Priority: Major
>
> After upgrading to Spark 3.4.1, the listTables() method in PySpark throws a ParseException with the message "Syntax error at or near end of input.". This did not occur in previous versions of Spark, such as 3.3.1.
>
> Install Spark version 3.4.1, then run pyspark:
> ```bash
> pyspark --packages io.delta:delta-core_2.12:2.4.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
> ```
> Attempt to list tables:
> ```console
> spark.range(1).createTempView("test_view")
> spark.catalog.listTables()
> ```
> Expected result: The listTables() method should return a list of tables without throwing any exceptions.
> Actual result:
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File ".venv/lib/python3.10/site-packages/pyspark/sql/catalog.py", line 302, in listTables
>     iter = self._jcatalog.listTables(dbName).toLocalIterator()
>   File ".venv/lib/python3.10/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File ".venv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
>     raise converted from None
> pyspark.errors.exceptions.captured.ParseException:
> [PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)
>
> == SQL ==
> ^^^
>
> The same code worked correctly in Spark version 3.3.1. No changes were made to the code aside from upgrading Spark.
> Thank you for considering this issue! Any assistance in resolving it would be greatly appreciated.
> Best regards,
> Andrej
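A minimal sketch of the suggested workaround (assuming spark.sql.legacy.useV1Command can be set on a live session; otherwise pass it with --conf at launch):

{code:python}
# Workaround sketch: fall back to the V1 SHOW TABLES code path.
spark.conf.set("spark.sql.legacy.useV1Command", "true")

spark.range(1).createTempView("test_view")
print(spark.catalog.listTables())  # should no longer raise ParseException
{code}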
[jira] [Updated] (SPARK-45417) Make InheritableThread inherit the active session
[ https://issues.apache.org/jira/browse/SPARK-45417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chungmin Lee updated SPARK-45417:
---------------------------------
    Description:

Repro:

{code:python}
# repro.py
from multiprocessing.pool import ThreadPool

from pyspark import inheritable_thread_target
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Test").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

def f(i, spark):
    print(f"{i} spark = {spark}")
    print(f"{i} active session = {SparkSession.getActiveSession()}")
    print(f"{i} local property foo = {spark.sparkContext.getLocalProperty('foo')}")
    spark = SparkSession.builder.appName("Test").getOrCreate()
    print(f"{i} spark = {spark}")
    print(f"{i} active session = {SparkSession.getActiveSession()}")

pool = ThreadPool(4)
spark.sparkContext.setLocalProperty("foo", "bar")
pool.starmap(inheritable_thread_target(f), [(i, spark) for i in range(4)])
{code}

Run as: {{./bin/spark-submit repro.py}}

{{getOrCreate()}} doesn't set the active session either. The only way is to call the Java method directly: {{spark._jsparkSession.setActiveSession(spark._jsparkSession)}}.
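For illustration, a sketch of where the workaround named in the description would go (hypothetical placement inside the repro's thread function):

{code:python}
# Hypothetical sketch: re-activate the session at the top of the thread
# function via the JVM-side call quoted in the description, since
# inheritable_thread_target does not propagate the active session.
from pyspark.sql import SparkSession

def f(i, spark):
    spark._jsparkSession.setActiveSession(spark._jsparkSession)
    print(f"{i} active session = {SparkSession.getActiveSession()}")
{code}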
[jira] [Updated] (SPARK-45417) Make InheritableThread inherit the active session
[ https://issues.apache.org/jira/browse/SPARK-45417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chungmin Lee updated SPARK-45417:
---------------------------------
    Summary: Make InheritableThread inherit the active session  (was: Make InheritableThread inherit active session)
[jira] [Updated] (SPARK-45417) Make InheritableThread inherit active session
[ https://issues.apache.org/jira/browse/SPARK-45417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chungmin Lee updated SPARK-45417:
---------------------------------
    Summary: Make InheritableThread inherit active session  (was: InheritableThread doesn't inherit active session)
[jira] [Created] (SPARK-45417) InheritableThread doesn't inherit active session
Chungmin Lee created SPARK-45417:
---------------------------------
             Summary: InheritableThread doesn't inherit active session
                 Key: SPARK-45417
                 URL: https://issues.apache.org/jira/browse/SPARK-45417
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.5.0
            Reporter: Chungmin Lee

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org