tsreaper opened a new pull request, #934:
URL: https://github.com/apache/incubator-paimon/pull/934

   ### Purpose
   
   This PR fixes #657.
   
   We can explain this bug by analyzing the following log.
   
   ```
   org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received 
split request from parallel task 0
   org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> 
Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> 
NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT 
NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1) 
(85791e7e8233fe32e987a2ed56087f0b) switched from INITIALIZING to RUNNING.
   org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: 
[org.apache.paimon.flink.source.FileStoreSourceSplit@962adbbe, 
org.apache.paimon.flink.source.FileStoreSourceSplit@15f08fe5, 
org.apache.paimon.flink.source.FileStoreSourceSplit@267c7375, 
org.apache.paimon.flink.source.FileStoreSourceSplit@2005f992]
   org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 0
   org.apache.paimon.shaded.org.apache.orc.impl.OrcCodecPool    [] - Got 
brand-new codec LZ4
   org.apache.paimon.shaded.org.apache.orc.impl.ReaderImpl      [] - Reading 
ORC rows from 
/test-data/d45713ae-749b-40a7-a7bd-1a7f3416d804.store/default.db/ts_table/dt=2023-01-13/bucket-0/data-6d9b5aa5-7346-48b5-8a5d-6ede1c207fe2-0.orc
 with {include: [false, true, true, true, true, true, true], offset: 3, length: 
321, sarg: leaf-0 = (LESS_THAN dt 2023-01-17), expr = leaf-0, columns: [], 
schema: 
struct<_KEY_k:int,_SEQUENCE_NUMBER:bigint,_VALUE_KIND:tinyint,dt:string,k:int,v:int>,
 includeAcidColumns: true}
   org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits [0000000001]
   org.apache.paimon.shaded.org.apache.orc.impl.ReaderImpl      [] - Reading 
ORC rows from 
/test-data/d45713ae-749b-40a7-a7bd-1a7f3416d804.store/default.db/ts_table/dt=2023-01-14/bucket-0/data-fa58978b-1cbd-47dc-a83a-955c415be35f-0.orc
 with {include: [false, true, true, true, true, true, true], offset: 3, length: 
321, sarg: leaf-0 = (LESS_THAN dt 2023-01-17), expr = leaf-0, columns: [], 
schema: 
struct<_KEY_k:int,_SEQUENCE_NUMBER:bigint,_VALUE_KIND:tinyint,dt:string,k:int,v:int>,
 includeAcidColumns: true}
   org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
reading split(s) [0000000001]
   org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits [0000000002]
   org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received 
split request from parallel task 0
   org.apache.paimon.shaded.org.apache.orc.impl.ReaderImpl      [] - Reading 
ORC rows from 
/test-data/d45713ae-749b-40a7-a7bd-1a7f3416d804.store/default.db/ts_table/dt=2023-01-16/bucket-0/data-218bb51a-4074-4385-b339-6fb8972839fb-0.orc
 with {include: [false, true, true, true, true, true, true], offset: 3, length: 
321, sarg: leaf-0 = (LESS_THAN dt 2023-01-17), expr = leaf-0, columns: [], 
schema: 
struct<_KEY_k:int,_SEQUENCE_NUMBER:bigint,_VALUE_KIND:tinyint,dt:string,k:int,v:int>,
 includeAcidColumns: true}
   org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
reading split(s) [0000000002]
   org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits [0000000003]
   org.apache.paimon.shaded.org.apache.orc.impl.ReaderImpl      [] - Reading 
ORC rows from 
/test-data/d45713ae-749b-40a7-a7bd-1a7f3416d804.store/default.db/ts_table/dt=2023-01-15/bucket-0/data-9d0062bc-f017-439c-bb2e-35a75ea98564-0.orc
 with {include: [false, true, true, true, true, true, true], offset: 3, length: 
321, sarg: leaf-0 = (LESS_THAN dt 2023-01-17), expr = leaf-0, columns: [], 
schema: 
struct<_KEY_k:int,_SEQUENCE_NUMBER:bigint,_VALUE_KIND:tinyint,dt:string,k:int,v:int>,
 includeAcidColumns: true}
   org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
reading split(s) [0000000003]
   org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits [0000000004]
   org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
reading split(s) [0000000004]
   org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] 
- Closing splitFetcher 0 because it is idle.
   org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Shutting down split fetcher 0
   org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Split fetcher 0 exited.
   org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader 
received NoMoreSplits event.
   org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received 
split request from parallel task 0
   org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing 
Source Reader.
   org.apache.flink.runtime.taskmanager.Task                    [] - Source: 
paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> 
Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> 
NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT 
NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1)#0 
(85791e7e8233fe32e987a2ed56087f0b) switched from RUNNING to FINISHED.
   org.apache.flink.runtime.taskmanager.Task                    [] - Freeing 
task resources for Source: 
paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> 
Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> 
NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT 
NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1)#0 
(85791e7e8233fe32e987a2ed56087f0b).
   org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state FINISHED to JobManager 
for task Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table 
-> Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> 
NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT 
NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1)#0 
85791e7e8233fe32e987a2ed56087f0b.
   org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received 
split request from parallel task 0
   org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
Source: paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table received 
split request from parallel task 0
   org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: 
paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> 
Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> 
NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT 
NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1) 
(85791e7e8233fe32e987a2ed56087f0b) switched from RUNNING to FAILED on 
172.28.0.4:35897-f67396 @ 
1p888cgybk6o_taskmanager_1.e89d7c14-3101-4e48-b467-ff30624fecb3 
(dataPort=33469).
   org.apache.flink.util.FlinkException: An OperatorEvent from an 
OperatorCoordinator to a task was lost. Triggering task failover to ensure 
consistency. Event: '[NoMoreSplitEvent]', targetTask: Source: 
paimon-adbedf45-0cf5-43df-b62a-1dae79a5dc62.default.ts_table -> 
Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> 
NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT 
NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1) - 
execution #0
   ```
   
   We can see that the reader sends split requests 3 times. The first response 
from the coordinator contains 4 splits, and the second response indicates that 
there are no more splits. The reader sees that there are no more splits and 
exits, so the third response is lost, causing the `FlinkException` at the end.
   
   So the problem is: why does the reader send split requests 3 times? This is 
because `FileStoreSourceReader#onSplitFinished` is called each time a split is 
consumed, and the current implementation always sends a split request in 
`onSplitFinished`. As there are 4 splits in the first response, the reader may 
send 4 or more split requests when it actually only needs 2.
   
   We need to check whether all splits have been consumed before sending the 
next split request.
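   The effect of this check can be sketched with a small self-contained 
simulation (the class and method names below are illustrative, not the actual 
Paimon classes; the real fix lives in `FileStoreSourceReader#onSplitFinished`):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   // Toy model of the reader/coordinator interaction described above.
   public class SplitRequestDemo {
   
       static int countRequests(boolean checkRemainingSplits) {
           Deque<String> assignedSplits = new ArrayDeque<>();
           int requests = 0;
   
           // Initial split request; the coordinator answers with 4 splits.
           requests++;
           for (int i = 1; i <= 4; i++) {
               assignedSplits.add(String.format("%010d", i));
           }
   
           // Consume splits one by one; onSplitFinished fires after each.
           while (!assignedSplits.isEmpty()) {
               assignedSplits.poll();
               // Buggy version: request a new split after every finished split.
               // Fixed version: request only when no assigned splits remain.
               if (!checkRemainingSplits || assignedSplits.isEmpty()) {
                   requests++;
               }
           }
           return requests;
       }
   
       public static void main(String[] args) {
           System.out.println("buggy requests: " + countRequests(false)); // 5
           System.out.println("fixed requests: " + countRequests(true));  // 2
       }
   }
   ```
   
   With the check in place the reader issues exactly 2 requests: the initial 
one, and one final request that the coordinator answers with no-more-splits.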
   
   ### Tests
   
   * FileStoreSourceReaderTest#testAddMultipleSplits
   
   ### API and Format 
   
   N/A
   
   ### Documentation
   
   N/A
   

