[ https://issues.apache.org/jira/browse/SPARK-26411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16776565#comment-16776565 ]
Jungtaek Lim commented on SPARK-26411:
--------------------------------------

I tend to agree with this concern: the metadata written by the file stream sink is really there for the benefit of other queries that read the output as a file source, so from the point of view of the current query it is "optional". We give guidance to remove checkpoint data and rerun the query, but we have never mentioned that users may also need to remove the sink metadata when a file stream sink is involved. I think this is also somewhat related to SPARK-24295: there the reporter suffered from the metadata growing too large too fast and wanted a way to purge it, but given that multiple queries can access the output, there is no safe way to purge it. The only workaround that safely avoids the metadata size issue is simply not writing it when it is not needed. The file stream sink itself only relies on the last succeeded batch ID, which could probably be checkpointed together with the rest of the query's checkpoint data.

> Streaming: _spark_metadata and checkpoints out of sync cause checkpoint packing failure
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-26411
>                 URL: https://issues.apache.org/jira/browse/SPARK-26411
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Alexander Panzhin
>            Priority: Major
>
> Spark Structured Streaming from a file source to a file sink appears to pick up information from the `_spark_metadata` directory for checkpoint data compaction.
> The worst part is that when the output and the checkpoint are out of sync, data is not written at all.
> *This is not documented anywhere. Removing checkpoint data while leaving _spark_metadata in the output directory WILL CAUSE data loss.*
>
> FileSourceScanExec.createNonBucketedReadRDD kicks off compaction and fails the whole job, because it expects the delta files to be present.
> But the delta files are never written, because FileStreamSink.addBatch does not execute the DataFrame that it receives.
> {code:java}
> ...
> INFO [2018-12-17 03:20:02,784] org.apache.spark.sql.execution.streaming.FileStreamSink: Skipping already committed batch 75
> ...
> INFO [2018-12-17 03:30:01,691] org.apache.spark.sql.execution.streaming.FileStreamSource: Log offset set to 76 with 29 new files
> INFO [2018-12-17 03:30:01,700] org.apache.spark.sql.execution.streaming.MicroBatchExecution: Committed offsets for batch 76.
> Metadata OffsetSeqMetadata(0,1545017401691,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
> INFO [2018-12-17 03:30:01,704] org.apache.spark.sql.execution.streaming.FileStreamSource: Processing 29 files from 76:76
> INFO [2018-12-17 03:30:01,983] org.apache.spark.sql.execution.datasources.FileSourceStrategy: Pruning directories with:
> INFO [2018-12-17 03:30:01,983] org.apache.spark.sql.execution.datasources.FileSourceStrategy: Post-Scan Filters:
> INFO [2018-12-17 03:30:01,984] org.apache.spark.sql.execution.datasources.FileSourceStrategy: Output Data Schema: struct<value: string>
> INFO [2018-12-17 03:30:01,984] org.apache.spark.sql.execution.FileSourceScanExec: Pushed Filters:
> INFO [2018-12-17 03:30:02,581] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 16.205011 ms
> INFO [2018-12-17 03:30:02,593] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 9.368244 ms
> INFO [2018-12-17 03:30:02,629] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 31.126375 ms
> INFO [2018-12-17 03:30:02,640] org.apache.spark.SparkContext: Created broadcast 86 from start at SourceStream.scala:55
> INFO [2018-12-17 03:30:02,643] org.apache.spark.sql.execution.FileSourceScanExec: Planning scan with bin packing, max size: 14172786 bytes, open cost is considered as scanning 4194304 bytes.
> INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4321
> INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4326
> INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4324
> INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4320
> INFO [2018-12-17 03:30:02,700] org.apache.spark.ContextCleaner: Cleaned accumulator 4325
> INFO [2018-12-17 03:30:02,737] org.apache.spark.SparkContext: Created broadcast 87 from start at SourceStream.scala:55
> INFO [2018-12-17 03:30:02,756] org.apache.spark.SparkContext: Starting job: start at SourceStream.scala:55
> INFO [2018-12-17 03:30:02,761] org.apache.spark.SparkContext: Created broadcast 88 from broadcast at DAGScheduler.scala:1079
> INFO [2018-12-17 03:30:03,860] org.apache.spark.ExecutorAllocationManager: Requesting 3 new executors because tasks are backlogged (new desired total will be 3)
> INFO [2018-12-17 03:30:04,863] org.apache.spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 4)
> INFO [2018-12-17 03:30:06,545] org.apache.spark.SparkContext: Created broadcast 89 from broadcast at DAGScheduler.scala:1079
> WARN [2018-12-17 03:30:07,214] org.apache.spark.scheduler.TaskSetManager: Lost task 19.0 in stage 87.0 (TID 6145, ip-10-172-18-94.ec2.internal, executor 1): java.lang.IllegalStateException: Error reading delta file hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19/1.delta of HDFSStateStoreProvider[id = (op=0,part=19),dir = hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19]: hdfs://ip-10-172-19-174.ec2.internal:8020/user/hadoop/clickstream/checkpoint/state/0/19/1.delta does not exist
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
> at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
> {code}
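
For reference, a minimal sketch (not the reporter's code; paths and names are hypothetical) of the setup being discussed: a file-source-to-file-sink query records its progress in two separate places, the checkpoint location and the sink's own commit log under <output>/_spark_metadata, and it is the latter that downstream queries reading the output as a file source rely on.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object FileSinkMetadataSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("file-sink-metadata-sketch")
      .getOrCreate()

    // Streaming query: file source -> file sink. Progress is recorded in two places:
    //   1. the checkpoint location (offsets, commits, state store delta files)
    //   2. /data/out/_spark_metadata, the file sink's own commit log
    val query = spark.readStream
      .text("/data/in")                                  // hypothetical input directory
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "/data/checkpoint")  // hypothetical checkpoint directory
      .option("path", "/data/out")                       // sink also creates /data/out/_spark_metadata
      .trigger(Trigger.ProcessingTime("10 minutes"))
      .start()

    // Typically run from a different application: a batch read over the same directory
    // consults /data/out/_spark_metadata (when present) and only lists files belonging
    // to committed batches -- this is the "other query leveraging the output as a file
    // source" that the sink metadata exists for.
    val downstream = spark.read.parquet("/data/out")
    downstream.printSchema()

    query.awaitTermination()
  }
}
{code}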
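
And a sketch of the cleanup the description implies, again with hypothetical paths and a hypothetical helper name: if a file-sink query is meant to be rerun "from scratch", deleting only the checkpoint is not enough, because the restarted query numbers its batches from 0 again, FileStreamSink finds those batch IDs already recorded in _spark_metadata, logs "Skipping already committed batch N", and silently drops the output.

{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object ResetFileSinkQuery {
  // Hypothetical helper: wipe *both* progress logs before rerunning from scratch.
  // Removing only checkpointDir leaves <outputDir>/_spark_metadata claiming batches
  // 0..N are already committed, which is exactly the out-of-sync state reported here.
  def wipeProgress(spark: SparkSession, checkpointDir: String, outputDir: String): Unit = {
    val conf = spark.sparkContext.hadoopConfiguration
    val cp = new Path(checkpointDir)
    val meta = new Path(outputDir, "_spark_metadata")
    cp.getFileSystem(conf).delete(cp, true)      // offsets, commits, state store deltas
    meta.getFileSystem(conf).delete(meta, true)  // the file sink's commit log
  }
}
{code}

Alternatively, pointing the rerun at a fresh output directory avoids touching _spark_metadata at all.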