tsreaper opened a new pull request, #934: URL: https://github.com/apache/incubator-paimon/pull/934
### Purpose

This PR fixes #657. The bug can be explained by analyzing the following log.

```
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received split request from parallel task 0
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1) (85791e7e8233fe32e987a2ed56087f0b) switched from INITIALIZING to RUNNING.
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [org.apache.paimon.flink.source.FileStoreSourceSplit@962adbbe, org.apache.paimon.flink.source.FileStoreSourceSplit@15f08fe5, org.apache.paimon.flink.source.FileStoreSourceSplit@267c7375, org.apache.paimon.flink.source.FileStoreSourceSplit@2005f992]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
org.apache.paimon.shaded.org.apache.orc.impl.OrcCodecPool [] - Got brand-new codec LZ4
org.apache.paimon.shaded.org.apache.orc.impl.ReaderImpl [] - Reading ORC rows from /test-data/d45713ae-749b-40a7-a7bd-1a7f3416d804.store/default.db/ts_table/dt=2023-01-13/bucket-0/data-6d9b5aa5-7346-48b5-8a5d-6ede1c207fe2-0.orc with {include: [false, true, true, true, true, true, true], offset: 3, length: 321, sarg: leaf-0 = (LESS_THAN dt 2023-01-17), expr = leaf-0, columns: [], schema: struct<_KEY_k:int,_SEQUENCE_NUMBER:bigint,_VALUE_KIND:tinyint,dt:string,k:int,v:int>, includeAcidColumns: true}
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [0000000001]
org.apache.paimon.shaded.org.apache.orc.impl.ReaderImpl [] - Reading ORC rows from /test-data/d45713ae-749b-40a7-a7bd-1a7f3416d804.store/default.db/ts_table/dt=2023-01-14/bucket-0/data-fa58978b-1cbd-47dc-a83a-955c415be35f-0.orc with {include: [false, true, true, true, true, true, true], offset: 3, length: 321, sarg: leaf-0 = (LESS_THAN dt 2023-01-17), expr = leaf-0, columns: [], schema: struct<_KEY_k:int,_SEQUENCE_NUMBER:bigint,_VALUE_KIND:tinyint,dt:string,k:int,v:int>, includeAcidColumns: true}
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000001]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [0000000002]
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received split request from parallel task 0
org.apache.paimon.shaded.org.apache.orc.impl.ReaderImpl [] - Reading ORC rows from /test-data/d45713ae-749b-40a7-a7bd-1a7f3416d804.store/default.db/ts_table/dt=2023-01-16/bucket-0/data-218bb51a-4074-4385-b339-6fb8972839fb-0.orc with {include: [false, true, true, true, true, true, true], offset: 3, length: 321, sarg: leaf-0 = (LESS_THAN dt 2023-01-17), expr = leaf-0, columns: [], schema: struct<_KEY_k:int,_SEQUENCE_NUMBER:bigint,_VALUE_KIND:tinyint,dt:string,k:int,v:int>, includeAcidColumns: true}
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000002]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [0000000003]
org.apache.paimon.shaded.org.apache.orc.impl.ReaderImpl [] - Reading ORC rows from /test-data/d45713ae-749b-40a7-a7bd-1a7f3416d804.store/default.db/ts_table/dt=2023-01-15/bucket-0/data-9d0062bc-f017-439c-bb2e-35a75ea98564-0.orc with {include: [false, true, true, true, true, true, true], offset: 3, length: 321, sarg: leaf-0 = (LESS_THAN dt 2023-01-17), expr = leaf-0, columns: [], schema: struct<_KEY_k:int,_SEQUENCE_NUMBER:bigint,_VALUE_KIND:tinyint,dt:string,k:int,v:int>, includeAcidColumns: true}
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000003]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [0000000004]
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000004]
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received split request from parallel task 0
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
org.apache.flink.runtime.taskmanager.Task [] - Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1)#0 (85791e7e8233fe32e987a2ed56087f0b) switched from RUNNING to FINISHED.
org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1)#0 (85791e7e8233fe32e987a2ed56087f0b).
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1)#0 85791e7e8233fe32e987a2ed56087f0b.
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received split request from parallel task 0
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received split request from parallel task 0
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1) (85791e7e8233fe32e987a2ed56087f0b) switched from RUNNING to FAILED on 172.28.0.4:35897-f67396 @ 1p888cgybk6o_taskmanager_1.e89d7c14-3101-4e48-b467-ff30624fecb3 (dataPort=33469).
org.apache.flink.util.FlinkException: An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. Event: '[NoMoreSplitEvent]', targetTask: Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1) - execution #0
```

From this log we can see that the reader sent a split request three times.
The first response from the coordinator contains 4 splits, and the second response indicates that there are no more splits. The reader sees that there are no more splits and exits, so the third response is lost, causing the `FlinkException` at the end.

So the problem is: why does the reader send a split request three times? This is because `FileStoreSourceReader#onSplitFinished` is called each time a split is consumed, and the current implementation always sends a split request in `onSplitFinished`. As there are 4 splits in the first response, the reader may send 4 or more split requests, while it actually only needs 2. We need to check that all splits are consumed before sending the next split request.

### Tests

* FileStoreSourceReaderTest#testAddMultipleSplits

### API and Format

N/A

### Documentation

N/A
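For illustration, the effect of guarding the split request can be sketched with a toy model. The class and method names below are hypothetical stand-ins for the reader/coordinator interaction, not the actual Paimon or Flink APIs; the point is only to show how "request on every finished split" inflates the request count, while "request only when all assigned splits are consumed" keeps it minimal.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: counts how many split requests a reader would send
// while draining a batch of assigned splits, under two policies.
public class SplitRequestSketch {

    static int drain(int assignedSplits, boolean requestOnlyWhenIdle) {
        Deque<Integer> splits = new ArrayDeque<>();
        for (int i = 0; i < assignedSplits; i++) {
            splits.push(i);
        }
        int requests = 0;
        while (!splits.isEmpty()) {
            splits.pop(); // "onSplitFinished" fires once per consumed split
            if (requestOnlyWhenIdle) {
                // fixed behavior: request more work only after the last
                // assigned split has been consumed
                if (splits.isEmpty()) {
                    requests++;
                }
            } else {
                // buggy behavior: every finished split triggers a request
                requests++;
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        // With a batch of 4 splits (as in the log above), the buggy policy
        // sends 4 requests; the guarded policy sends only 1.
        System.out.println("buggy: " + drain(4, false));
        System.out.println("fixed: " + drain(4, true));
    }
}
```

Under the guarded policy, the reader issues exactly one follow-up request per batch, so the coordinator's `NoMoreSplits` answer is the last event the reader ever waits for and no response can be orphaned.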