[jira] [Commented] (FLINK-34508) Migrate S3-related ITCases and e2e tests to Minio
[ https://issues.apache.org/jira/browse/FLINK-34508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838471#comment-17838471 ] Muhammet Orazov commented on FLINK-34508: - The [YarnFileStageTestS3ITCase|https://github.com/apache/flink/blob/master/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java#L131] test is skipped because of the assumption that the UUID.randomUUID() path does not exist. > Migrate S3-related ITCases and e2e tests to Minio > -- > > Key: FLINK-34508 > URL: https://issues.apache.org/jira/browse/FLINK-34508 > Project: Flink > Issue Type: Sub-task > Components: Build System / CI >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Assignee: Muhammet Orazov >Priority: Major > Labels: github-actions, pull-request-available > > Anything that uses {{org.apache.flink.testutils.s3.S3TestCredentials}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
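A minimal sketch of the skip mechanism described in the comment: the ITCase derives a random UUID-based directory name and proceeds only under the assumption that this path does not already exist; when an assumption check fails, the test is reported as skipped rather than failed. The `pathExists` stub and class name here are illustrative, not the real filesystem call in the test.

```java
import java.util.UUID;

// Illustrative sketch of an assumption-based skip; pathExists is a stub.
public class AssumptionSkipSketch {
    static boolean pathExists(String path) {
        return false; // stub: a freshly generated UUID-based path is effectively unique
    }

    public static void main(String[] args) {
        String dir = "temp/tests-" + UUID.randomUUID();
        // In JUnit this would be Assume.assumeFalse(pathExists(dir)):
        // a failing assumption aborts the test as "skipped", not "failed".
        if (pathExists(dir)) {
            System.out.println("skipped");
        } else {
            System.out.println("running under " + dir);
        }
    }
}
```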
Re: [PR] [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite loop [flink-cdc]
yuxiqian commented on PR #3235: URL: https://github.com/apache/flink-cdc/pull/3235#issuecomment-2063041386 cc @lvyanquan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController
[ https://issues.apache.org/jira/browse/FLINK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei reassigned FLINK-35027: -- Assignee: Yanfei Lei > Implement checkpoint drain in AsyncExecutionController > -- > > Key: FLINK-35027 > URL: https://issues.apache.org/jira/browse/FLINK-35027 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / Task >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35026) Introduce async execution configurations
[ https://issues.apache.org/jira/browse/FLINK-35026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei reassigned FLINK-35026: -- Assignee: Yanfei Lei > Introduce async execution configurations > > > Key: FLINK-35026 > URL: https://issues.apache.org/jira/browse/FLINK-35026 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration, Runtime / Task >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController [flink]
flinkbot commented on PR #24676: URL: https://github.com/apache/flink/pull/24676#issuecomment-2062993344 ## CI report: * bc18939af1ffea146d6f2214d061cca4dc136ca6 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController
[ https://issues.apache.org/jira/browse/FLINK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35027: --- Labels: pull-request-available (was: ) > Implement checkpoint drain in AsyncExecutionController > -- > > Key: FLINK-35027 > URL: https://issues.apache.org/jira/browse/FLINK-35027 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / Task >Reporter: Yanfei Lei >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController [flink]
fredia opened a new pull request, #24676: URL: https://github.com/apache/flink/pull/24676 ## What is the purpose of the change This PR implements checkpoint drain in `AsyncExecutionController`, and wires it to `AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2`. ## Brief change log - Add `AsyncExecutionController#drainInflightRecords` - Override `AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2#snapshotState()`. ## Verifying this change This change is already covered by existing tests: - `AsyncExecutionControllerTest#testInFlightRecordControl()` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes, checkpointing) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
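A minimal, illustrative sketch of what draining in-flight records before a snapshot could look like. Only the method name `drainInflightRecords` comes from the PR; the class shape, the `targetNum` parameter, and the queue-backed execution model are assumptions for the example, not the actual Flink implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the controller; the real one manages async state requests.
class AsyncExecutionControllerSketch {
    private final Queue<Runnable> inFlight = new ArrayDeque<>();

    void submit(Runnable r) {
        inFlight.add(r); // buffer an async state request
    }

    int inFlightRecordNum() {
        return inFlight.size();
    }

    // Process buffered requests until at most targetNum remain, so that a
    // checkpoint taken in snapshotState() observes a consistent state.
    void drainInflightRecords(int targetNum) {
        while (inFlightRecordNum() > targetNum) {
            inFlight.poll().run();
        }
    }
}

public class DrainDemo {
    public static void main(String[] args) {
        AsyncExecutionControllerSketch aec = new AsyncExecutionControllerSketch();
        final int[] done = {0};
        for (int i = 0; i < 3; i++) {
            aec.submit(() -> done[0]++);
        }
        aec.drainInflightRecords(0); // e.g. invoked from an overridden snapshotState()
        System.out.println(done[0]);
    }
}
```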
[jira] [Comment Edited] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full
[ https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838452#comment-17838452 ] Xin Gong edited comment on FLINK-35151 at 4/18/24 4:46 AM: --- I get an idea to address this issue by setting "currentTaskRunning || queue.remainingCapacity() == 0" for BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL was (Author: JIRAUSER292212): I get an idea to address this issue by set "currentTaskRunning || queue.remainingCapacity() == 0" for BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full > -- > > Key: FLINK-35151 > URL: https://issues.apache.org/jira/browse/FLINK-35151 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Environment: I use master branch reproduce it. >Reporter: Xin Gong >Priority: Major > Attachments: dumpstack.txt > > > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full. > Reason is that producing binlog is too fast. > MySqlSplitReader#suspendBinlogReaderIfNeed will execute > BinlogSplitReader#stopBinlogReadTask to set > currentTaskRunning to be false after MysqSourceReader receives binlog split > update event. > MySqlSplitReader#pollSplitRecords is executed and > dataIt is null to execute closeBinlogReader when currentReader is > BinlogSplitReader. closeBinlogReader will execute > statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. > Because BinaryLogClient#connectLock is not release when > MySqlStreamingChangeEventSource add element to full queue. > > You can set StatefulTaskContext#queue to be 1 and run UT > NewlyAddedTableITCase#testRemoveAndAddNewTable. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full
[ https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838452#comment-17838452 ] Xin Gong edited comment on FLINK-35151 at 4/18/24 4:44 AM: --- I get an idea to address this issue by set "currentTaskRunning || queue.remainingCapacity() == 0" for BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL was (Author: JIRAUSER292212): I get an idea to address this issue by set currentTaskRunning || queue.remainingCapacity() == 0 for BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full > -- > > Key: FLINK-35151 > URL: https://issues.apache.org/jira/browse/FLINK-35151 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Environment: I use master branch reproduce it. >Reporter: Xin Gong >Priority: Major > Attachments: dumpstack.txt > > > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full. > Reason is that producing binlog is too fast. > MySqlSplitReader#suspendBinlogReaderIfNeed will execute > BinlogSplitReader#stopBinlogReadTask to set > currentTaskRunning to be false after MysqSourceReader receives binlog split > update event. > MySqlSplitReader#pollSplitRecords is executed and > dataIt is null to execute closeBinlogReader when currentReader is > BinlogSplitReader. closeBinlogReader will execute > statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. > Because BinaryLogClient#connectLock is not release when > MySqlStreamingChangeEventSource add element to full queue. > > You can set StatefulTaskContext#queue to be 1 and run UT > NewlyAddedTableITCase#testRemoveAndAddNewTable. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33460) Support property authentication connection.
[ https://issues.apache.org/jira/browse/FLINK-33460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33460: --- Labels: pull-request-available (was: ) > Support property authentication connection. > --- > > Key: FLINK-33460 > URL: https://issues.apache.org/jira/browse/FLINK-33460 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: RocMarshal >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33460][Connector/JDBC] Support property authentication connection. [flink-connector-jdbc]
RocMarshal opened a new pull request, #115: URL: https://github.com/apache/flink-connector-jdbc/pull/115 - Support property authentication connection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33460) Support property authentication connection.
[ https://issues.apache.org/jira/browse/FLINK-33460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RocMarshal updated FLINK-33460: --- Summary: Support property authentication connection. (was: Support more authentication connection types such as the secret.) > Support property authentication connection. > --- > > Key: FLINK-33460 > URL: https://issues.apache.org/jira/browse/FLINK-33460 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Reporter: RocMarshal >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full
[ https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838452#comment-17838452 ] Xin Gong edited comment on FLINK-35151 at 4/18/24 4:43 AM: --- I get an idea to address this issue by set currentTaskRunning || queue.remainingCapacity() == 0 for BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL was (Author: JIRAUSER292212): I get an idea to address this issue by set currentTaskRunning || queue.remainingCapacity() == 0 for BinlogSplitReader#pollSplitRecords. > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full > -- > > Key: FLINK-35151 > URL: https://issues.apache.org/jira/browse/FLINK-35151 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Environment: I use master branch reproduce it. >Reporter: Xin Gong >Priority: Major > Attachments: dumpstack.txt > > > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full. > Reason is that producing binlog is too fast. > MySqlSplitReader#suspendBinlogReaderIfNeed will execute > BinlogSplitReader#stopBinlogReadTask to set > currentTaskRunning to be false after MysqSourceReader receives binlog split > update event. > MySqlSplitReader#pollSplitRecords is executed and > dataIt is null to execute closeBinlogReader when currentReader is > BinlogSplitReader. closeBinlogReader will execute > statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. > Because BinaryLogClient#connectLock is not release when > MySqlStreamingChangeEventSource add element to full queue. > > You can set StatefulTaskContext#queue to be 1 and run UT > NewlyAddedTableITCase#testRemoveAndAddNewTable. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full
[ https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838452#comment-17838452 ] Xin Gong commented on FLINK-35151: -- I get an idea to address this issue by setting "currentTaskRunning || queue.remainingCapacity() == 0" as the poll condition for BinlogSplitReader#pollSplitRecords. > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full > -- > > Key: FLINK-35151 > URL: https://issues.apache.org/jira/browse/FLINK-35151 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Environment: I use master branch reproduce it. >Reporter: Xin Gong >Priority: Major > Attachments: dumpstack.txt > > > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full. > Reason is that producing binlog is too fast. > MySqlSplitReader#suspendBinlogReaderIfNeed will execute > BinlogSplitReader#stopBinlogReadTask to set > currentTaskRunning to be false after MysqSourceReader receives binlog split > update event. > MySqlSplitReader#pollSplitRecords is executed and > dataIt is null to execute closeBinlogReader when currentReader is > BinlogSplitReader. closeBinlogReader will execute > statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. > Because BinaryLogClient#connectLock is not release when > MySqlStreamingChangeEventSource add element to full queue. > > You can set StatefulTaskContext#queue to be 1 and run UT > NewlyAddedTableITCase#testRemoveAndAddNewTable. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
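The guard condition proposed in the comment above can be sketched as follows. This is not the flink-cdc code, only an illustration of why the extra `remainingCapacity() == 0` clause matters: a producer blocked on `queue.put()` while holding the BinaryLogClient connect lock can only make progress if the consumer keeps polling even after the read task is suspended.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative model of the proposed fix; field and method names mirror the
// comment but the class itself is made up for this sketch.
public class PollGuardSketch {
    final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    volatile boolean currentTaskRunning = true;

    boolean shouldPoll() {
        // Keep draining while the task runs OR the bounded queue is full,
        // so a producer blocked on put() is never stranded.
        return currentTaskRunning || queue.remainingCapacity() == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        PollGuardSketch reader = new PollGuardSketch();
        reader.queue.put("binlog-event");  // capacity-1 queue is now full
        reader.currentTaskRunning = false; // binlog split has been suspended
        // With the old check (currentTaskRunning alone) this would be false,
        // leaving the producer blocked and disconnect() deadlocked.
        System.out.println(reader.shouldPoll());
    }
}
```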
[jira] [Updated] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full
[ https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Gong updated FLINK-35151: - Description: Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full. Reason is that producing binlog is too fast. MySqlSplitReader#suspendBinlogReaderIfNeed will execute BinlogSplitReader#stopBinlogReadTask to set currentTaskRunning to be false after MysqSourceReader receives binlog split update event. MySqlSplitReader#pollSplitRecords is executed and dataIt is null to execute closeBinlogReader when currentReader is BinlogSplitReader. closeBinlogReader will execute statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. Because BinaryLogClient#connectLock is not release when MySqlStreamingChangeEventSource add element to full queue. You can set StatefulTaskContext#queue to be 1 and run UT NewlyAddedTableITCase#testRemoveAndAddNewTable. was: Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full. Reason is that producing binlog is too fast. MySqlSplitReader#suspendBinlogReaderIfNeed will execute BinlogSplitReader#stopBinlogReadTask to set currentTaskRunning to be false after MysqSourceReader receives binlog split update event. MySqlSplitReader#pollSplitRecords is executed and dataIt is null to execute closeBinlogReader when currentReader is BinlogSplitReader. closeBinlogReader will execute statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. Because BinaryLogClient#connectLock is not release when MySqlStreamingChangeEventSource add element to full queue. > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full > -- > > Key: FLINK-35151 > URL: https://issues.apache.org/jira/browse/FLINK-35151 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Environment: I use master branch reproduce it. 
>Reporter: Xin Gong >Priority: Major > Attachments: dumpstack.txt > > > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full. > Reason is that producing binlog is too fast. > MySqlSplitReader#suspendBinlogReaderIfNeed will execute > BinlogSplitReader#stopBinlogReadTask to set > currentTaskRunning to be false after MysqSourceReader receives binlog split > update event. > MySqlSplitReader#pollSplitRecords is executed and > dataIt is null to execute closeBinlogReader when currentReader is > BinlogSplitReader. closeBinlogReader will execute > statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. > Because BinaryLogClient#connectLock is not release when > MySqlStreamingChangeEventSource add element to full queue. > > You can set StatefulTaskContext#queue to be 1 and run UT > NewlyAddedTableITCase#testRemoveAndAddNewTable. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full
[ https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Gong updated FLINK-35151: - Description: Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full. Reason is that producing binlog is too fast. MySqlSplitReader#suspendBinlogReaderIfNeed will execute BinlogSplitReader#stopBinlogReadTask to set currentTaskRunning to be false after MysqSourceReader receives binlog split update event. MySqlSplitReader#pollSplitRecords is executed and dataIt is null to execute closeBinlogReader when currentReader is BinlogSplitReader. closeBinlogReader will execute statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. Because BinaryLogClient#connectLock is not release when MySqlStreamingChangeEventSource add element to full queue. was: Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full. > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full > -- > > Key: FLINK-35151 > URL: https://issues.apache.org/jira/browse/FLINK-35151 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Environment: I use master branch reproduce it. > Reason is that producing binlog is too fast. > MySqlSplitReader#suspendBinlogReaderIfNeed will execute > BinlogSplitReader#stopBinlogReadTask to set > currentTaskRunning to be false after MysqSourceReader receives binlog split > update event. > MySqlSplitReader#pollSplitRecords is executed and > dataIt is null to execute closeBinlogReader when currentReader is > BinlogSplitReader. closeBinlogReader will execute > statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. > Because BinaryLogClient#connectLock is not release when > MySqlStreamingChangeEventSource add element to full queue. >Reporter: Xin Gong >Priority: Major > Attachments: dumpstack.txt > > > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full. 
> Reason is that producing binlog is too fast. > MySqlSplitReader#suspendBinlogReaderIfNeed will execute > BinlogSplitReader#stopBinlogReadTask to set > currentTaskRunning to be false after MysqSourceReader receives binlog split > update event. > MySqlSplitReader#pollSplitRecords is executed and > dataIt is null to execute closeBinlogReader when currentReader is > BinlogSplitReader. closeBinlogReader will execute > statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. > Because BinaryLogClient#connectLock is not release when > MySqlStreamingChangeEventSource add element to full queue. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full
Xin Gong created FLINK-35151: Summary: Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full Key: FLINK-35151 URL: https://issues.apache.org/jira/browse/FLINK-35151 Project: Flink Issue Type: Bug Components: Flink CDC Environment: I used the master branch to reproduce it. The reason is that binlog production is too fast. MySqlSplitReader#suspendBinlogReaderIfNeed executes BinlogSplitReader#stopBinlogReadTask to set currentTaskRunning to false after MySqlSourceReader receives a binlog split update event. MySqlSplitReader#pollSplitRecords then runs with dataIt being null and executes closeBinlogReader when currentReader is a BinlogSplitReader. closeBinlogReader executes statefulTaskContext.getBinaryLogClient().disconnect(), which can deadlock, because BinaryLogClient#connectLock is not released while MySqlStreamingChangeEventSource adds an element to the full queue. Reporter: Xin Gong Attachments: dumpstack.txt Flink mysql cdc will get stuck when suspending the binlog split while the ChangeEventQueue is full. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full
[ https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Gong updated FLINK-35151: - Environment: I use master branch reproduce it. (was: I use master branch reproduce it. Reason is that producing binlog is too fast. MySqlSplitReader#suspendBinlogReaderIfNeed will execute BinlogSplitReader#stopBinlogReadTask to set currentTaskRunning to be false after MysqSourceReader receives binlog split update event. MySqlSplitReader#pollSplitRecords is executed and dataIt is null to execute closeBinlogReader when currentReader is BinlogSplitReader. closeBinlogReader will execute statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. Because BinaryLogClient#connectLock is not release when MySqlStreamingChangeEventSource add element to full queue.) > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full > -- > > Key: FLINK-35151 > URL: https://issues.apache.org/jira/browse/FLINK-35151 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Environment: I use master branch reproduce it. >Reporter: Xin Gong >Priority: Major > Attachments: dumpstack.txt > > > Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is > full. > Reason is that producing binlog is too fast. > MySqlSplitReader#suspendBinlogReaderIfNeed will execute > BinlogSplitReader#stopBinlogReadTask to set > currentTaskRunning to be false after MysqSourceReader receives binlog split > update event. > MySqlSplitReader#pollSplitRecords is executed and > dataIt is null to execute closeBinlogReader when currentReader is > BinlogSplitReader. closeBinlogReader will execute > statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. > Because BinaryLogClient#connectLock is not release when > MySqlStreamingChangeEventSource add element to full queue. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]
Zakelly commented on code in PR #24672: URL: https://github.com/apache/flink/pull/24672#discussion_r1569904139 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java: ## @@ -32,6 +32,8 @@ */ public class KeyAccountingUnit { +public static final Object EMPTY_RECORD = "EMPTY_RECORD"; Review Comment: IMO, this should be under `RecordContext` instead of `KeyAccountingUnit`. And how about using `aec.buildContext(null, key)` everywhere instead of importing the EMPTY_RECORD everywhere? Furthermore, it might be better for `EMPTY_RECORD` to be a `new Object()`, to avoid a coincidental match with identical user input. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.function.BiConsumerWithException; + +import static org.apache.flink.runtime.asyncprocessing.KeyAccountingUnit.EMPTY_RECORD; + +/** + * An implementation of {@link InternalTimerService} that is used by {@link + * org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}. + * The timer service will set {@link RecordContext} for the timers before invoking action to + * preserve the execution order between timer firing and records processing. + * + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425 + * timers section. + * @param Type of timer's key. + * @param Type of the namespace to which timers are scoped. + */ +public class InternalTimerServiceAsyncImpl extends InternalTimerServiceImpl { + +private AsyncExecutionController asyncExecutionController; + +InternalTimerServiceAsyncImpl( +TaskIOMetricGroup taskIOMetricGroup, +KeyGroupRange localKeyGroupRange, +KeyContext keyContext, +ProcessingTimeService processingTimeService, +KeyGroupedInternalPriorityQueue processingTimeTimersQueue, +KeyGroupedInternalPriorityQueue eventTimeTimersQueue, Review Comment: How about adding `>` as type parameter here? 
## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -133,6 +140,48 @@ public final ThrowingConsumer, Exception> getRecordProcessor getClass().getName(), inputId)); } +/** + * Returns a {@link InternalTimerService} that can be used to query current processing time and + * event time and to set timers. An operator can have several timer services, where each has its + * own namespace serializer. Timer services are differentiated by the string key that is given + * when requesting them, if you call this method with the same key multiple times you will get + * the same timer service instance in subsequent requests. + * + * Timers are always scoped to a key, the currently active key of a keyed stream operation. + * When a timer fires, this key will also be set as the currently active key. + * + * Each timer has attached metadata, the namespace. Different timer services can have a + * different namespace type. If you don't need namespace differentiation you can use {@link + * org.apache.flink.runtime.state.VoidNamespaceSerializer} as the namespace serializer. + * + * @param name The name of the requested timer service. If no service
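The reviewer's `new Object()` suggestion from the first comment in this message can be demonstrated in isolation: a String constant sentinel compares equal to any user record that happens to carry the same value, whereas a fresh opaque object is equal only to itself. The sentinel names mirror the diff; the user record is made up for the demo.

```java
// Demo of why an opaque sentinel avoids accidental collisions with user data.
public class SentinelDemo {
    static final Object STRING_SENTINEL = "EMPTY_RECORD"; // as in the reviewed diff
    static final Object OPAQUE_SENTINEL = new Object();    // reviewer's suggestion

    public static void main(String[] args) {
        Object userRecord = "EMPTY_RECORD"; // a user element with the same value
        System.out.println(userRecord.equals(STRING_SENTINEL)); // accidental match
        System.out.println(userRecord.equals(OPAQUE_SENTINEL)); // no collision possible
    }
}
```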
Re: [PR] [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite… [flink-cdc]
yuxiqian closed pull request #3235: [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite… URL: https://github.com/apache/flink-cdc/pull/3235 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite… [flink-cdc]
yuxiqian opened a new pull request, #3235: URL: https://github.com/apache/flink-cdc/pull/3235 … loop -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]
yunfengzhou-hub commented on code in PR #24672: URL: https://github.com/apache/flink/pull/24672#discussion_r1569902836 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java: ## @@ -117,6 +120,7 @@ public class InternalTimerServiceImpl implements InternalTimerService { startIdx = Math.min(keyGroupIdx, startIdx); } this.localKeyGroupRangeStartIdx = startIdx; +this.processingTimeCallback = this::onProcessingTime; Review Comment: How about making `onProcessingTime` a protected method? This way we won't need to introduce processingTimeCallback. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java: ## @@ -179,11 +180,60 @@ InternalTimerServiceImpl registerOrGetTimerService( return timerService; } +@Override +public InternalTimerService getAsyncInternalTimerService( +String name, +TypeSerializer keySerializer, +TypeSerializer namespaceSerializer, +Triggerable triggerable, +AsyncExecutionController asyncExecutionController) { +checkNotNull(keySerializer, "Timers can only be used on keyed operators."); + +// the following casting is to overcome type restrictions. 
+TimerSerializer timerSerializer = +new TimerSerializer<>(keySerializer, namespaceSerializer); + +InternalTimerServiceAsyncImpl timerService = +registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController); + +timerService.startTimerService( +timerSerializer.getKeySerializer(), +timerSerializer.getNamespaceSerializer(), +triggerable); + +return timerService; +} + + InternalTimerServiceAsyncImpl registerOrGetAsyncTimerService( +String name, +TimerSerializer timerSerializer, +AsyncExecutionController asyncExecutionController) { +InternalTimerServiceAsyncImpl timerService = +(InternalTimerServiceAsyncImpl) timerServices.get(name); +if (timerService == null) { + +timerService = +new InternalTimerServiceAsyncImpl<>( +taskIOMetricGroup, +localKeyGroupRange, +keyContext, +processingTimeService, +createTimerPriorityQueue( +PROCESSING_TIMER_PREFIX + name, timerSerializer), +createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer), +cancellationContext, +asyncExecutionController); + +timerServices.put(name, timerService); +} +return timerService; +} + Map> getRegisteredTimerServices() { return Collections.unmodifiableMap(timerServices); } -private +protected KeyGroupedInternalPriorityQueue> createTimerPriorityQueue( Review Comment: This method seems not used in subclasses, so `private` might be enough. Same for `restoreStateForKeyGroup`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35045][state] Introduce ForStFlinkFileSystem to support reading and writing with ByteBuffer [flink]
Zakelly commented on code in PR #24632: URL: https://github.com/apache/flink/pull/24632#discussion_r1569850955 ## flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java: ## @@ -140,4 +144,43 @@ public void skipFully(long bytes) throws IOException { bytes -= fsDataInputStream.skip(bytes); } } + +@Override +public int read(ByteBuffer byteBuffer) throws IOException { +// Not all internal stream supports ByteBufferReadable +if (fsDataInputStream.hasCapability(StreamCapabilities.READBYTEBUFFER)) { +return fsDataInputStream.read(byteBuffer); +} else { +// Fallback to read byte then put +int c = read(); +if (c == -1) { +return -1; +} +byteBuffer.put((byte) c); + +int n = 1, len = byteBuffer.remaining() + 1; +for (; n < len; n++) { +c = read(); +if (c == -1) { +break; +} +byteBuffer.put((byte) c); +} +return n; +} +} + +@Override +public int read(long position, ByteBuffer byteBuffer) throws IOException { +// Not all internal stream supports ByteBufferPositionedReadable +if (fsDataInputStream.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) { +return fsDataInputStream.read(position, byteBuffer); +} else { +// Fallback to positionable read bytes then put +byte[] tmp = new byte[byteBuffer.remaining()]; Review Comment: Even for this fallback code path, there still be a possible way to optimize a little bit. e.g.: ``` if (byteBuffer.hasArray()) { fsDataInputStream.readFully(position, byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.remaining()); } ``` ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java: ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.PositionedReadable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A {@link FSDataInputStream} delegates requests to other one and supports reading data with {@link + * ByteBuffer}. + * + * All methods in this class maybe used by ForSt, please start a discussion firstly if it has to + * be modified. + */ +public class ByteBufferReadableFSDataInputStream extends FSDataInputStream { + +private final FSDataInputStream originalInputStream; + +/** + * InputStream Pool which provides multiple input streams to random read concurrently. An input + * stream should only be used by a thread at a point in time. + */ +private final Queue readInputStreamPool; + +private final Callable inputStreamBuilder; + +public ByteBufferReadableFSDataInputStream( +FSDataInputStream originalInputStream, +Callable inputStreamBuilder, +int inputStreamCapacity) { +this.originalInputStream = originalInputStream; +this.inputStreamBuilder = inputStreamBuilder; +this.readInputStreamPool = new LinkedBlockingQueue<>(inputStreamCapacity); +} + +/** + * Reads up to ByteBuffer#remaining bytes of data from the input stream into a + * ByteBuffer. 
Not Thread-safe yet since the interface of sequential read of ForSt only be + * accessed by one thread at a time. + * + * @param bb the buffer into which the data is read. + * @return the total number of bytes read into the buffer. + * @exception IOException If the first byte cannot be read for any reason other than end of + * file, or if the input stream has been closed, or if some other I/O error occurs. + * @exception NullPointerException If bb is null. + */ +public int readFully(ByteBuffer bb) throws
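The fallback path discussed in this review can be sketched outside Hadoop: bulk-copy into the backing array when the buffer is heap-backed (the `hasArray()` optimization suggested above), otherwise fall back to byte-at-a-time reads. This is a self-contained sketch against a plain `java.io.InputStream`, not the actual `HadoopDataInputStream` with its `StreamCapabilities` checks:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class ByteBufferReadFallback {

    // Fill the buffer from the stream; returns the number of bytes read, or -1 at EOF.
    static int read(InputStream in, ByteBuffer bb) throws IOException {
        if (bb.hasArray()) {
            // Heap buffer: one bulk read straight into the backing array.
            int n = in.read(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
            if (n > 0) {
                bb.position(bb.position() + n);
            }
            return n;
        }
        // Direct buffer: no accessible backing array, copy byte by byte.
        int n = 0;
        while (bb.hasRemaining()) {
            int c = in.read();
            if (c == -1) {
                return n == 0 ? -1 : n;
            }
            bb.put((byte) c);
            n++;
        }
        return n;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5};
        ByteBuffer heap = ByteBuffer.allocate(4);
        int n = read(new ByteArrayInputStream(data), heap);
        System.out.println(n); // 4 bytes copied via the bulk path
    }
}
```

The bulk branch avoids the per-byte `put` loop entirely, which is the point of the review suggestion for the positioned-read fallback.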
[jira] [Updated] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid
[ https://issues.apache.org/jira/browse/FLINK-35150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] qyw updated FLINK-35150: Attachment: (was: image-2024-04-18-11-20-43-126.png)
[jira] [Updated] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid
[ https://issues.apache.org/jira/browse/FLINK-35150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] qyw updated FLINK-35150: Description: Flink S3 Hadoop filesystem, writing to S3 in CSV mode; I applied patch FLINK-28513. I don't understand why the "sync" method of S3RecoverableFsDataOutputStream performs a "completeMultipartUpload" operation: if the multipart upload is completed there, calling close later to upload the rest of the stream will inevitably fail, because the parts for that uploadID have already been merged. Therefore, when a CSV record is larger than "S3_MULTIPART_MIN_PART_SIZE", an uploadPart is started when switching files; then, when BulkPartWriter performs closeForCommit, the closeForCommit method of S3RecoverableFsDataOutputStream fails on uploadPart, because sync has already called completeMultipartUpload. BulkPartWriter: !image-2024-04-18-11-03-08-998.png! CsvBulkWriter: !image-2024-04-18-11-20-25-583.png! S3RecoverableFsDataOutputStream: !image-2024-04-18-10-51-05-071.png!
[jira] [Created] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid
qyw created FLINK-35150: --- Summary: The specified upload does not exist. The upload ID may be invalid Key: FLINK-35150 URL: https://issues.apache.org/jira/browse/FLINK-35150 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.15.0 Reporter: qyw Attachments: image-2024-04-18-10-51-05-071.png, image-2024-04-18-11-03-08-998.png, image-2024-04-18-11-07-15-555.png Flink S3 Hadoop filesystem, writing to S3 in CSV mode; I applied patch [FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. I don't understand why the "sync" method of S3RecoverableFsDataOutputStream performs a "completeMultipartUpload" operation: if the multipart upload is completed there, calling close later to upload the rest of the stream will inevitably fail, because the parts for that uploadID have already been merged. Therefore, when a CSV record is larger than "S3_MULTIPART_MIN_PART_SIZE", an uploadPart is started when switching files; then, when BulkPartWriter performs closeForCommit, the closeForCommit method of S3RecoverableFsDataOutputStream fails on uploadPart, because sync has already called completeMultipartUpload. -- This message was sent by Atlassian Jira (v8.20.10#820010)
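The reported error follows from the S3 multipart contract: completeMultipartUpload consumes the uploadID, so any later uploadPart against that ID is rejected with "The specified upload does not exist". A toy state machine, not the AWS SDK, illustrating the sequencing bug described in this issue:

```java
public class MultipartUploadSketch {
    // Minimal stand-in for an S3 multipart upload; real uploads live server-side.
    static class Upload {
        boolean completed = false;
        int parts = 0;

        void uploadPart() {
            if (completed) {
                // Mirrors S3's "The specified upload does not exist" error.
                throw new IllegalStateException("upload ID is no longer valid");
            }
            parts++;
        }

        void complete() { completed = true; }
    }

    public static void main(String[] args) {
        Upload upload = new Upload();
        upload.uploadPart();   // a part written while switching files
        upload.complete();     // sync() eagerly completes the multipart upload
        try {
            upload.uploadPart(); // closeForCommit() tries to upload the rest
        } catch (IllegalStateException e) {
            System.out.println("closeForCommit fails: " + e.getMessage());
        }
    }
}
```

Any fix along these lines has to ensure the upload is completed only once, on the final close/commit, rather than on every sync.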
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1569864750 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +/** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * Note: This is an experimental feature(FLIP-425) under evaluation. + */ +@Experimental +public static final ConfigOption ASYNC_STATE_ENABLED = +ConfigOptions.key("execution.async-mode.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable async mode related components when tasks initialize." ++ " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously." ++ " Otherwise, the state access of Async state APIs will be executed synchronously." ++ " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n" ++ " Note: This is an experimental feature under evaluation."); + +/** + * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the + * records that have entered the operator but have not yet been processed and emitted to the + * downstream. If the in-flight records number exceeds the limit, the newly records entering + * will be blocked until the in-flight records number drops below the limit. 
+ */ +@Experimental +public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT = +ConfigOptions.key("execution.async-mode.in-flight-records-limit") +.intType() +.defaultValue(6000) +.withDescription( +"The max limit of in-flight records number in async execution mode, 'in-flight' refers" ++ " to the records that have entered the operator but have not yet been processed and" ++ " emitted to the downstream. If the in-flight records number exceeds the limit," ++ " the newly records entering will be blocked until the in-flight records number drops below the limit."); + +/** + * The size of buffer under async execution mode. Async execution mode provides a buffer + * mechanism to reduce state access. When the number of state requests in the buffer exceeds the + * batch size, a batched state execution would be triggered. Larger batch sizes will bring + * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control + * the frequency of triggering. + */ +@Experimental +public static final ConfigOption ASYNC_BUFFER_SIZE = +ConfigOptions.key("execution.async-mode.buffer-size") +.intType() +.defaultValue(1000) +.withDescription( +"The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access." ++ " When the number of state requests in the active buffer exceeds the batch size," ++ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency," ++ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering."); + +/** + * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link + * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively. 
+@Experimental +public static final ConfigOption ASYNC_BUFFER_TIMEOUT = +ConfigOptions.key("execution.async-state.buffer-timeout") Review Comment: Good point, changed it to `execution.async-mode.state-buffer-timeout`.
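Once merged, a user would enable the feature through these options, for example in the cluster configuration file. The keys below are taken from the diff above and are provisional; the review thread is still renaming the buffer-timeout key, so treat this as a sketch rather than final documentation:

```yaml
# Provisional FLIP-425 keys from the diff above; names may change before release.
execution.async-mode.enabled: true                  # opt in to async state execution
execution.async-mode.in-flight-records-limit: 6000  # backpressure threshold
execution.async-mode.buffer-size: 1000              # state-request batch size
```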
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1569863724 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) { configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression); } +// +// Asynchronous execution configurations +// + +@Internal Review Comment: Ah, I misread the comment and changed them from `@Experimental` to `@Internal`; I have corrected them now.
Re: [PR] [FLINK-35128][cdc-connector][cdc-base] Re-calculate the starting changelog offset after the new table added [flink-cdc]
morazow commented on code in PR #3230: URL: https://github.com/apache/flink-cdc/pull/3230#discussion_r1569863282 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java: ## @@ -163,10 +163,18 @@ public String toString() { // --- public static StreamSplit appendFinishedSplitInfos( StreamSplit streamSplit, List splitInfos) { +// re-calculate the starting changelog offset after the new table added +Offset startingOffset = streamSplit.getStartingOffset(); +for (FinishedSnapshotSplitInfo splitInfo : splitInfos) { +if (splitInfo.getHighWatermark().isBefore(startingOffset)) { +startingOffset = splitInfo.getHighWatermark(); +} +} Review Comment: Got it, it will always be the min value
Re: [PR] [FLINK-35128][cdc-connector][cdc-base] Re-calculate the starting changelog offset after the new table added [flink-cdc]
morazow commented on code in PR #3230: URL: https://github.com/apache/flink-cdc/pull/3230#discussion_r1569850914 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java: ## @@ -163,10 +163,18 @@ public String toString() { // --- public static StreamSplit appendFinishedSplitInfos( StreamSplit streamSplit, List splitInfos) { +// re-calculate the starting changelog offset after the new table added +Offset startingOffset = streamSplit.getStartingOffset(); +for (FinishedSnapshotSplitInfo splitInfo : splitInfos) { +if (splitInfo.getHighWatermark().isBefore(startingOffset)) { +startingOffset = splitInfo.getHighWatermark(); +} +} Review Comment: Do we have to distinguish the high watermarks before the startingOffset? For example, if there are multiple high watermarks before startingOffset, which one should we take? Should it be the latest of those? Or is taking any highWatermark that is before the startingOffset all right?
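The resolution in this thread is that the loop keeps the minimum: starting from the split's own offset, any finished-split high watermark that is strictly earlier replaces it, so after the loop the smallest offset wins. A sketch with `long` values standing in for Flink CDC's `Offset`:

```java
import java.util.List;

public class StartingOffsetSketch {
    // long stands in for Flink CDC's Offset; (hw < result) ~ hw.isBefore(result).
    static long recalculateStartingOffset(long startingOffset, List<Long> highWatermarks) {
        long result = startingOffset;
        for (long hw : highWatermarks) {
            if (hw < result) {
                // An earlier high watermark moves the starting offset back.
                result = hw;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two finished splits have high watermarks before the stream split's own offset.
        System.out.println(recalculateStartingOffset(100L, List.of(120L, 80L, 95L))); // prints 80
    }
}
```

Taking the minimum is what guarantees no changelog events are skipped for the newly added tables: reading starts from the earliest point any finished snapshot split still needs.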
Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
liyubin117 commented on PR #24630: URL: https://github.com/apache/flink/pull/24630#issuecomment-2062885502 @LadyForest Hi, could you please take a look? Thanks!
[jira] [Updated] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink
[ https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35149: --- Labels: pull-request-available (was: )
> Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not
> TwoPhaseCommittingSink
> ---
>
> Key: FLINK-35149
> URL: https://issues.apache.org/jira/browse/FLINK-35149
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Hongshun Wang
> Priority: Minor
> Labels: pull-request-available
> Fix For: 3.1.0
>
> Current, when sink is not instanceof TwoPhaseCommittingSink, use
> input.transform rather than stream. It means that pre-write topology will be
> ignored.
> {code:java}
> private void sinkTo(
>         DataStream input,
>         Sink sink,
>         String sinkName,
>         OperatorID schemaOperatorID) {
>     DataStream stream = input;
>     // Pre write topology
>     if (sink instanceof WithPreWriteTopology) {
>         stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
>     }
>     if (sink instanceof TwoPhaseCommittingSink) {
>         addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
>     } else {
>         input.transform(
>                 SINK_WRITER_PREFIX + sinkName,
>                 CommittableMessageTypeInfo.noOutput(),
>                 new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
>     }
> } {code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 commented on PR #3233: URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2062883633 cc @PatrickRen
[PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
loserwang1024 opened a new pull request, #3233: URL: https://github.com/apache/flink-cdc/pull/3233 Currently, when the sink is not an instance of TwoPhaseCommittingSink, input.transform is used rather than stream, which means the pre-write topology will be ignored.
```
private void sinkTo(
        DataStream input,
        Sink sink,
        String sinkName,
        OperatorID schemaOperatorID) {
    DataStream stream = input;
    // Pre write topology
    if (sink instanceof WithPreWriteTopology) {
        stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
    }
    if (sink instanceof TwoPhaseCommittingSink) {
        addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
    } else {
        input.transform(
                SINK_WRITER_PREFIX + sinkName,
                CommittableMessageTypeInfo.noOutput(),
                new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
    }
}
```
(ps: the modification of StarRocksUtils just applies spotless)
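The wiring bug described above can be shown with a toy model — `Stage`, `buggySinkTo`, and `fixedSinkTo` are illustrative names for this sketch only, not Flink or Flink CDC classes; the point is that applying the writer transform to the original `input` silently drops whatever `addPreWriteTopology` added:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a stream: it only records which stages were wired in.
class Stage {
    final List<String> stages;
    Stage(List<String> stages) { this.stages = stages; }
    Stage transform(String name) {
        List<String> next = new ArrayList<>(stages);
        next.add(name);
        return new Stage(next);
    }
}

public class SinkToSketch {
    // Buggy wiring: transforms the original input, dropping the pre-write stage.
    static Stage buggySinkTo(Stage input) {
        Stage stream = input.transform("pre-write"); // models addPreWriteTopology
        return input.transform("sink-writer");       // BUG: uses input, not stream
    }

    // Fixed wiring: continues from the (possibly extended) stream.
    static Stage fixedSinkTo(Stage input) {
        Stage stream = input.transform("pre-write");
        return stream.transform("sink-writer");
    }

    public static void main(String[] args) {
        Stage in = new Stage(new ArrayList<>());
        System.out.println(buggySinkTo(in).stages); // [sink-writer]
        System.out.println(fixedSinkTo(in).stages); // [pre-write, sink-writer]
    }
}
```

The one-word fix (`stream.transform` instead of `input.transform`) is exactly what the PR title describes.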
[jira] [Resolved] (FLINK-34633) Support unnesting array constants
[ https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan resolved FLINK-34633. --- Resolution: Fixed
> Support unnesting array constants
> ---
>
> Key: FLINK-34633
> URL: https://issues.apache.org/jira/browse/FLINK-34633
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Affects Versions: 1.18.1
> Reporter: Xingcan Cui
> Assignee: Jeyhun Karimov
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.19.1
>
> It seems that the current planner doesn't support using UNNEST on array constants. (x)
> {code:java}
> SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
> The following query can't be compiled. (x)
> {code:java}
> SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
> The rewritten version works. (/)
> {code:java}
> SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN UNNEST(A){code}
[jira] [Closed] (FLINK-34633) Support unnesting array constants
[ https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan closed FLINK-34633.
[jira] [Commented] (FLINK-34633) Support unnesting array constants
[ https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838429#comment-17838429 ] Jane Chan commented on FLINK-34633: --- Fixed in master 43a3d50ce3982b9abf04b81407fed46c5c25f819
[jira] [Updated] (FLINK-34633) Support unnesting array constants
[ https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan updated FLINK-34633: --- Fix Version/s: 1.19.1
[jira] [Commented] (FLINK-35143) Expose newly added tables capture in mysql pipeline connector
[ https://issues.apache.org/jira/browse/FLINK-35143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838428#comment-17838428 ] Hongshun Wang commented on FLINK-35143: --- I'd like to do it
> Expose newly added tables capture in mysql pipeline connector
> ---
>
> Key: FLINK-35143
> URL: https://issues.apache.org/jira/browse/FLINK-35143
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: Hongshun Wang
> Priority: Major
>
> Currently, the MySQL pipeline connector still doesn't allow capturing newly added tables.
[jira] [Commented] (FLINK-35129) Postgres source commits the offset after every multiple checkpoint cycles.
[ https://issues.apache.org/jira/browse/FLINK-35129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838427#comment-17838427 ] Muhammet Orazov commented on FLINK-35129: --- Hey [~loserwang1024], I would be happy to work on it! Yes, could you please assign it to me?
> Postgres source commits the offset after every multiple checkpoint cycles.
> ---
>
> Key: FLINK-35129
> URL: https://issues.apache.org/jira/browse/FLINK-35129
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: Hongshun Wang
> Priority: Major
>
> After entering the stream phase, the offset consumed by the global slot is committed upon the completion of each checkpoint; without these commits, log files could never be recycled, which could lead to insufficient disk space.
> However, the job can only restart from the latest checkpoint or savepoint. If restored from an earlier state, the WAL may already have been recycled.
> The way to solve this is to commit the offset only after every multiple checkpoint cycles. The number of checkpoint cycles is determined by a connector option, and the default value is 3.
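The proposal above — commit the replication-slot offset only on every Nth completed checkpoint — can be sketched as follows. This is a hedged illustration: `OffsetCommitter` and `commitOffsetToSlot` are hypothetical names, not the actual flink-cdc Postgres source API.

```java
// Illustrative only: sketches "commit every N checkpoints", not the real CDC code.
class OffsetCommitter {
    private final int commitEveryNCheckpoints; // proposed connector option, default 3
    private int completedSinceLastCommit = 0;
    int commits = 0; // exposed for the demo below

    OffsetCommitter(int commitEveryNCheckpoints) {
        this.commitEveryNCheckpoints = commitEveryNCheckpoints;
    }

    // Would be invoked from notifyCheckpointComplete(checkpointId).
    void onCheckpointComplete(long checkpointId) {
        completedSinceLastCommit++;
        if (completedSinceLastCommit >= commitEveryNCheckpoints) {
            commitOffsetToSlot();
            completedSinceLastCommit = 0;
        }
    }

    private void commitOffsetToSlot() {
        commits++; // the real code would confirm the LSN on the replication slot
    }
}

public class OffsetCommitSketch {
    public static void main(String[] args) {
        OffsetCommitter committer = new OffsetCommitter(3);
        for (long id = 1; id <= 7; id++) {
            committer.onCheckpointComplete(id);
        }
        // 7 completed checkpoints with N=3 -> commits after #3 and #6
        System.out.println(committer.commits); // 2
    }
}
```

The trade-off mentioned in the issue follows directly: a larger N keeps more WAL retained (supporting restores from older state), at the cost of slower slot advancement.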
Re: [PR] [FLINK-34633][table] Support unnesting array constants [flink]
LadyForest merged PR #24510: URL: https://github.com/apache/flink/pull/24510
[jira] [Created] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink
Hongshun Wang created FLINK-35149:
-
Summary: Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink
Key: FLINK-35149
URL: https://issues.apache.org/jira/browse/FLINK-35149
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Hongshun Wang
Fix For: 3.1.0

Currently, when the sink is not an instance of TwoPhaseCommittingSink, input.transform is used rather than stream, which means the pre-write topology will be ignored.
{code:java}
private void sinkTo(
        DataStream input,
        Sink sink,
        String sinkName,
        OperatorID schemaOperatorID) {
    DataStream stream = input;
    // Pre write topology
    if (sink instanceof WithPreWriteTopology) {
        stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
    }
    if (sink instanceof TwoPhaseCommittingSink) {
        addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
    } else {
        input.transform(
                SINK_WRITER_PREFIX + sinkName,
                CommittableMessageTypeInfo.noOutput(),
                new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
    }
} {code}
Re: [PR] [docs]Problem with Case Document Format in Quickstart [flink-cdc]
GOODBOY008 commented on PR #3229: URL: https://github.com/apache/flink-cdc/pull/3229#issuecomment-2062859948 @ZmmBigdata @Jiabao-Sun This issue was fixed in PR https://github.com/apache/flink-cdc/pull/3217, which has been merged.
Re: [PR] [FLINK-28693][table] Fix janino compile failed because the code generated refers the class in table-planner [flink]
xuyangzhong commented on PR #24280: URL: https://github.com/apache/flink/pull/24280#issuecomment-2062858561 Thanks very much for the backport @snuyanzin
[jira] [Commented] (FLINK-35081) CompileException when watermark definition contains coalesce and to_timestamp built-in functions
[ https://issues.apache.org/jira/browse/FLINK-35081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838424#comment-17838424 ] xuyang commented on FLINK-35081: --- I think this bug is the same as FLINK-28693
> CompileException when watermark definition contains coalesce and to_timestamp built-in functions
> ---
>
> Key: FLINK-35081
> URL: https://issues.apache.org/jira/browse/FLINK-35081
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.17.1
> Reporter: Grzegorz Kołakowski
> Priority: Major
>
> I have a data stream in which the event-time column can have two data formats. To be able to define a watermark on the table, I used the coalesce and to_timestamp built-in functions as shown below:
> {code:sql}
> create table test (
>     `@timestamp` VARCHAR,
>     __rowtime AS coalesce(
>         to_timestamp(`@timestamp`, '-MM-dd''T''HH:mm:ss'),
>         to_timestamp(`@timestamp`, '-MM-dd''T''HH:mm:ss.SSS')
>     ),
>     watermark for __rowtime as __rowtime - INTERVAL '30' SECOND,
>     ...
> ) with ( ...
> )
> {code}
> The job failed with the following stacktrace:
> {noformat}
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:547)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
> Caused
[jira] [Commented] (FLINK-35144) Support various sources sync for FlinkCDC in one pipeline
[ https://issues.apache.org/jira/browse/FLINK-35144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838422#comment-17838422 ] Hongshun Wang commented on FLINK-35144: --- You mean syncing multiple sources to the same sink?
> Support various sources sync for FlinkCDC in one pipeline
> ---
>
> Key: FLINK-35144
> URL: https://issues.apache.org/jira/browse/FLINK-35144
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.1.0
> Reporter: Congxian Qiu
> Priority: Major
>
> Currently, the FlinkCDC pipeline can only support a single source in one pipeline; we need to start multiple pipelines when there are various sources. For upstream systems that use sharding, we need to sync multiple sources in one pipeline, which the current pipeline can't do because it only supports a single source.
> This issue wants to support the sync of multiple sources in one pipeline.
Re: [PR] [FLINK-35128][cdc-connector][cdc-base] Re-calculate the starting changelog offset after the new table added [flink-cdc]
loserwang1024 commented on code in PR #3230: URL: https://github.com/apache/flink-cdc/pull/3230#discussion_r1569794515

## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java:
## @@ -159,7 +159,15 @@ public String toString() {
 // ---
 public static StreamSplit appendFinishedSplitInfos(
         StreamSplit streamSplit, List splitInfos) {
+    // re-calculate the starting changelog offset after the new table added
+    Offset startingOffset = streamSplit.getStartingOffset();
+    for (FinishedSnapshotSplitInfo splitInfo : splitInfos) {
+        if (splitInfo.getHighWatermark().isBefore(startingOffset)) {
+            startingOffset = splitInfo.getHighWatermark();
+        }
+    }
     splitInfos.addAll(streamSplit.getFinishedSnapshotSplitInfos());
+
     return new StreamSplit(
             streamSplit.splitId,
             streamSplit.getStartingOffset(),

Review Comment: done it
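The point of the review thread above — compute the minimum finished-split high watermark and actually use it when rebuilding the split (rather than the original starting offset) — can be sketched in isolation. Offsets are modeled as plain longs here, and `recomputeStartingOffset` is an illustrative name, not the actual flink-cdc `StreamSplit` API:

```java
import java.util.Arrays;
import java.util.List;

// Simplified sketch: after new tables are added, the stream split should
// restart from the EARLIEST high watermark among the finished snapshot
// splits, so no changelog records between that point and the old starting
// offset are lost.
public class StartingOffsetSketch {
    static long recomputeStartingOffset(long currentStart, List<Long> highWatermarks) {
        long start = currentStart;
        for (long hw : highWatermarks) {
            if (hw < start) { // models Offset#isBefore in the real code
                start = hw;
            }
        }
        return start; // must be passed into the rebuilt split, not currentStart
    }

    public static void main(String[] args) {
        // one newly added table finished its snapshot at offset 80 < 100
        System.out.println(recomputeStartingOffset(100L, Arrays.asList(120L, 80L, 95L))); // 80
    }
}
```

This also mirrors the subtlety the diff exposes: computing the minimum is not enough if the constructor call still receives the original `getStartingOffset()`.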
Re: [PR] [docs]Problem with Case Document Format in Quickstart [flink-cdc]
Jiabao-Sun commented on PR #3229: URL: https://github.com/apache/flink-cdc/pull/3229#issuecomment-2062840911 Thanks @ZmmBigdata for this contribution. Could you rebase onto master and resolve the conflicts?
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
## @@ -94,19 +103,26 @@ public class AsyncExecutionController {
     final AtomicInteger inFlightRecordNum;
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);

Review Comment: Given that #24657 has been merged, it might be better to verify that the introduced configurations can pass the configured values into AEC through operator setups now.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663

## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
## @@ -181,4 +182,73 @@ public class ExecutionOptions {
                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                     + SORT_INPUTS.key()
                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size," +
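The three options in the diff above interact as a simple trigger/blocking rule: flush a batch on buffer size or timeout, and block new records at the in-flight limit. Below is a minimal sketch under stated assumptions — `AsyncBuffer` and its methods are hypothetical names for illustration, not the actual `AsyncExecutionController`, and the buffer-timeout option's exact key is not shown in this excerpt:

```java
// Hypothetical sketch of the buffering rules the options configure.
class AsyncBuffer {
    final int bufferSize;       // execution.async-mode.buffer-size (default 1000)
    final long bufferTimeoutMs; // the ASYNC_BUFFER_TIMEOUT option referenced in the javadoc
    final int inFlightLimit;    // execution.async-mode.in-flight-records-limit (default 6000)

    int buffered = 0;   // state requests waiting in the active buffer
    int inFlight = 0;   // records entered but not yet emitted downstream
    long lastFlushMs = 0L;

    AsyncBuffer(int bufferSize, long bufferTimeoutMs, int inFlightLimit) {
        this.bufferSize = bufferSize;
        this.bufferTimeoutMs = bufferTimeoutMs;
        this.inFlightLimit = inFlightLimit;
    }

    /** New records are blocked while the in-flight count is at the limit. */
    boolean mustBlockNewRecord() {
        return inFlight >= inFlightLimit;
    }

    /** A batched state execution triggers on size, or on timeout for a non-empty buffer. */
    boolean shouldTriggerBatch(long nowMs) {
        return buffered >= bufferSize
                || (buffered > 0 && nowMs - lastFlushMs >= bufferTimeoutMs);
    }
}

public class AsyncModeOptionsSketch {
    public static void main(String[] args) {
        AsyncBuffer buf = new AsyncBuffer(1000, 1000L, 6000); // defaults from the diff; timeout assumed
        buf.buffered = 1000;
        System.out.println(buf.shouldTriggerBatch(0L));   // true: size threshold reached
        buf.buffered = 1;
        System.out.println(buf.shouldTriggerBatch(500L)); // false: below size and timeout
        buf.inFlight = 6000;
        System.out.println(buf.mustBlockNewRecord());     // true: in-flight limit reached
    }
}
```

This is why the javadoc notes that larger batch sizes raise end-to-end latency: with a bigger `bufferSize`, requests sit longer before the size trigger fires, bounded only by the timeout.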
Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]
reswqa commented on PR #24541: URL: https://github.com/apache/flink/pull/24541#issuecomment-2062837816 @flinkbot run azure
[jira] [Assigned] (FLINK-34738) "Deployment - YARN" Page for Flink CDC Chinese Documentation
[ https://issues.apache.org/jira/browse/FLINK-34738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun reassigned FLINK-34738: --- Assignee: Vincent Woo
> "Deployment - YARN" Page for Flink CDC Chinese Documentation
> ---
>
> Key: FLINK-34738
> URL: https://issues.apache.org/jira/browse/FLINK-34738
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Documentation, Flink CDC
> Affects Versions: cdc-3.1.0
> Reporter: LvYanquan
> Assignee: Vincent Woo
> Priority: Major
> Labels: pull-request-available
> Fix For: cdc-3.1.0
>
> Translate [https://github.com/apache/flink-cdc/blob/master/docs/content/docs/deployment/yarn.md] into Chinese.
Re: [PR] [BP-3.0][minor][cdc][docs] Optimize markdown styles in quickstart doc [flink-cdc]
Jiabao-Sun merged PR #3223: URL: https://github.com/apache/flink-cdc/pull/3223
Re: [PR] [minor][docs] Optimize markdown styles in quickstart doc [flink-cdc]
Jiabao-Sun merged PR #3217: URL: https://github.com/apache/flink-cdc/pull/3217
Re: [PR] [FLINK-35148][core] Improve InstantiationUtil for checking nullary pu… [flink]
flinkbot commented on PR #24675: URL: https://github.com/apache/flink/pull/24675#issuecomment-2062593439

## CI report:
* fc79b8051c60e2628b2098e82bc945f79b08a3d3 UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-35148][core] Improve InstantiationUtil for checking nullary pu… [flink]
liuml07 commented on PR #24675: URL: https://github.com/apache/flink/pull/24675#issuecomment-2062592367 @snuyanzin could you take a look? Thanks,
[PR] [FLINK-35148][core] Improve InstantiationUtil for checking nullary pu… [flink]
liuml07 opened a new pull request, #24675: URL: https://github.com/apache/flink/pull/24675 …blic constructor

## What is the purpose of the change
https://issues.apache.org/jira/browse/FLINK-35148
InstantiationUtil#hasPublicNullaryConstructor checks whether the given class has a public nullary constructor. The implementation can be improved a bit: the `Modifier#isPublic` check within the for-loop can be skipped as the Class#getConstructors() only returns public constructors. We can also add a negative unit test for this.

## Brief change log
- Skip the `Modifier#isPublic` check
- Use Java stream
- Add negative unit test

## Verifying this change
This change is already covered by existing tests, such as `InstantiationUtilTest`. This change added a new negative test in that class.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
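The simplification this PR describes can be sketched directly: `Class#getConstructors()` is documented to return only public constructors, so the extra public-modifier check is redundant and the method reduces to a parameter-count scan. `NullaryCtorCheck` and the nested test classes below are illustrative, not the actual `InstantiationUtil` code:

```java
import java.util.Arrays;

public class NullaryCtorCheck {
    // Sketch of the simplified check: getConstructors() already filters to
    // public constructors, so only the parameter count needs testing.
    static boolean hasPublicNullaryConstructor(Class<?> clazz) {
        return Arrays.stream(clazz.getConstructors())
                .anyMatch(c -> c.getParameterCount() == 0);
    }

    static class HasOne { public HasOne() {} }          // positive case
    static class NoPublic { private NoPublic() {} }     // negative case, like the new unit test

    public static void main(String[] args) {
        System.out.println(hasPublicNullaryConstructor(HasOne.class));   // true
        System.out.println(hasPublicNullaryConstructor(NoPublic.class)); // false
    }
}
```

The negative case is the interesting one: `NoPublic.class.getConstructors()` returns an empty array because its only constructor is private, which is exactly why the in-loop `Modifier.isPublic` check never fires.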
[jira] [Updated] (FLINK-35148) Improve InstantiationUtil for checking nullary public constructor
[ https://issues.apache.org/jira/browse/FLINK-35148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35148: --- Labels: pull-request-available (was: )
> Improve InstantiationUtil for checking nullary public constructor
> ---
>
> Key: FLINK-35148
> URL: https://issues.apache.org/jira/browse/FLINK-35148
> Project: Flink
> Issue Type: Improvement
> Components: API / Core
> Affects Versions: 1.19.0, 1.18.1
> Reporter: Mingliang Liu
> Priority: Minor
> Labels: pull-request-available
>
> {{InstantiationUtil#hasPublicNullaryConstructor}} checks whether the given class has a public nullary constructor. The implementation can be improved a bit: the `Modifier#isPublic` check within the for-loop can be skipped as the {{Class#getConstructors()}} only returns public constructors.
> We can also add a negative unit test for this.
[jira] [Created] (FLINK-35148) Improve InstantiationUtil for checking nullary public constructor
Mingliang Liu created FLINK-35148:
-
Summary: Improve InstantiationUtil for checking nullary public constructor
Key: FLINK-35148
URL: https://issues.apache.org/jira/browse/FLINK-35148
Project: Flink
Issue Type: Improvement
Components: API / Core
Affects Versions: 1.18.1, 1.19.0
Reporter: Mingliang Liu

{{InstantiationUtil#hasPublicNullaryConstructor}} checks whether the given class has a public nullary constructor. The implementation can be improved a bit: the `Modifier#isPublic` check within the for-loop can be skipped as the {{Class#getConstructors()}} only returns public constructors.
We can also add a negative unit test for this.
[jira] [Updated] (FLINK-35147) SinkMaterializer throws StateMigrationException when widening the field type in the output table
[ https://issues.apache.org/jira/browse/FLINK-35147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sharon Xie updated FLINK-35147: --- Description: When a field type in the output table is changed from int -> bigint or timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. This is unexpected as the change is backward compatible. The new type should be able to "accept" all the old values that had the narrower type.
Note that the planner works fine and would accept such a change.
To reproduce:
1. Run the below SQL
{code:sql}
CREATE TABLE ltable (
    `id` integer primary key,
    `num` int
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka.test:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    'topic' = 'test1'
);
CREATE TABLE rtable (
    `id` integer primary key,
    `ts` timestamp(3)
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka.test:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    'topic' = 'test2'
);
CREATE TABLE output (
    `id` integer primary key,
    `num` int,
    `ts` timestamp(3)
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka.test:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    'topic' = 'test3'
);
insert into `output` select ltable.id, num, ts from ltable join rtable on ltable.id = rtable.id
{code}
2. Stop with a savepoint, then update the output table with
{code:sql}
CREATE TABLE output (
    `id` integer primary key,
    -- changing one of the types below would cause the issue
    `num` bigint,
    `ts` timestamp(6)
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka.test:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    'topic' = 'test3'
);
{code}
3. Restart the job with the savepoint created
Sample screenshots
!image-2024-04-17-14-15-35-297.png|width=911,height=352!
!image-2024-04-17-14-15-21-647.png|width=1172,height=458!
was: When a field type in the output table is changed from int -> bigint or timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. This is unexpected as the change is backward compatible. The new type should be able to "accept" all the old values that had narrower type. Note that the planner works fine and would accept such change. To reproduce 1. run the below SQL {code:sql} CREATE TABLE ltable ( `id` integer primary key, `num` int ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test1' ); CREATE TABLE rtable ( `id` integer primary key, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test2' ); CREATE TABLE output ( `id` integer primary key, `num` int, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); insert into `output` select ltable.id, num, ts from ltable join rtable on ltable.id = rtable.id {code} 2. Stop with a savepoint, then update output table with {code:sql} CREATE TABLE output ( `id` integer primary key, – change one of the type below would cause the issue `num` bigint, `ts` timestamp(6) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); {code} 3. Restart the job with the savepoint created Sample screenshots !image-2024-04-17-14-15-35-297.png! !image-2024-04-17-14-15-21-647.png! 
> SinkMaterializer throws StateMigrationException when widening the field type > in the output table > > > Key: FLINK-35147 > URL: https://issues.apache.org/jira/browse/FLINK-35147 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Sharon Xie >Priority: Major > Attachments: image-2024-04-17-14-15-21-647.png, > image-2024-04-17-14-15-35-297.png > > > When a field type in the output table is changed from int -> bigint or > timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. > This is unexpected as the change is backward compatible. The new type should > be able to "accept" all the old values that had narrower type. > Note that the planner works fine and would accept such change. > To reproduce > 1. run the below SQL > {code:sql} > CREATE TABLE ltable ( > `id` integer
[jira] [Updated] (FLINK-35147) SinkMaterializer throws StateMigrationException when widening the field type in the output table
[ https://issues.apache.org/jira/browse/FLINK-35147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sharon Xie updated FLINK-35147: --- Description: When a field type in the output table is changed from int -> bigint or timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. This is unexpected as the change is backward compatible. The new type should be able to "accept" all the old values that had a narrower type. Note that the planner works fine and would accept such a change. To reproduce 1. Run the SQL below {code:sql} CREATE TABLE ltable ( `id` integer primary key, `num` int ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test1' ); CREATE TABLE rtable ( `id` integer primary key, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test2' ); CREATE TABLE output ( `id` integer primary key, `num` int, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); insert into `output` select ltable.id, num, ts from ltable join rtable on ltable.id = rtable.id {code} 2. Stop with a savepoint, then update the output table with {code:sql} CREATE TABLE output ( `id` integer primary key, -- changing one of the types below causes the issue `num` bigint, `ts` timestamp(6) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); {code} 3. Restart the job with the savepoint created Sample screenshots !image-2024-04-17-14-15-35-297.png! !image-2024-04-17-14-15-21-647.png! 
was: When a field type in the output table is changed from int -> bigint or timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. This is unexpected as the change is backward compatible. The new type should be able to "accept" all the old values that had narrower type. Note that the planner works fine and would accept such change. To reproduce {code:sql} CREATE TABLE ltable ( `id` integer primary key, `num` int ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test1' ); CREATE TABLE rtable ( `id` integer primary key, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test2' ); CREATE TABLE output ( `id` integer primary key, `num` int, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); insert into `output` select ltable.id, num, ts from ltable join rtable on ltable.id = rtable.id {code} Run it, stop with a savepoint, then update output table with {code:sql} CREATE TABLE output ( `id` integer primary key, – change one of the type below would cause the issue `num` bigint, `ts` timestamp(6) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); {code} Restart the job with the savepoint created Sample screenshots !image-2024-04-17-14-15-35-297.png! !image-2024-04-17-14-15-21-647.png! 
> SinkMaterializer throws StateMigrationException when widening the field type > in the output table > > > Key: FLINK-35147 > URL: https://issues.apache.org/jira/browse/FLINK-35147 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Sharon Xie >Priority: Major > Attachments: image-2024-04-17-14-15-21-647.png, > image-2024-04-17-14-15-35-297.png > > > When a field type in the output table is changed from int -> bigint or > timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. > This is unexpected as the change is backward compatible. The new type should > be able to "accept" all the old values that had narrower type. > Note that the planner works fine and would accept such change. > To reproduce > 1. run the below SQL > {code:sql} > CREATE TABLE ltable ( > `id` integer primary key, > `num` int > ) WITH ( >
[jira] [Updated] (FLINK-35147) SinkMaterializer throws StateMigrationException when widening the field type in the output table
[ https://issues.apache.org/jira/browse/FLINK-35147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sharon Xie updated FLINK-35147: --- Description: When a field type in the output table is changed from int -> bigint or timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. This is unexpected as the change is backward compatible. The new type should be able to "accept" all the old values that had a narrower type. Note that the planner works fine and would accept such a change. To reproduce {code:sql} CREATE TABLE ltable ( `id` integer primary key, `num` int ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test1' ); CREATE TABLE rtable ( `id` integer primary key, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test2' ); CREATE TABLE output ( `id` integer primary key, `num` int, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); insert into `output` select ltable.id, num, ts from ltable join rtable on ltable.id = rtable.id {code} Run it, stop with a savepoint, then update the output table with {code:sql} CREATE TABLE output ( `id` integer primary key, -- changing one of the types below causes the issue `num` bigint, `ts` timestamp(6) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); {code} Restart the job with the savepoint created Sample screenshots !image-2024-04-17-14-15-35-297.png! !image-2024-04-17-14-15-21-647.png! was: When a field type in the output table is changed from int -> bigint or timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
This is unexpected as the change is backward compatible. The new type should be able to "accept" all the old values that had narrower type. Note that the planner works fine and would accept such change. To reproduce ``` CREATE TABLE ltable ( `id` integer primary key, `num` int ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test1' ); CREATE TABLE rtable ( `id` integer primary key, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test2' ); CREATE TABLE output ( `id` integer primary key, `num` int, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); insert into `output` select ltable.id, num, ts from ltable join rtable on ltable.id = rtable.id ``` Run it, stop with a savepoint, then update output table with ``` CREATE TABLE output ( `id` integer primary key, -- change one of the type below would cause the issue `num` bigint, `ts` timestamp(6) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); ``` Restart the job with the savepoint created Sample screenshots !image-2024-04-17-14-15-35-297.png! !image-2024-04-17-14-15-21-647.png! 
> SinkMaterializer throws StateMigrationException when widening the field type > in the output table > > > Key: FLINK-35147 > URL: https://issues.apache.org/jira/browse/FLINK-35147 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: Sharon Xie >Priority: Major > Attachments: image-2024-04-17-14-15-21-647.png, > image-2024-04-17-14-15-35-297.png > > > When a field type in the output table is changed from int -> bigint or > timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. > This is unexpected as the change is backward compatible. The new type should > be able to "accept" all the old values that had narrower type. > Note that the planner works fine and would accept such change. > To reproduce > > {code:sql} > CREATE TABLE ltable ( > `id` integer primary key, > `num` int > ) WITH ( > 'connector' = 'upsert-kafka', > 'properties.bootstrap.servers' =
[jira] [Created] (FLINK-35147) SinkMaterializer throws StateMigrationException when widening the field type in the output table
Sharon Xie created FLINK-35147: -- Summary: SinkMaterializer throws StateMigrationException when widening the field type in the output table Key: FLINK-35147 URL: https://issues.apache.org/jira/browse/FLINK-35147 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Sharon Xie Attachments: image-2024-04-17-14-15-21-647.png, image-2024-04-17-14-15-35-297.png When a field type in the output table is changed from int -> bigint or timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. This is unexpected as the change is backward compatible. The new type should be able to "accept" all the old values that had narrower type. Note that the planner works fine and would accept such change. To reproduce ``` CREATE TABLE ltable ( `id` integer primary key, `num` int ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test1' ); CREATE TABLE rtable ( `id` integer primary key, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test2' ); CREATE TABLE output ( `id` integer primary key, `num` int, `ts` timestamp(3) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); insert into `output` select ltable.id, num, ts from ltable join rtable on ltable.id = rtable.id ``` Run it, stop with a savepoint, then update output table with ``` CREATE TABLE output ( `id` integer primary key, -- change one of the type below would cause the issue `num` bigint, `ts` timestamp(6) ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'kafka.test:9092', 'key.format' = 'json', 'value.format' = 'json', 'topic' = 'test3' ); ``` Restart the job with the savepoint created Sample screenshots !image-2024-04-17-14-15-35-297.png! 
!image-2024-04-17-14-15-21-647.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
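The restore failure reported above comes down to the serialized state layout: state written with the old, narrower serializer cannot simply be read back with the new, wider one, so widening a column needs an explicit state migration rather than a plain restore. The following is a minimal, hypothetical Java sketch of that idea only — none of these names are Flink's actual serializer classes:

```java
import java.nio.ByteBuffer;

// Hypothetical illustration (not Flink's serializer API): a record
// written in a 4-byte int layout cannot be decoded in an 8-byte long
// layout; a migration must read the old format and promote the value.
class WideningSketch {
    static byte[] writeIntState(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    static long readLongState(byte[] bytes) {
        if (bytes.length < 8) {
            throw new IllegalStateException("serialized layout changed: "
                    + bytes.length + " bytes, expected 8");
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    static long migrate(byte[] oldBytes) {
        // Widening migration: decode with the old layout, then promote.
        return ByteBuffer.wrap(oldBytes).getInt();
    }

    public static void main(String[] args) {
        byte[] old = writeIntState(42);
        boolean plainRestoreFailed = false;
        try {
            readLongState(old); // plain restore fails on the old bytes
        } catch (IllegalStateException e) {
            plainRestoreFailed = true;
        }
        System.out.println(plainRestoreFailed + " " + migrate(old)); // true 42
    }
}
```

In this simplified picture, the reported StateMigrationException corresponds to the failing plain read: the change is logically backward compatible, but the on-disk layout is not, so the restore path would need a migration step like `migrate` above.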
Re: [PR] [FLINK-31860] FlinkDeployments never finalize when namespace is deleted [flink-kubernetes-operator]
jiangzho commented on PR #817: URL: https://github.com/apache/flink-kubernetes-operator/pull/817#issuecomment-2062344434 Thanks @gyfora for quick turnaround ! The commit has been updated with style fix. This patch is validated via minikube setup. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs
[ https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838379#comment-17838379 ] Xingcan Cui commented on FLINK-34583: - Hi [~xuyangzhong], thanks for looking into this. I hit the issue when using the Paimon table source. The execution plan looks good. However, the options don't work. It could be a runtime issue or Paimon source implementation bug. I can't remember clearly if Flink generates multiple table sources and then merges them at runtime. If it does, the options may not be merged properly. !image-2024-04-17-16-48-49-073.png! > Bug for dynamic table option hints with multiple CTEs > - > > Key: FLINK-34583 > URL: https://issues.apache.org/jira/browse/FLINK-34583 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Xingcan Cui >Priority: Major > Attachments: image-2024-04-17-16-35-06-153.png, > image-2024-04-17-16-48-49-073.png > > > The table options hints don't work well with multiple WITH clauses referring > to the same table. Please see the following example. > > The following query with hints works well. > {code:java} > SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code} > The following query with multiple WITH clauses also works well. > {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...) > SELECT * FROM T3;{code} > The following query with multiple WITH clauses referring to the same original > table failed to recognize the hints. > {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...), > T4 AS (SELECT ... FROM T2 WHERE...), > T5 AS (SELECT ... FROM T3 JOIN T4 ON...) > SELECT * FROM T5;{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs
[ https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-34583: Attachment: image-2024-04-17-16-48-49-073.png > Bug for dynamic table option hints with multiple CTEs > - > > Key: FLINK-34583 > URL: https://issues.apache.org/jira/browse/FLINK-34583 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Xingcan Cui >Priority: Major > Attachments: image-2024-04-17-16-35-06-153.png, > image-2024-04-17-16-48-49-073.png > > > The table options hints don't work well with multiple WITH clauses referring > to the same table. Please see the following example. > > The following query with hints works well. > {code:java} > SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code} > The following query with multiple WITH clauses also works well. > {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...) > SELECT * FROM T3;{code} > The following query with multiple WITH clauses referring to the same original > table failed to recognize the hints. > {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...), > T4 AS (SELECT ... FROM T2 WHERE...), > T5 AS (SELECT ... FROM T3 JOIN T4 ON...) > SELECT * FROM T5;{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs
[ https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-34583: Attachment: image-2024-04-17-16-35-06-153.png > Bug for dynamic table option hints with multiple CTEs > - > > Key: FLINK-34583 > URL: https://issues.apache.org/jira/browse/FLINK-34583 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Xingcan Cui >Priority: Major > Attachments: image-2024-04-17-16-35-06-153.png > > > The table options hints don't work well with multiple WITH clauses referring > to the same table. Please see the following example. > > The following query with hints works well. > {code:java} > SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code} > The following query with multiple WITH clauses also works well. > {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...) > SELECT * FROM T3;{code} > The following query with multiple WITH clauses referring to the same original > table failed to recognize the hints. > {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...), > T4 AS (SELECT ... FROM T2 WHERE...), > T5 AS (SELECT ... FROM T3 JOIN T4 ON...) > SELECT * FROM T5;{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838358#comment-17838358 ] Aleksandr Pilipenko commented on FLINK-35115: - snapshotState method in FlinkKinesisConsumer skip saving state if operator had been cancelled: 2024-04-17 14:05:52,645 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - snapshotState() called on closed source; returning null. This leads to state not being updated in state backend during stop-with-savepoint workflow. Created a PR to resolve this. > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", 
null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838358#comment-17838358 ] Aleksandr Pilipenko edited comment on FLINK-35115 at 4/17/24 7:01 PM: -- The snapshotState method in FlinkKinesisConsumer skips saving state if the operator has been cancelled: {noformat} 2024-04-17 14:05:52,645 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - snapshotState() called on closed source; returning null.{noformat} This leads to the state not being updated in the state backend during the stop-with-savepoint workflow. Created a PR to resolve this. was (Author: a.pilipenko): snapshotState method in FlinkKinesisConsumer skip saving state if operator had been cancelled: 2024-04-17 14:05:52,645 DEBUG org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - snapshotState() called on closed source; returning null. This leads to state not being updated in state backend during stop-with-savepoint workflow. Created a PR to resolve this. 
> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} > >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and 
triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > * > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > * > > * Job is resumed from the savepoint > * *{color:#FF}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the save job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
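The failure mode described in the comments above can be sketched in a few lines: during stop-with-savepoint the source is cancelled before the final snapshot, so a guard that skips snapshotting on a closed source silently keeps the older checkpointed sequence number. This is a hypothetical simplification — the names below are illustrative, not the actual FlinkKinesisConsumer fields:

```java
// Illustrative sketch of the bug: if snapshotState() bails out once the
// source is closed, the newest sequence number never reaches the savepoint.
class SnapshotSketch {
    private volatile boolean running = true;
    private String lastSequenceNumber = "seq-1"; // from the last checkpoint
    private String snapshotted = null;

    void processRecord(String seq) { lastSequenceNumber = seq; }

    void cancel() { running = false; }

    // skipWhenClosed=true models the buggy guard; false models the fix,
    // which still snapshots state after the operator has been cancelled.
    void snapshotState(boolean skipWhenClosed) {
        if (skipWhenClosed && !running) {
            return; // "snapshotState() called on closed source; returning null"
        }
        snapshotted = lastSequenceNumber;
    }

    String snapshotted() { return snapshotted; }

    public static void main(String[] args) {
        SnapshotSketch buggy = new SnapshotSketch();
        buggy.processRecord("seq-2");
        buggy.cancel();                // stop-with-savepoint cancels first
        buggy.snapshotState(true);     // nothing saved -> old offset wins

        SnapshotSketch fixed = new SnapshotSketch();
        fixed.processRecord("seq-2");
        fixed.cancel();
        fixed.snapshotState(false);    // fix: snapshot even after cancel
        System.out.println(buggy.snapshotted() + " " + fixed.snapshotted()); // null seq-2
    }
}
```

On restore, the "buggy" savepoint replays everything after `seq-1`, which matches the duplicated-records symptom in the issue description.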
[PR] [FLINK-31860] FlinkDeployments never finalize when namespace is deleted [flink-kubernetes-operator]
jiangzho opened a new pull request, #817: URL: https://github.com/apache/flink-kubernetes-operator/pull/817 ## What is the purpose of the change This patch tackles the corner case where `cleanup` of a CustomResource constantly fails due to an event publish failure, causing `cleanup` to hang forever. The operator attempts to publish events when reconciling changes or cleaning up a CustomResource. This patch allows the reconcile & cleanup logic to proceed if, and only if, the event publishing fails with 403 Forbidden, which happens when the namespace is being deleted (in the Terminating state). k8s rejects event/resource creation in a terminating namespace. In this way, at-least-once event delivery is still guaranteed in all other cases. Event consumers need to account for the "namespace deleted" scenario, in which all resources within the namespace are implicitly deleted. ## Brief change log - Fix event publishing blocking reconcile & cleanup upon 403 ## Verifying this change This change added tests and can be verified as follows: - Added an additional scenario in EventUtilsTest and expect it to succeed ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: yes ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
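The policy the PR describes — tolerate only a 403 from a terminating namespace so cleanup can finish, and rethrow everything else to preserve at-least-once delivery — can be sketched as follows. This is a hypothetical illustration; the exception class and method names are made up and are not the operator's actual EventUtils API:

```java
// Hypothetical sketch: swallow only 403 Forbidden (namespace terminating)
// when publishing an event, so reconcile/cleanup can proceed; any other
// failure is rethrown so event delivery is still retried.
class EventPublishSketch {
    static class KubernetesClientException extends RuntimeException {
        final int code;
        KubernetesClientException(int code) {
            super("HTTP " + code);
            this.code = code;
        }
    }

    // Returns true if the event was published, false if publishing was
    // skipped because the namespace is terminating (403).
    static boolean publishOrTolerateForbidden(int simulatedStatusCode) {
        try {
            if (simulatedStatusCode != 200) {
                throw new KubernetesClientException(simulatedStatusCode);
            }
            return true; // event published
        } catch (KubernetesClientException e) {
            if (e.code == 403) {
                // namespace is being deleted; let cleanup proceed anyway
                return false;
            }
            throw e; // keep at-least-once delivery for other failures
        }
    }

    public static void main(String[] args) {
        System.out.println(publishOrTolerateForbidden(200)); // true
        System.out.println(publishOrTolerateForbidden(403)); // false
        try {
            publishOrTolerateForbidden(500);
        } catch (KubernetesClientException e) {
            System.out.println("rethrown " + e.code); // rethrown 500
        }
    }
}
```

The narrow catch is the key design choice: a blanket swallow of publish failures would break the at-least-once guarantee the PR explicitly preserves.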
Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]
jeyhunkarimov commented on PR #23470: URL: https://github.com/apache/flink/pull/23470#issuecomment-2061766848 Thanks a lot @xuyangzhong for your review. I addressed your comments.
Re: [PR] [FLINK-35115] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]
z3d1k commented on PR #138: URL: https://github.com/apache/flink-connector-aws/pull/138#issuecomment-2061752988 @dannycranmer, @hlteoh37 please take a look
[PR] [FLINK-35115] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]
z3d1k opened a new pull request, #138: URL: https://github.com/apache/flink-connector-aws/pull/138 ## Purpose of the change Perform the state snapshot even after the source operator has been cancelled. This solves the issue of operator state not being saved during the stop-with-savepoint workflow. ## Verifying this change - *Added unit tests* - *Manually verified by running the Kinesis connector on a local Flink cluster.* ## Significant changes *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)* - [ ] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [ ] New feature has been introduced - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35115: --- Labels: kinesis pull-request-available (was: kinesis) > Kinesis connector writes wrong Kinesis sequence number at stop with savepoint > - > > Key: FLINK-35115 > URL: https://issues.apache.org/jira/browse/FLINK-35115 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, > aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1 > Environment: The issue happens in a *Kinesis -> Flink -> Kafka* > exactly-once setup with: > * Flink versions checked 1.16.3 and 1.18.1 > * Kinesis connector checked 1.16.3 and 4.2.0-1.18 > * checkpointing configured at 1 minute with EXACTLY_ONCE mode: > {code:java} > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment (); > execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig > ().setCheckpointTimeout (9); execEnv.getCheckpointConfig > ().setCheckpointStorage (CHECKPOINTS_PATH); {code} > * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee: > {code:java} > Properties sinkConfig = new Properties (); > sinkConfig.put ("transaction.timeout.ms", 48); > KafkaSink sink = KafkaSink.builder () > .setBootstrapServers ("localhost:9092") > .setTransactionalIdPrefix ("test-prefix") > .setDeliverGuarantee (EXACTLY_ONCE) > .setKafkaProducerConfig (sinkConfig) > .setRecordSerializer ( > (KafkaRecordSerializationSchema) (element, context, > timestamp) -> new ProducerRecord<> ( > "test-output-topic", null, element.getBytes ())) > .build (); {code} > * Kinesis consumer defined as: > {code:java} > FlinkKinesisConsumer flinkKinesisConsumer = new > FlinkKinesisConsumer<> ("test-stream", > new AbstractDeserializationSchema<> () { > @Override > public ByteBuffer deserialize (byte[] bytes) { > // Return > return ByteBuffer.wrap (bytes); > } > }, props); {code} 
> >Reporter: Vadim Vararu >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: kinesis, pull-request-available > Fix For: aws-connector-4.3.0 > > > Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a > stop-with-savepoint, Flink duplicates in Kafka all the records between the > last checkpoint and the savepoint at resume: > * Event1 is written to Kinesis > * Event1 is processed by Flink > * Event1 is committed to Kafka at the checkpoint > > * Event2 is written to Kinesis > * Event2 is processed by Flink > * Stop with savepoint is triggered manually > * Event2 is committed to Kafka > > * Job is resumed from the savepoint > * *{color:#FF0000}Event2 is written again to Kafka at the first > checkpoint{color}* > > {color:#172b4d}I believe that it's a Kinesis connector issue for 2 > reasons:{color} > * I've checked the actual Kinesis sequence number in the _metadata file > generated at stop-with-savepoint and it's the one from the checkpoint before > the savepoint instead of being the one of the last record committed to Kafka. > * I've tested exactly the same job with Kafka as source instead of Kinesis > as source and the behaviour does not reproduce. -- This message was sent by Atlassian Jira (v8.20.10#820010)
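The reproduction steps above boil down to the savepoint carrying the wrong restart position. The sketch below is not Flink or connector code; it is a minimal, hypothetical model (all names invented) of why restoring from the sequence number of the previous checkpoint, rather than that of the last record committed to the sink, replays Event2:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the FLINK-35115 symptom, not Flink code.
// A Kinesis-style source re-reads every record whose sequence number is
// greater than the one restored from the savepoint. If the savepoint stores
// the sequence number of the previous checkpoint instead of the last record
// already committed to Kafka, the records in between are emitted twice.
public class SequenceNumberBugSketch {

    // Records with a sequence number above the restored position are replayed.
    static List<Integer> replayedAfterRestore(List<Integer> events, int restoredSeq) {
        return events.stream()
                .filter(seq -> seq > restoredSeq)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> events = List.of(1, 2); // Event1, Event2
        int checkpointSeq = 1;                // checkpoint taken after Event1
        int lastCommittedSeq = 2;             // Event2 committed during stop-with-savepoint

        // Buggy behaviour: savepoint metadata carries the checkpoint's sequence number.
        System.out.println(replayedAfterRestore(events, checkpointSeq));    // [2] -> Event2 duplicated
        // Expected behaviour: savepoint carries the last committed sequence number.
        System.out.println(replayedAfterRestore(events, lastCommittedSeq)); // []
    }
}
```

This matches the reporter's observation that the _metadata file holds the pre-savepoint checkpoint's sequence number, which is exactly the condition that makes the first branch (a non-empty replay set) occur.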
Re: [PR] [FLINK-35116] Bump operator sdk version to 4.8.3 [flink-kubernetes-operator]
mbalassi merged PR #816: URL: https://github.com/apache/flink-kubernetes-operator/pull/816 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35124) Connector Release Fails to run Checkstyle
[ https://issues.apache.org/jira/browse/FLINK-35124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838277#comment-17838277 ] Etienne Chauchot commented on FLINK-35124: -- It was failing on cassandra before the _utils.sh change; at the time I did a quick workaround by copying the suppressions.xml file to /tools > Connector Release Fails to run Checkstyle > - > > Key: FLINK-35124 > URL: https://issues.apache.org/jira/browse/FLINK-35124 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Danny Cranmer >Priority: Major > > During a release of the AWS connectors the build was failing at the > \{{./tools/releasing/shared/stage_jars.sh}} step due to a checkstyle error. > > {code:java} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-checkstyle-plugin:3.1.2:check (validate) on > project flink-connector-aws: Failed during checkstyle execution: Unable to > find suppressions file at location: /tools/maven/suppressions.xml: Could not > find resource '/tools/maven/suppressions.xml'. -> [Help 1] {code} > > Looks like it is caused by this > [https://github.com/apache/flink-connector-shared-utils/commit/a75b89ee3f8c9a03e97ead2d0bd9d5b7bb02b51a] -- This message was sent by Atlassian Jira (v8.20.10#820010)
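The workaround described in the comment amounts to copying the suppressions file one directory up so the release tooling can resolve it. The sketch below reproduces that copy against a throwaway directory; the layout (`tools/maven/suppressions.xml`) follows the checkstyle error message, but the exact path the release scripts resolve is an assumption.

```shell
# Sketch of the workaround from the comment above, run against a throwaway
# directory. Assumption: the release scripts resolve the suppressions file
# relative to /tools, so a copy at tools/suppressions.xml lets the
# maven-checkstyle-plugin find it.
set -eu
repo="$(mktemp -d)"                     # stand-in for a connector checkout
mkdir -p "$repo/tools/maven"
printf '<suppressions/>\n' > "$repo/tools/maven/suppressions.xml"
# The workaround: place a copy where the tooling looks for it.
cp "$repo/tools/maven/suppressions.xml" "$repo/tools/suppressions.xml"
ls -l "$repo/tools/suppressions.xml"
```

In a real connector checkout the proper fix is making the shared release scripts pass the correct `suppressionsLocation` rather than copying the file, which is what the linked flink-connector-shared-utils change is about.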
[jira] [Comment Edited] (FLINK-35124) Connector Release Fails to run Checkstyle
[ https://issues.apache.org/jira/browse/FLINK-35124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838277#comment-17838277 ] Etienne Chauchot edited comment on FLINK-35124 at 4/17/24 4:21 PM: --- It was failing on cassandra before the _utils.sh change; at the time I did a quick workaround by copying the suppressions.xml file to /tools. I think it was failing for the other connectors as well. was (Author: echauchot): It was failing on cassandra before the _utils.sh change; at the time I did a quick workaround by copying the suppressions.xml file to /tools -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838238#comment-17838238 ] Ryan Skraba commented on FLINK-35041: - 1.20 test_ci_core https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58969=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8933 1.20 test_ci_core https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58971=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8897 > IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed > -- > > Key: FLINK-35041 > URL: https://issues.apache.org/jira/browse/FLINK-35041 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Blocker > > {code:java} > Apr 08 03:22:45 03:22:45.450 [ERROR] > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration > -- Time elapsed: 0.034 s <<< FAILURE! 
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: > Apr 08 03:22:45 > Apr 08 03:22:45 expected: false > Apr 08 03:22:45 but was: true > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Apr 08 03:22:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211) > Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498) > Apr 08 03:22:45 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {code} > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35146) CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan
[ https://issues.apache.org/jira/browse/FLINK-35146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Skraba updated FLINK-35146: Labels: test-stability (was: ) > CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan > --- > > Key: FLINK-35146 > URL: https://issues.apache.org/jira/browse/FLINK-35146 > Project: Flink > Issue Type: Bug >Affects Versions: 1.19.1 >Reporter: Ryan Skraba >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58960=logs=fb37c667-81b7-5c22-dd91-846535e99a97=011e961e-597c-5c96-04fe-7941c8b83f23=16690 > {code} > Apr 17 06:27:47 06:27:47.363 [ERROR] Tests run: 2, Failures: 1, Errors: 0, > Skipped: 1, Time elapsed: 64.51 s <<< FAILURE! -- in > org.apache.flink.table.sql.CompileAndExecuteRemotePlanITCase > Apr 17 06:27:47 06:27:47.364 [ERROR] > org.apache.flink.table.sql.CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan[executionMode] > -- Time elapsed: 56.55 s <<< FAILURE! > Apr 17 06:27:47 org.opentest4j.AssertionFailedError: Did not get expected > results before timeout, actual result: null. 
==> expected: but was: > > Apr 17 06:27:47 at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > Apr 17 06:27:47 at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > Apr 17 06:27:47 at > org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > Apr 17 06:27:47 at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > Apr 17 06:27:47 at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) > Apr 17 06:27:47 at > org.apache.flink.table.sql.SqlITCaseBase.checkResultFile(SqlITCaseBase.java:216) > Apr 17 06:27:47 at > org.apache.flink.table.sql.SqlITCaseBase.runAndCheckSQL(SqlITCaseBase.java:149) > Apr 17 06:27:47 at > org.apache.flink.table.sql.SqlITCaseBase.runAndCheckSQL(SqlITCaseBase.java:133) > Apr 17 06:27:47 at > org.apache.flink.table.sql.CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan(CompileAndExecuteRemotePlanITCase.java:70) > Apr 17 06:27:47 at java.lang.reflect.Method.invoke(Method.java:498) > Apr 17 06:27:47 at > org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48) > Apr 17 06:27:47 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > Apr 17 06:27:47 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35146) CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan
Ryan Skraba created FLINK-35146: --- Summary: CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan Key: FLINK-35146 URL: https://issues.apache.org/jira/browse/FLINK-35146 Project: Flink Issue Type: Bug Affects Versions: 1.19.1 Reporter: Ryan Skraba -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33186) CheckpointAfterAllTasksFinishedITCase.testRestoreAfterSomeTasksFinished fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-33186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838235#comment-17838235 ] Ryan Skraba commented on FLINK-33186: - 1.20 test_cron_hadoop313 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58958=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8314 1.20 Java 8: Test (module: tests) https://github.com/apache/flink/actions/runs/8719280474/job/23918749100#step:10:8028 > CheckpointAfterAllTasksFinishedITCase.testRestoreAfterSomeTasksFinished > fails on AZP > - > > Key: FLINK-33186 > URL: https://issues.apache.org/jira/browse/FLINK-33186 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.19.0, 1.18.1 >Reporter: Sergey Nuyanzin >Assignee: Jiang Xin >Priority: Critical > Labels: test-stability > > This build > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53509=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8762 > fails as > {noformat} > Sep 28 01:23:43 Caused by: > org.apache.flink.runtime.checkpoint.CheckpointException: Task local > checkpoint failure. 
> Sep 28 01:23:43 at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:550) > Sep 28 01:23:43 at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2248) > Sep 28 01:23:43 at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2235) > Sep 28 01:23:43 at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:817) > Sep 28 01:23:43 at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > Sep 28 01:23:43 at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > Sep 28 01:23:43 at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > Sep 28 01:23:43 at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > Sep 28 01:23:43 at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > Sep 28 01:23:43 at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > Sep 28 01:23:43 at java.lang.Thread.run(Thread.java:748) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31421) DataGeneratorSourceITCase.testGatedRateLimiter failed on CI
[ https://issues.apache.org/jira/browse/FLINK-31421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838234#comment-17838234 ] Ryan Skraba commented on FLINK-31421: - 1.19 Hadoop 3.1.3: Test (module: misc) https://github.com/apache/flink/actions/runs/8715237951/job/23907182643#step:10:23996 > DataGeneratorSourceITCase.testGatedRateLimiter failed on CI > --- > > Key: FLINK-31421 > URL: https://issues.apache.org/jira/browse/FLINK-31421 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.18.0 >Reporter: Qingsheng Ren >Priority: Major > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47044=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=5acec1b4-945b-59ca-34f8-168928ce5199=23109] > {code:java} > [ERROR] > org.apache.flink.connector.datagen.source.DataGeneratorSourceITCase.testGatedRateLimiter > Time elapsed: 1.557 s <<< FAILURE! > Mar 11 04:43:12 java.lang.AssertionError: > Mar 11 04:43:12 > Mar 11 04:43:12 Expected size: 8 but was: 7 in: > Mar 11 04:43:12 [1L, 1L, 1L, 1L, 1L, 1L, 1L] > Mar 11 04:43:12 at > org.apache.flink.connector.datagen.source.DataGeneratorSourceITCase.testGatedRateLimiter(DataGeneratorSourceITCase.java:200) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34644) RestServerEndpointITCase.testShouldWaitForHandlersWhenClosing failed with ConnectionClosedException
[ https://issues.apache.org/jira/browse/FLINK-34644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838232#comment-17838232 ] Ryan Skraba commented on FLINK-34644: - 1.20 Java 21: Test (module: core) https://github.com/apache/flink/actions/runs/8715237422/job/23907067308#step:10:9495 > RestServerEndpointITCase.testShouldWaitForHandlersWhenClosing failed with > ConnectionClosedException > --- > > Key: FLINK-34644 > URL: https://issues.apache.org/jira/browse/FLINK-34644 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.20.0 >Reporter: Matthias Pohl >Priority: Major > Labels: test-stability > > https://github.com/apache/flink/actions/runs/8189958608/job/22396362238#step:10:9215 > {code} > Error: 15:13:33 15:13:33.779 [ERROR] Tests run: 68, Failures: 0, Errors: 1, > Skipped: 4, Time elapsed: 17.81 s <<< FAILURE! -- in > org.apache.flink.runtime.rest.RestServerEndpointITCase > Error: 15:13:33 15:13:33.779 [ERROR] > org.apache.flink.runtime.rest.RestServerEndpointITCase.testShouldWaitForHandlersWhenClosing > -- Time elapsed: 0.329 s <<< ERROR! > Mar 07 15:13:33 java.util.concurrent.ExecutionException: > org.apache.flink.runtime.rest.ConnectionClosedException: Channel became > inactive. 
> Mar 07 15:13:33 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > Mar 07 15:13:33 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) > Mar 07 15:13:33 at > org.apache.flink.runtime.rest.RestServerEndpointITCase.testShouldWaitForHandlersWhenClosing(RestServerEndpointITCase.java:592) > Mar 07 15:13:33 at java.lang.reflect.Method.invoke(Method.java:498) > Mar 07 15:13:33 at > java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > Mar 07 15:13:33 at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > Mar 07 15:13:33 at > java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) > Mar 07 15:13:33 at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > Mar 07 15:13:33 at > java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > Mar 07 15:13:33 at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > Mar 07 15:13:33 at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > Mar 07 15:13:33 at > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > Mar 07 15:13:33 at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > Mar 07 15:13:33 at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > Mar 07 15:13:33 at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > Mar 07 15:13:33 at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > Mar 07 15:13:33 at > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > Mar 07 15:13:33 at > java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > Mar 07 15:13:33 at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > Mar 07 15:13:33 at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > Mar 07 15:13:33 
at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > Mar 07 15:13:33 at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > Mar 07 15:13:33 at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > Mar 07 15:13:33 at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > Mar 07 15:13:33 at > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > Mar 07 15:13:33 at > java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > Mar 07 15:13:33 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Mar 07 15:13:33 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Mar 07 15:13:33 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Mar 07 15:13:33 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Mar 07 15:13:33 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > Mar 07 15:13:33 Caused by: > org.apache.flink.runtime.rest.ConnectionClosedException: Channel became > inactive. > Mar 07 15:13:33 at >
[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838233#comment-17838233 ] Ryan Skraba commented on FLINK-35041: - 1.20 Java 8: Test (module: core) https://github.com/apache/flink/actions/runs/8715237422/job/23907053858#step:10:9074 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34646) AggregateITCase.testDistinctWithRetract timed out
[ https://issues.apache.org/jira/browse/FLINK-34646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838231#comment-17838231 ] Ryan Skraba commented on FLINK-34646: - 1.18 AdaptiveScheduler: Test (module: table) https://github.com/apache/flink/actions/runs/8715237382/job/23907050959#step:10:12476 This is the exact same timeout but it's happening on {{AggregateITCase.testMinMaxWithBinaryString}} > AggregateITCase.testDistinctWithRetract timed out > - > > Key: FLINK-34646 > URL: https://issues.apache.org/jira/browse/FLINK-34646 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.1 >Reporter: Matthias Pohl >Priority: Major > Labels: test-stability > > https://github.com/apache/flink/actions/runs/8211401561/job/22460442229#step:10:17161 > {code} > "main" #1 prio=5 os_prio=0 tid=0x7f70abeb7000 nid=0x4cff3 waiting on > condition [0x7f70ac3f6000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xcd24c690> (a > java.util.concurrent.CompletableFuture$Signaller) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876) > at > 
org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase.testDistinctWithRetract(AggregateITCase.scala:345) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-35115: -- Affects Version/s: aws-connector-4.2.0 aws-connector-4.1.0 aws-connector-4.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-35115: -- Fix Version/s: aws-connector-4.3.0 (was: aws-connector-4.2.0) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-35115: -- Fix Version/s: aws-connector-4.2.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-35115: -- Affects Version/s: 1.17.2 1.15.4 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-35115: -- Priority: Blocker (was: Major) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35124) Connector Release Fails to run Checkstyle
[ https://issues.apache.org/jira/browse/FLINK-35124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838224#comment-17838224 ] Danny Cranmer commented on FLINK-35124: --- Actually I still think this is the issue, since it runs the maven commands on the "pristine source", which is missing the maven config directory: https://github.com/apache/flink-connector-shared-utils/blob/release_utils/stage_jars.sh#L54. If it were an invalid path, it would fail on all builds, not just during the release. > Connector Release Fails to run Checkstyle > - > > Key: FLINK-35124 > URL: https://issues.apache.org/jira/browse/FLINK-35124 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Danny Cranmer >Priority: Major > > During a release of the AWS connectors the build was failing at the > \{{./tools/releasing/shared/stage_jars.sh}} step due to a checkstyle error. > > {code:java} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-checkstyle-plugin:3.1.2:check (validate) on > project flink-connector-aws: Failed during checkstyle execution: Unable to > find suppressions file at location: /tools/maven/suppressions.xml: Could not > find resource '/tools/maven/suppressions.xml'. -> [Help 1] {code} > > Looks like it is caused by this > [https://github.com/apache/flink-connector-shared-utils/commit/a75b89ee3f8c9a03e97ead2d0bd9d5b7bb02b51a] -- This message was sent by Atlassian Jira (v8.20.10#820010)
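The "pristine source" failure mode can be sketched with a toy filesystem check (illustrative only; `PristineSourceDemo` and the copy step are hypothetical and do not reproduce the actual stage_jars.sh behaviour): checkstyle resolves `tools/maven/suppressions.xml` under the checkout root, so a source copy missing that directory fails the lookup, while copying the maven config over restores it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Toy reproduction: the suppressions file is resolved relative to the
// checkout root; a "pristine" copy without tools/maven/ cannot find it.
public class PristineSourceDemo {

    static boolean suppressionsPresent(Path checkoutRoot) {
        return Files.exists(checkoutRoot.resolve("tools/maven/suppressions.xml"));
    }

    public static void main(String[] args) throws IOException {
        // A normal checkout carries the maven config directory.
        Path original = Files.createTempDirectory("checkout");
        Files.createDirectories(original.resolve("tools/maven"));
        Files.writeString(original.resolve("tools/maven/suppressions.xml"), "<suppressions/>");

        // The pristine copy lacks it, so the lookup fails.
        Path pristine = Files.createTempDirectory("pristine");
        System.out.println(suppressionsPresent(pristine)); // false -> checkstyle fails here

        // Hypothetical fix: carry the maven config over to the pristine copy.
        Files.createDirectories(pristine.resolve("tools/maven"));
        Files.copy(original.resolve("tools/maven/suppressions.xml"),
                   pristine.resolve("tools/maven/suppressions.xml"));
        System.out.println(suppressionsPresent(pristine)); // true
    }
}
```

This also matches the observation in the comment: an outright invalid path would fail every build, whereas a path that only exists in a full checkout fails solely on the release's pristine copy.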
[jira] [Assigned] (FLINK-35125) Implement ValueState for ForStStateBackend
[ https://issues.apache.org/jira/browse/FLINK-35125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-35125: Assignee: Jinzhong Li > Implement ValueState for ForStStateBackend > -- > > Key: FLINK-35125 > URL: https://issues.apache.org/jira/browse/FLINK-35125 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Jinzhong Li >Assignee: Jinzhong Li >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-34987) Introduce Internal State Interface for Async State API
[ https://issues.apache.org/jira/browse/FLINK-34987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-34987. -- Fix Version/s: 1.20.0 Resolution: Fixed merged 7699a0d2...2c5078bc into master > Introduce Internal State Interface for Async State API > -- > > Key: FLINK-34987 > URL: https://issues.apache.org/jira/browse/FLINK-34987 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33394) DataGeneratorSourceITCase.testGatedRateLimiter fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-33394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838223#comment-17838223 ] Ryan Skraba commented on FLINK-33394: - This should be closed as a duplicate of FLINK-31421 (Weirdly enough, I can't link two Jira issues ... unless another link already exists!) > DataGeneratorSourceITCase.testGatedRateLimiter fails on AZP > --- > > Key: FLINK-33394 > URL: https://issues.apache.org/jira/browse/FLINK-33394 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.17.2 >Reporter: Sergey Nuyanzin >Priority: Major > Labels: test-stability > > This build > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54054=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=5acec1b4-945b-59ca-34f8-168928ce5199=22927 > fails on AZP as > {noformat} > Oct 26 07:37:41 [1L, 1L, 1L, 1L, 1L, 1L] > Oct 26 07:37:41 at > org.apache.flink.connector.datagen.source.DataGeneratorSourceITCase.testGatedRateLimiter(DataGeneratorSourceITCase.java:200) > Oct 26 07:37:41 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Oct 26 07:37:41 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Oct 26 07:37:41 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Oct 26 07:37:41 at java.lang.reflect.Method.invoke(Method.java:498) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]
masteryhx closed pull request #24651: [FLINK-34987][state] Introduce Internal State for Async State API URL: https://github.com/apache/flink/pull/24651 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org