[jira] [Updated] (SPARK-48838) foreachBatch can cause IllegalStateException if used in a wrong way

2024-07-08 Thread Chungmin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chungmin Lee updated SPARK-48838:
-
Description: 
When foreachBatch is used with a stateful query, it can cause an 
IllegalStateException if the user function doesn't fully iterate the batch passed 
to it.

The following code demonstrates the issue:
{code:scala}
import org.apache.spark.sql.DataFrame
spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .dropDuplicates()
  .writeStream
  .foreachBatch((batch: DataFrame, id: Long) => batch.limit(1).show())
  .start()
{code}
Error:
{noformat}
24/07/09 00:21:03 ERROR Executor: Exception in task 1.0 in stage 17.0 (TID 239)
java.lang.IllegalStateException: Error reading delta file 
file:/tmp/temporary-db172114-81ec-490b-a667-93047609fcf1/state/0/2/2.delta of 
HDFSStateStoreProvider[id = (op=0,part=2),dir = 
file:/tmp/temporary-db172114-81ec-490b-a667-93047609fcf1/state/0/2]: 
file:/tmp/temporary-db172114-81ec-490b-a667-93047609fcf1/state/0/2/2.delta does 
not exist
         at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:461)
         at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$4(HDFSBackedStateStoreProvider.scala:417)
{noformat}
This is because every record must be processed for the state to be computed 
correctly. Specifically, if the stateful operator's output is not fully exhausted, 
StateStore.commit() is never called, and StateStore.abort() is eventually called by 
the TaskCompletionListener registered in mapPartitionsWithStateStore. The delta 
file for that state version is therefore never written, and the next micro-batch 
fails when it tries to read it.

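A possible workaround (a minimal sketch, not from the original report) is to 
persist the batch and force one full pass over it before any partial consumption, 
so the stateful operators and StateStore.commit() run exactly once per micro-batch:
{code:scala}
import org.apache.spark.sql.DataFrame
spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .dropDuplicates()
  .writeStream
  .foreachBatch { (batch: DataFrame, id: Long) =>
    batch.persist()
    batch.count()          // consumes every record once; state is committed
    batch.limit(1).show()  // served from the cached batch, no partial stateful pass
    batch.unpersist()
    ()
  }
  .start()
{code}
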
There are also some related but minor issues:
 # If the function doesn't iterate the batch at all, the state store is not 
updated at all.
 # If the function iterates the batch multiple times, stateful operators run 
multiple times for the same data. This can be a problem if the user relies on 
arbitrary stateful operators (e.g. flatMapGroupsWithState) that are 
non-deterministic.

This is an example for the second point:
{code:scala}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming._

case class Data(timestamp: java.sql.Timestamp, value: Long)

def func(key: Long, values: Iterator[Data], state: GroupState[Int]): Iterator[String] = {
  val counter = new scala.util.Random().nextInt()
  println(s"counter=$counter")
  val len = values.toSeq.length
  if (state.exists) {
    state.update(state.get + len)
  } else {
    state.update(len)
  }
  Seq(s"key=$key,count=${state.get},counter=$counter").toIterator
}

def outputDF(df: Dataset[String], df_id: Long) = {
  println(df_id)
  println(df.collect().toSeq)
  println(df.collect().toSeq) // state store is overwritten here
}

val q = spark.readStream.format("rate").option("rowsPerSecond", "10").load().as[Data]
  .groupByKey(_.value)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(func)
  .writeStream
  .foreachBatch((ds: Dataset[String], id: Long) => outputDF(ds, id))
  .outputMode("update")
  .start
{code}
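A possible fix for the double execution (a sketch, not part of the original report) 
is to materialize the batch exactly once inside the foreachBatch function and reuse 
the local result, for example as a drop-in replacement for outputDF above:
{code:scala}
// Sketch: collect the micro-batch a single time so flatMapGroupsWithState (and the
// state store update) runs only once, then reuse the in-memory rows.
def outputDF(df: Dataset[String], df_id: Long): Unit = {
  val rows = df.collect().toSeq // one pass over the stateful operator
  println(df_id)
  println(rows)
  println(rows) // reused without re-running the query, so state is not overwritten
}
{code}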
It would be helpful if this behavior were explained in [the documentation for 
foreachBatch|https://spark.apache.org/docs/3.5.1/structured-streaming-programming-guide.html#foreachbatch].



[jira] [Created] (SPARK-48838) foreachBatch can cause IllegalStateException if used in a wrong way

2024-07-08 Thread Chungmin Lee (Jira)
Chungmin Lee created SPARK-48838:


 Summary: foreachBatch can cause IllegalStateException if used in a 
wrong way
 Key: SPARK-48838
 URL: https://issues.apache.org/jira/browse/SPARK-48838
 Project: Spark
  Issue Type: Documentation
  Components: Structured Streaming
Affects Versions: 3.5.1
Reporter: Chungmin Lee


When foreachBatch is used with a stateful query, it can cause an 
IllegalStateException if the user function doesn't fully iterate the batch passed 
to it.

The following code demonstrates the issue:
{code:scala}
import org.apache.spark.sql.DataFrame
spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .dropDuplicates()
  .writeStream
  .foreachBatch((batch: DataFrame, id: Long) => batch.limit(1).show())
  .start()
{code}
Error:
{noformat}
24/07/09 00:21:03 ERROR Executor: Exception in task 1.0 in stage 17.0 (TID 239)
java.lang.IllegalStateException: Error reading delta file 
file:/tmp/temporary-db172114-81ec-490b-a667-93047609fcf1/state/0/2/2.delta of 
HDFSStateStoreProvider[id = (op=0,part=2),dir = 
file:/tmp/temporary-db172114-81ec-490b-a667-93047609fcf1/state/0/2]: 
file:/tmp/temporary-db172114-81ec-490b-a667-93047609fcf1/state/0/2/2.delta does 
not exist
         at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:461)
         at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$4(HDFSBackedStateStoreProvider.scala:417)
{noformat}
This is because every record must be processed to compute the state correctly. 
Specifically, if the output isn't fully exhausted, StateStore.commit() is not 
called, and StateStore.abort() is eventually called by the 
TaskCompletionListener registered in mapPartitionsWithStateStore.

There are also some related but minor issues:
 # If the function doesn't iterate the batch at all, state is not updated at 
all.
 # If the function iterates the batch multiple times, stateful operators run 
multiple times for the same data. This can be a problem if the user relies on 
arbitrary stateful operators that are non-deterministic.

This is an example for the second point:
{code:scala}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming._

case class Data(timestamp: java.sql.Timestamp, value: Long)

def func(key: Long, values: Iterator[Data], state: GroupState[Int]): 
Iterator[String] = {
val counter = new scala.util.Random().nextInt()
println(s"counter=$counter")
val len = values.toSeq.length
if (state.exists) {
state.update(state.get + len)
} else {
state.update(len)
}
Seq(s"key=$key,count=${state.get},counter=$counter").toIterator
}

def outputDF(df: Dataset[String], df_id: Long) = {
println(df_id)
println(df.collect().toSeq)
println(df.collect().toSeq)
}

val q = spark.readStream.format("rate").option("rowsPerSecond", 
"10").load().as[Data]
.groupByKey(_.value)
.flatMapGroupsWithState(OutputMode.Update, 
GroupStateTimeout.NoTimeout)(func)
.writeStream
.foreachBatch((ds: Dataset[String], id: Long) => outputDF(ds, id))
.outputMode("update")
.start
{code}
It would be helpful if this behavior were explained in [the documentation for 
foreachBatch|https://spark.apache.org/docs/3.5.1/structured-streaming-programming-guide.html#foreachbatch].






[jira] [Updated] (SPARK-48838) foreachBatch can cause IllegalStateException if used in a wrong way

2024-07-08 Thread Chungmin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chungmin Lee updated SPARK-48838:
-
Priority: Minor  (was: Major)

> foreachBatch can cause IllegalStateException if used in a wrong way
> ---
>
> Key: SPARK-48838
> URL: https://issues.apache.org/jira/browse/SPARK-48838
> Project: Spark
>  Issue Type: Documentation
>  Components: Structured Streaming
>Affects Versions: 3.5.1
>Reporter: Chungmin Lee
>Priority: Minor
>






[jira] [Commented] (SPARK-45854) spark.catalog.listTables fails with ParseException after upgrading to Spark 3.4.1 from 3.3.1

2024-03-11 Thread Chungmin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825515#comment-17825515
 ] 

Chungmin Lee commented on SPARK-45854:
--

This is probably a bug in ShowTablesExec (isTempView always returns false if 
the catalog is not V2SessionCatalog), but it could also be intended behavior. 
In any case, you can set spark.sql.legacy.useV1Command to true to work around 
the issue.
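
A minimal sketch of that workaround (assuming an existing SparkSession named 
spark; the same config can also be passed with --conf at launch):
{code:scala}
// Workaround sketch: switch back to the legacy V1 command so listTables() no
// longer hits the PARSE_SYNTAX_ERROR described below.
spark.conf.set("spark.sql.legacy.useV1Command", "true")
spark.range(1).createTempView("test_view")
spark.catalog.listTables().show()
{code}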

> spark.catalog.listTables fails with ParseException after upgrading to Spark 
> 3.4.1 from 3.3.1
> 
>
> Key: SPARK-45854
> URL: https://issues.apache.org/jira/browse/SPARK-45854
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, Spark Submit
>Affects Versions: 3.4.0, 3.4.1
>Reporter: Andrej Zachar
>Priority: Major
>
> After upgrading to Spark 3.4.1, the listTables() method in PySpark now throws 
> a ParseException with the message "Syntax error at or near end of input.". 
> This did not occur in previous versions of Spark, such as 3.3.1.
> Install Spark version 3.4.1.
>  
> Run pyspark
> ```bash
> {{pyspark --packages io.delta:delta-core_2.12:2.4.0 --conf 
> "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf 
> "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"}}
> ```
>  
> Attempt to list tables using
> ```console
> {{spark.range(1).createTempView("test_view")}}
> {{spark.catalog.listTables()}}
> ```
> Expected result: The listTables() method should return a list of tables 
> without throwing any exceptions.
> Actual result: 
> {{Traceback (most recent call last):}}
> {{File "", line 1, in }}
> {{File ".venv/lib/python3.10/site-packages/pyspark/sql/catalog.py", line 302, 
> in listTables}}
> {{iter = self._jcatalog.listTables(dbName).toLocalIterator()}}
> {{File 
> ".venv/lib/python3.10/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
>  line 1322, in __call__}}
> {{File 
> ".venv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py", 
> line 175, in deco}}
> {{raise converted from None}}
> {{pyspark.errors.exceptions.captured.ParseException:}}
> {{[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)}}
> == SQL ==
> ^^^
> >>>
> The same code worked correctly in Spark version 3.3.1.
> No changes were made to the code aside from upgrading Spark.
> Thank you for considering this issue! Any assistance in resolving it would be 
> greatly appreciated.
> Best regards,
> Andrej






[jira] [Updated] (SPARK-45417) Make InheritableThread inherit the active session

2023-10-04 Thread Chungmin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chungmin Lee updated SPARK-45417:
-
Description: 
Repro:

{code:java}
# repro.py
from multiprocessing.pool import ThreadPool
from pyspark import inheritable_thread_target
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Test").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

def f(i, spark):
    print(f"{i} spark = {spark}")
    print(f"{i} active session = {SparkSession.getActiveSession()}")
    print(f"{i} local property foo = 
{spark.sparkContext.getLocalProperty('foo')}")
    spark = SparkSession.builder.appName("Test").getOrCreate()
    print(f"{i} spark = {spark}")
    print(f"{i} active session = {SparkSession.getActiveSession()}")

pool = ThreadPool(4)
spark.sparkContext.setLocalProperty("foo", "bar")
pool.starmap(inheritable_thread_target(f), [(i, spark) for i in range(4)]){code}

Run as: {{./bin/spark-submit repro.py}}

{{getOrCreate()}} doesn't set the active session either. The only workaround is 
to call the Java method directly: 
{{spark._jsparkSession.setActiveSession(spark._jsparkSession)}}.

 



> Make InheritableThread inherit the active session
> -
>
> Key: SPARK-45417
> URL: https://issues.apache.org/jira/browse/SPARK-45417
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Chungmin Lee
>Priority: Major
>






[jira] [Updated] (SPARK-45417) Make InheritableThread inherit the active session

2023-10-04 Thread Chungmin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chungmin Lee updated SPARK-45417:
-
Summary: Make InheritableThread inherit the active session  (was: Make 
InheritableThread inherit active session)

> Make InheritableThread inherit the active session
> -
>
> Key: SPARK-45417
> URL: https://issues.apache.org/jira/browse/SPARK-45417
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Chungmin Lee
>Priority: Major
>
> Repro:
> {code:java}
> from multiprocessing.pool import ThreadPool
> from pyspark import inheritable_thread_target
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.appName("Test").getOrCreate()
> spark.sparkContext.setLogLevel("ERROR")
> def f(i, spark):
>     print(f"{i} spark = {spark}")
>     print(f"{i} active session = {SparkSession.getActiveSession()}")
>     print(f"{i} local property foo = 
> {spark.sparkContext.getLocalProperty('foo')}")
>     spark = SparkSession.builder.appName("Test").getOrCreate()
>     print(f"{i} spark = {spark}")
>     print(f"{i} active session = {SparkSession.getActiveSession()}")
> pool = ThreadPool(4)
> spark.sparkContext.setLocalProperty("foo", "bar")
> pool.starmap(inheritable_thread_target(f), [(i, spark) for i in 
> range(4)]){code}
> {{getOrCreate()}} doesn't set the active session either. The only way is 
> calling the Java function directly: 
> {{spark._jsparkSession.setActiveSession(spark._jsparkSession)}}.
>  






[jira] [Updated] (SPARK-45417) Make InheritableThread inherit active session

2023-10-04 Thread Chungmin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chungmin Lee updated SPARK-45417:
-
Summary: Make InheritableThread inherit active session  (was: 
InheritableThread doesn't inherit active session)

> Make InheritableThread inherit active session
> -
>
> Key: SPARK-45417
> URL: https://issues.apache.org/jira/browse/SPARK-45417
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Chungmin Lee
>Priority: Major
>
> Repro:
> {code:java}
> from multiprocessing.pool import ThreadPool
> from pyspark import inheritable_thread_target
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.appName("Test").getOrCreate()
> spark.sparkContext.setLogLevel("ERROR")
> def f(i, spark):
>     print(f"{i} spark = {spark}")
>     print(f"{i} active session = {SparkSession.getActiveSession()}")
>     print(f"{i} local property foo = 
> {spark.sparkContext.getLocalProperty('foo')}")
>     spark = SparkSession.builder.appName("Test").getOrCreate()
>     print(f"{i} spark = {spark}")
>     print(f"{i} active session = {SparkSession.getActiveSession()}")
> pool = ThreadPool(4)
> spark.sparkContext.setLocalProperty("foo", "bar")
> pool.starmap(inheritable_thread_target(f), [(i, spark) for i in 
> range(4)]){code}
> {{getOrCreate()}} doesn't set the active session either. The only way is 
> calling the Java function directly: 
> {{spark._jsparkSession.setActiveSession(spark._jsparkSession)}}.
>  






[jira] [Created] (SPARK-45417) InheritableThread doesn't inherit active session

2023-10-04 Thread Chungmin Lee (Jira)
Chungmin Lee created SPARK-45417:


 Summary: InheritableThread doesn't inherit active session
 Key: SPARK-45417
 URL: https://issues.apache.org/jira/browse/SPARK-45417
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 3.5.0
Reporter: Chungmin Lee


Repro:

{code:java}
from multiprocessing.pool import ThreadPool
from pyspark import inheritable_thread_target
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Test").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

def f(i, spark):
    print(f"{i} spark = {spark}")
    print(f"{i} active session = {SparkSession.getActiveSession()}")
    print(f"{i} local property foo = 
{spark.sparkContext.getLocalProperty('foo')}")
    spark = SparkSession.builder.appName("Test").getOrCreate()
    print(f"{i} spark = {spark}")
    print(f"{i} active session = {SparkSession.getActiveSession()}")

pool = ThreadPool(4)
spark.sparkContext.setLocalProperty("foo", "bar")
pool.starmap(inheritable_thread_target(f), [(i, spark) for i in range(4)]){code}

{{getOrCreate()}} doesn't set the active session either. The only workaround is 
to call the Java method directly: 
{{spark._jsparkSession.setActiveSession(spark._jsparkSession)}}.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org