[ https://issues.apache.org/jira/browse/FLINK-31143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692463#comment-17692463 ]

Caizhi Weng edited comment on FLINK-31143 at 2/23/23 3:14 AM:
--------------------------------------------------------------

Hi [~mapohl] [~Weijie Guo].

When implementing {{collect}}, I only considered job restarts, not client 
restarts, so restarting a {{collect}} query from a savepoint is currently not supported.
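To make the difference between job restarts and client restarts concrete, here is a simplified, hypothetical model of the version/offset check behind the "Invalid request" log quoted in the issue description. The class {{CollectHandshakeSketch}}, its {{SinkView}} type and the rule "reject a request whose version differs or whose offset is behind the sink's offset" are illustrative assumptions, not the actual logic of {{CollectSinkFunction}}; only the version string and the offsets 0 and 1 are taken from the log.

```java
/**
 * Hypothetical, simplified model of the version/offset check between a collect
 * client and the collect sink. Names and the exact rule are illustrative
 * assumptions; this is not Flink's CollectSinkFunction implementation.
 */
public class CollectHandshakeSketch {

    /** Sink-side state: its version and the offset restored from the savepoint. */
    static final class SinkView {
        final String version;
        final long offset;

        SinkView(String version, long offset) {
            this.version = version;
            this.offset = offset;
        }

        /** Assumed rule: reject a different version or an offset behind the sink's. */
        boolean accepts(String requestVersion, long requestOffset) {
            return version.equals(requestVersion) && requestOffset >= offset;
        }
    }

    public static void main(String[] args) {
        // Version and expected offset taken from the log in the issue description.
        String version = "b497a74f-e85c-404b-8df3-1b4b1a0c2de1";
        SinkView restoredSink = new SinkView(version, 1);

        // Job restart (modelled): the surviving client still holds offset 1.
        long survivingClientOffset = 1;
        // Client restart (modelled): a new client starts again from offset 0.
        long freshClientOffset = 0;

        System.out.println("job restart accepted: "
                + restoredSink.accepts(version, survivingClientOffset));
        System.out.println("client restart accepted: "
                + restoredSink.accepts(version, freshClientOffset));
    }
}
```

In this model, running {{main}} prints {{true}} for the job-restart case and {{false}} for the client-restart case: a client that survives a job restart keeps an offset the restored sink accepts, whereas a fresh client starting again from offset 0 falls behind the offset restored from the savepoint, which mirrors the rejected request in the log.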


was (Author: tsreaper):
Hi [~mapohl] [~Weijie Guo].

When implementing {{collect}} I only considered job restarts and didn't 
consider client restarts, so {{collect}} + savepoint is currently not 
supported. I can implement this, but I think it is actually a new feature 
rather than a bug fix. As we're very close to the release, it might be more 
appropriate to introduce this feature in 1.17.1. What do you think?

> Invalid request: offset doesn't match when restarting from a savepoint
> ----------------------------------------------------------------------
>
>                 Key: FLINK-31143
>                 URL: https://issues.apache.org/jira/browse/FLINK-31143
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.0, 1.15.3, 1.16.1
>            Reporter: Matthias Pohl
>            Priority: Critical
>
> I tried to run the following case:
> {code:java}
> import java.nio.file.Files;
> import java.nio.file.Path;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.CoreOptions;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.core.execution.SavepointFormatType;
> import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.CloseableIterator;
>
> public class CollectSavepointRestart {
>
>     public static void main(String[] args) throws Exception {
>         final String createTableQuery =
>                 "CREATE TABLE left_table (a int, c varchar) "
>                         + "WITH ("
>                         + "     'connector' = 'datagen', "
>                         + "     'rows-per-second' = '1', "
>                         + "     'fields.a.kind' = 'sequence', "
>                         + "     'fields.a.start' = '0', "
>                         + "     'fields.a.end' = '100000'"
>                         + ");";
>         final String selectQuery = "SELECT * FROM left_table;";
>         final Configuration initialConfig = new Configuration();
>         initialConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
>         final EnvironmentSettings initialSettings =
>                 EnvironmentSettings.newInstance()
>                         .inStreamingMode()
>                         .withConfiguration(initialConfig)
>                         .build();
>         final TableEnvironment initialTableEnv = TableEnvironment.create(initialSettings);
>
>         // create the job and wait until the first result can be accessed
>         initialTableEnv.executeSql(createTableQuery);
>         final TableResult tableResult = initialTableEnv.sqlQuery(selectQuery).execute();
>         tableResult.await();
>
>         // consume two results, then stop the job with a savepoint
>         final String savepointPath;
>         try (CloseableIterator<Row> tableResultIterator = tableResult.collect()) {
>             System.out.println(tableResultIterator.next());
>             System.out.println(tableResultIterator.next());
>             final JobClient jobClient =
>                     tableResult.getJobClient().orElseThrow(IllegalStateException::new);
>             final Path savepointDirectory = Files.createTempDirectory("savepoint");
>             savepointPath =
>                     jobClient
>                             .stopWithSavepoint(
>                                     true,
>                                     savepointDirectory.toAbsolutePath().toString(),
>                                     SavepointFormatType.CANONICAL)
>                             .get();
>         }
>
>         // restart the very same job from the savepoint
>         final SavepointRestoreSettings savepointRestoreSettings =
>                 SavepointRestoreSettings.forPath(savepointPath, true);
>         final Configuration restartConfig = new Configuration(initialConfig);
>         SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, restartConfig);
>         final EnvironmentSettings restartSettings =
>                 EnvironmentSettings.newInstance()
>                         .inStreamingMode()
>                         .withConfiguration(restartConfig)
>                         .build();
>         final TableEnvironment restartTableEnv = TableEnvironment.create(restartSettings);
>         restartTableEnv.executeSql(createTableQuery);
>         restartTableEnv.sqlQuery(selectQuery).execute().print();
>     }
> }
> {code}
> h3. Expected behavior
> The restarted job skips the two records that were already consumed (0 and 1) 
> and starts printing results from 2 onwards.
> h3. Observed behavior
> No results are printed. The logs show that the collect sink received a request it considers invalid:
> {code:java}
> org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - 
> Invalid request. Received version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, 
> offset = 0, while expected version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, 
> offset = 1
> {code}
> It looks like the correct offset is not picked up from the savepoint (see 
> [CollectSinkFunction:411|https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L411]).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
