[ https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Genmao Yu updated SPARK-29438:
------------------------------
    Description:
Currently, Spark uses the `TaskPartitionId` to determine the StateStore path.
{code:java}
TaskPartitionId   \
StateStoreVersion  --> StoreProviderId -> StateStore
StateStoreName    /
{code}
Within a Spark stage, task partition ids are determined by the number of tasks. As noted, the StateStore file path depends on the task partition id. So if the task partition ids of a stream-stream join change relative to the previous batch, the join reads the wrong StateStore data or fails because the StateStore data does not exist. This happens in some corner cases, for example:
{code:java}
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
{code}
The simplified DAG looks like this:
{code:java}
DataSourceV2Scan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
   (streamDf3)            |               (streamDf1)        (streamDf2)
       |                  |                   |                  |
 Exchange(200)      Exchange(200)       Exchange(200)      Exchange(200)
       |                  |                   |                  |
      Sort               Sort                 |                  |
        \                /                     \                /
         \              /                       \              /
          SortMergeJoin                     StreamingSymmetricHashJoin
                    \                                 /
                       \                           /
                           \                   /
                                   Union
{code}
The stream-stream join task ids range from 200 to 399, because the join runs in the same stage as the `SortMergeJoin`. But when `streamDf3` has no new incoming data in some batch, it produces an empty LocalRelation, and the SortMergeJoin is then replaced with a BroadcastHashJoin. In this case, the stream-stream join task ids range from 1 to 200. As a result, the join derives the wrong StateStore path from the TaskPartitionId and fails with an error reading the state store delta file.
{code:java}
LocalTableScan       Scan Relation     DataSourceV2Scan   DataSourceV2Scan
       |                  |                   |                  |
BroadcastExchange         |             Exchange(200)      Exchange(200)
       |                  |                   |                  |
       |                  |                   |                  |
        \                /                     \                /
         \              /                       \              /
         BroadcastHashJoin                  StreamingSymmetricHashJoin
                    \                                 /
                       \                           /
                           \                   /
                                   Union
{code}
In my job, I disabled automatic broadcast joins (set spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should make the StateStore path deterministic rather than dependent on the TaskPartitionId.
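The id shift described above can be illustrated with a small model. This is only a sketch under stated assumptions, not Spark's actual code: `UnionPartitionShift`, `unionPartitionIds`, and `statePath` are hypothetical names, the path layout is invented for illustration, and the model assumes a union numbers its children's partitions consecutively in child order. It also assumes the broadcast-join side ends up with a single partition, which would be consistent with the ids 1 to 200 reported in the description.

```scala
// Simplified model of the problem; none of these names are Spark classes.
object UnionPartitionShift {

  // Partition ids seen by the tasks of each child of a union, assuming the
  // union numbers its children's partitions consecutively, in child order.
  def unionPartitionIds(childPartitionCounts: Seq[Int]): Seq[Range] = {
    val offsets = childPartitionCounts.scanLeft(0)(_ + _)
    offsets.zip(childPartitionCounts).map { case (start, n) =>
      start until (start + n)
    }
  }

  // Hypothetical state location keyed by the task partition id.
  def statePath(checkpointRoot: String, operatorId: Long, partitionId: Int): String =
    s"$checkpointRoot/state/$operatorId/$partitionId"

  def main(args: Array[String]): Unit = {
    // Earlier batch: both union children have 200 partitions.
    val before = unionPartitionIds(Seq(200, 200))(1)
    // Later batch: the first child is assumed to shrink to one partition.
    val after = unionPartitionIds(Seq(1, 200))(1)

    println(s"stream-stream join ids before: ${before.head} to ${before.last}")
    println(s"stream-stream join ids after:  ${after.head} to ${after.last}")

    // The first join task now looks for state under a different directory.
    println(statePath("/ckpt", 0L, before.head))
    println(statePath("/ckpt", 0L, after.head))
  }
}
```

In this model the state written by the first join task in the earlier batch is keyed by id 200, while the same task in the later batch looks it up under id 1, which matches the failure mode reported here.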
  was:
Identical to the description above, except that the workaround read "set spark.sql.autoBroadcastJoinThreshold=false" instead of "=-1".
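For reference, `spark.sql.autoBroadcastJoinThreshold` is a size in bytes, and -1 is the documented value that disables automatic broadcast joins. A minimal sketch of applying the workaround when building the session follows; the application name and the `local[2]` master are placeholders, and whether this avoids the failure in every plan shape is not established here.

```scala
import org.apache.spark.sql.SparkSession

object DisableAutoBroadcast {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-stream-join-workaround") // placeholder name
      .master("local[2]")                       // placeholder master
      // -1 disables automatic broadcast joins, so the planner does not swap
      // a SortMergeJoin for a BroadcastHashJoin based on estimated size.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    // Confirm the setting that the streaming query will run with.
    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

    spark.stop()
  }
}
```

The same setting can also be changed on an existing session with `spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")` before the streaming query is started.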
> Failed to get state store in stream-stream join
> -----------------------------------------------
>
>                 Key: SPARK-29438
>                 URL: https://issues.apache.org/jira/browse/SPARK-29438
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: Genmao Yu
>            Priority: Critical