[ https://issues.apache.org/jira/browse/SPARK-19645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
guifeng updated SPARK-19645: ---------------------------- Description: We are trying to use Structured Streaming in product, however currently there exists a bug refer to the process of streaming job restart. The following is the concrete error message: {quote} Caused by: java.lang.IllegalStateException: Error committing version 2 into HDFSStateStore[id = (op=0, part=136), dir = /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:162) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:173) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:123) at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Failed to rename /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/temp--5345709896617324284 to /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/2.delta at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:259) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:156) ... 14 more {quote} The bug can be easily reproduce when restart previous streaming job, and the main reason is that when restart streaming job spark will recompute WAL offsets and generate the same hdfs delta file whose name is "currentBatchId.delta". In my opinion, this is a bug. If you guy consider that this is a bug also, I can fix it. was: We are trying to use Structured Streaming in product, however currently there exists a bug refer to the process of streaming job restart. The following is the concrete error message: {quote} Caused by: java.lang.IllegalStateException: Error committing version 2 into HDFSStateStore[id = (op=0, part=136), dir = /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136] at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:162) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:173) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:123) at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Failed to rename /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/temp--5345709896617324284 to /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/2.delta at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:259) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:156) ... 14 more {quote} The bug can be easily reproduce when restart previous streaming job, and the main reason is that when restart streaming job spark will recompute WAL offsets and generate the same hdfs delta file whose name is "currentBatchId.delta". In my opinion, this is a bug, and if you guy consider that this is a bug also, I can fix it. > structured streaming job restart > -------------------------------- > > Key: SPARK-19645 > URL: https://issues.apache.org/jira/browse/SPARK-19645 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.1.0 > Reporter: guifeng > Priority: Critical > > We are trying to use Structured Streaming in product, however currently > there exists a bug refer to the process of streaming job restart. > The following is the concrete error message: > {quote} > Caused by: java.lang.IllegalStateException: Error committing version 2 > into HDFSStateStore[id = (op=0, part=136), dir = > /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136] > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:162) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:173) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:123) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Failed to rename > /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/temp--5345709896617324284 > to /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/2.delta > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:259) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:156) > ... 14 more > {quote} > The bug can be easily reproduce when restart previous streaming job, and > the main reason is that when restart streaming job spark will recompute WAL > offsets and generate the same hdfs delta file whose name is > "currentBatchId.delta". In my opinion, this is a bug. If you guy consider > that this is a bug also, I can fix it. -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org