[jira] [Updated] (FLINK-35302) Flink REST server throws exception on unknown fields in RequestBody

2024-05-07 Thread Juntao Hu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juntao Hu updated FLINK-35302:
--
Description: 
As 
[FLIP-401|https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance]
 and FLINK-33268 mention, when an old-version REST client with a strict JSON 
mapper receives a response from a new-version REST server, the client throws 
exceptions on newly added fields. This is inconvenient for situations where a 
centralized client deals with REST servers of different versions (e.g. the k8s 
operator).

But this incompatibility can also happen on the server side, when a new-version 
REST client sends requests with additional fields to an old-version REST 
server. Making the server tolerant of unknown fields saves clients from 
backward-compatibility code.
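
For reference, a minimal sketch of the strict-versus-tolerant distinction with 
plain Jackson (the request-body class and field names below are illustrative 
only, not Flink's actual REST classes; Flink configures this centrally on the 
ObjectMapper shared by its REST handlers):

{code:java}
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class UnknownFieldToleranceSketch {

    // Hypothetical request body as an old-version server knows it.
    public static class LegacyRequestBody {
        public String jobId;
    }

    public static void main(String[] args) throws Exception {
        // JSON sent by a newer client, carrying an extra field.
        String json = "{\"jobId\":\"abc\",\"newField\":42}";

        ObjectMapper strict =
                new ObjectMapper().enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        ObjectMapper flexible =
                new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);

        // The flexible mapper silently drops "newField".
        LegacyRequestBody body = flexible.readValue(json, LegacyRequestBody.class);
        System.out.println(body.jobId); // prints: abc

        // The strict mapper throws UnrecognizedPropertyException instead.
        try {
            strict.readValue(json, LegacyRequestBody.class);
        } catch (Exception e) {
            System.out.println("Strict mapper rejected the unknown field: " + e.getMessage());
        }
    }
}
{code}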

  was:
As 
[FLIP-401|https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance]
 and FLINK-33258 mention, when an old-version REST client with a strict JSON 
mapper receives a response from a new-version REST server, the client throws 
exceptions on newly added fields. This is inconvenient for situations where a 
centralized client deals with REST servers of different versions (e.g. the k8s 
operator).

But this incompatibility can also happen on the server side, when a new-version 
REST client sends requests with additional fields to an old-version REST 
server. Making the server tolerant of unknown fields saves clients from 
backward-compatibility code.


> Flink REST server throws exception on unknown fields in RequestBody
> ---
>
> Key: FLINK-35302
> URL: https://issues.apache.org/jira/browse/FLINK-35302
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Juntao Hu
>Priority: Major
> Fix For: 1.19.1
>
>
> As 
> [FLIP-401|https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance]
>  and FLINK-33268 mention, when an old-version REST client with a strict JSON 
> mapper receives a response from a new-version REST server, the client throws 
> exceptions on newly added fields. This is inconvenient for situations where a 
> centralized client deals with REST servers of different versions (e.g. the 
> k8s operator).
> But this incompatibility can also happen on the server side, when a 
> new-version REST client sends requests with additional fields to an 
> old-version REST server. Making the server tolerant of unknown fields saves 
> clients from backward-compatibility code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35302][rest] Ignore unknown fields in REST request deserialization [flink]

2024-05-07 Thread via GitHub


vancior98 opened a new pull request, #24759:
URL: https://github.com/apache/flink/pull/24759

   
   ## What is the purpose of the change
   
   This PR makes the REST server ignore unknown fields when deserializing the 
request body, which is helpful in scenarios where a centralized REST client of 
a newer version (whose requests contain additional fields) communicates with 
REST servers of various versions, potentially an old version that doesn't 
recognize the additional fields.
   
   Related to #23930 , in which compatibility between an old-version client and 
a new-version server was solved.
   
   ## Brief change log
   
   - change the ObjectMapper for AbstractHandler to a flexible ObjectMapper 
that ignores unknown fields
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   - unit test AbstractHandlerTest#testIgnoringUnknownFields
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35302) Flink REST server throws exception on unknown fields in RequestBody

2024-05-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35302:
---
Labels: pull-request-available  (was: )

> Flink REST server throws exception on unknown fields in RequestBody
> ---
>
> Key: FLINK-35302
> URL: https://issues.apache.org/jira/browse/FLINK-35302
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Juntao Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> As 
> [FLIP-401|https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance]
>  and FLINK-33268 mention, when an old-version REST client with a strict JSON 
> mapper receives a response from a new-version REST server, the client throws 
> exceptions on newly added fields. This is inconvenient for situations where a 
> centralized client deals with REST servers of different versions (e.g. the 
> k8s operator).
> But this incompatibility can also happen on the server side, when a 
> new-version REST client sends requests with additional fields to an 
> old-version REST server. Making the server tolerant of unknown fields saves 
> clients from backward-compatibility code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35302][rest] Ignore unknown fields in REST request deserialization [flink]

2024-05-07 Thread via GitHub


flinkbot commented on PR #24759:
URL: https://github.com/apache/flink/pull/24759#issuecomment-2097600239

   
   ## CI report:
   
   * 41683b45b58a6848c9190909a5f48b035e5a9b41 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35302) Flink REST server throws exception on unknown fields in RequestBody

2024-05-07 Thread Juntao Hu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844165#comment-17844165
 ] 

Juntao Hu commented on FLINK-35302:
---

Hi [~gaborgsomogyi], do you have time to look at this PR?

> Flink REST server throws exception on unknown fields in RequestBody
> ---
>
> Key: FLINK-35302
> URL: https://issues.apache.org/jira/browse/FLINK-35302
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Juntao Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> As 
> [FLIP-401|https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance]
>  and FLINK-33268 mention, when an old-version REST client with a strict JSON 
> mapper receives a response from a new-version REST server, the client throws 
> exceptions on newly added fields. This is inconvenient for situations where a 
> centralized client deals with REST servers of different versions (e.g. the 
> k8s operator).
> But this incompatibility can also happen on the server side, when a 
> new-version REST client sends requests with additional fields to an 
> old-version REST server. Making the server tolerant of unknown fields saves 
> clients from backward-compatibility code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35262] Bridge between AsyncKeyedStateBackend and AsyncExecutionController [flink]

2024-05-07 Thread via GitHub


Zakelly commented on code in PR #24740:
URL: https://github.com/apache/flink/pull/24740#discussion_r1591946404


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java:
##
@@ -114,6 +116,10 @@ public void setKeyedStateStore(@Nullable KeyedStateStore 
keyedStateStore) {
 this.keyedStateStore = keyedStateStore;
 }
 
+public void setAsyncKeyedStateStore(@Nullable AsyncKeyedStateStore 
asyncKeyedStateStore) {

Review Comment:
   How about renaming `AsyncKeyedStateStore` to `KeyedStateStoreV2` or 
something? This is for the new state v2 APIs, not only for async state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33268) Flink REST API response parsing throws exception on new fields

2024-05-07 Thread Juntao Hu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844179#comment-17844179
 ] 

Juntao Hu commented on FLINK-33268:
---

[~gaborgsomogyi]  thanks for replying! I've created issue FLINK-35302 and a 
[PR|https://github.com/apache/flink/pull/24759]; it would be great if you could 
review it.

> Flink REST API response parsing throws exception on new fields
> --
>
> Key: FLINK-33268
> URL: https://issues.apache.org/jira/browse/FLINK-33268
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> At the moment Flink does not ignore unknown fields when parsing REST 
> responses. An example of such a class is JobDetailsInfo, but this applies to 
> all others. It would be good to add this support to increase compatibility.
> The real-life use case is when the Flink k8s operator wants to handle 2 jobs 
> with 2 different Flink versions, where the newer version has added a new field 
> to a REST response. In such a case the operator gets an exception when, for 
> example, it tries to poll the job details with the additional field.
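
For context, Jackson also offers a per-class escape hatch; a small sketch below 
(the fields are illustrative, and the actual fix relaxes the shared REST 
ObjectMapper rather than annotating every response class):

{code:java}
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

// Annotating a single response type makes only that type tolerant of new
// fields; FLIP-401 instead relaxes the shared mapper so all responses benefit.
@JsonIgnoreProperties(ignoreUnknown = true)
public class JobDetailsSketch {
    public String jid;
    public String name;
}
{code}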



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35195][table] Support the execution of create materialized table in continuous refresh mode [flink]

2024-05-07 Thread via GitHub


hackergin commented on code in PR #24750:
URL: https://github.com/apache/flink/pull/24750#discussion_r1591948579


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
+import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import 
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+import 
org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
+import org.apache.flink.table.refresh.ContinuousRefreshHandler;
+import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
+import static org.apache.flink.configuration.DeploymentOptions.TARGET;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.configuration.PipelineOptions.NAME;
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static 
org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
+
+/** Manager is responsible for executing the {@link MaterializedTableOperation}. */
+@Internal
+public class MaterializedTableManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MaterializedTableManager.class);
+
+public static ResultFetcher callMaterializedTableOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+MaterializedTableOperation op,
+String statement) {
+if (op instanceof CreateMaterializedTableOperation) {
+return callCreateMaterializedTableOperation(
+operationExecutor, handle, 
(CreateMaterializedTableOperation) op);
+}
+throw new SqlExecutionException(
+String.format(
+"Unsupported Operation %s for materialized table.", 
op.asSummaryString()));
+}
+
+private static ResultFetcher callCreateMaterializedTableOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+CreateMaterializedTableOperation createMaterializedTableOperation) 
{
+CatalogMaterializedTable materializedTable =
+createMaterializedTableOperation.getCatalogMaterializedTable();
+if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == 
materializedTable.getRefreshMode()) {
+createMaterializedInContinuousMode(
+operationExecutor, handle, 
createMaterializedTableOperation);
+} else {
+throw new SqlExecutionException(
+"Only support create materialized table in continuous 
refresh mode currently.");

Review Comment:
   The wording here is a bit strange. How about changing it to: `Currently, we 
only support creating materialized tables with continuous refresh mode.`



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java:
##
@@

[jira] [Commented] (FLINK-35002) GitHub action request timeout to ArtifactService

2024-05-07 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844187#comment-17844187
 ] 

Ryan Skraba commented on FLINK-35002:
-

* 1.19 Java 8 / E2E (group 2) 
https://github.com/apache/flink/commit/fa426f104baa1343a07695dcf4c4984814f0fde4/checks/24659542455/logs

> GitHub action request timeout  to ArtifactService
> -
>
> Key: FLINK-35002
> URL: https://issues.apache.org/jira/browse/FLINK-35002
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: github-actions, test-stability
>
> A timeout can occur when uploading a successfully built artifact:
>  * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650]
> {code:java}
> 2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file 
> uploaded
> 2024-04-02T02:20:15.6360133Z Artifact name is valid!
> 2024-04-02T02:20:15.6362872Z Root directory input is valid!
> 2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 3000 ms...
> 2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 4785 ms...
> 2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 7375 ms...
> 2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 14988 ms...
> 2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to 
> make request after 5 attempts: Request timeout: 
> /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact
> 2024-04-02T02:22:59.9893296Z Post job cleanup.
> 2024-04-02T02:22:59.9958844Z Post job cleanup. {code}
> (This is unlikely to be something we can fix, but we can track it.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-05-07 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844185#comment-17844185
 ] 

Ryan Skraba commented on FLINK-35041:
-

* 1.20 test_cron_jdk21 core 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59356&view=logs&j=d06b80b4-9e88-5d40-12a2-18072cf60528&t=609ecd5a-3f6e-5d0c-2239-2096b155a4d0&l=8870

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Feifan Wang
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35302) Flink REST server throws exception on unknown fields in RequestBody

2024-05-07 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-35302:
---

Assignee: Juntao Hu

> Flink REST server throws exception on unknown fields in RequestBody
> ---
>
> Key: FLINK-35302
> URL: https://issues.apache.org/jira/browse/FLINK-35302
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Juntao Hu
>Assignee: Juntao Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> As 
> [FLIP-401|https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance]
>  and FLINK-33268 mention, when an old-version REST client with a strict JSON 
> mapper receives a response from a new-version REST server, the client throws 
> exceptions on newly added fields. This is inconvenient for situations where a 
> centralized client deals with REST servers of different versions (e.g. the 
> k8s operator).
> But this incompatibility can also happen on the server side, when a 
> new-version REST client sends requests with additional fields to an 
> old-version REST server. Making the server tolerant of unknown fields saves 
> clients from backward-compatibility code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35030) Introduce Epoch Manager for async execution

2024-05-07 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei updated FLINK-35030:
---
Summary: Introduce Epoch Manager for async execution  (was: Introduce Epoch 
Manager for  under async execution)

> Introduce Epoch Manager for async execution
> ---
>
> Key: FLINK-35030
> URL: https://issues.apache.org/jira/browse/FLINK-35030
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35030) Introduce Epoch Manager for async execution

2024-05-07 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei reassigned FLINK-35030:
--

Assignee: Yanfei Lei

> Introduce Epoch Manager for async execution
> ---
>
> Key: FLINK-35030
> URL: https://issues.apache.org/jira/browse/FLINK-35030
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-07 Thread via GitHub


yuxiqian commented on PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2097757324

   Thanks for @aiwenmo's kind review; addressed the comments above.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35303) Support logical deletion of datax

2024-05-07 Thread melin (Jira)
melin created FLINK-35303:
-

 Summary:  Support logical deletion of datax
 Key: FLINK-35303
 URL: https://issues.apache.org/jira/browse/FLINK-35303
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: melin


A delete event is a logical deletion. Add a field to the table, for example 
is_delete: the default is false, and if it is a delete event, is_delete is set 
to true.
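
A rough sketch of the requested transformation, using a hypothetical event 
model (Flink CDC's real pipeline would do this in its transform/sink stage; 
all names below are illustrative, not actual CDC APIs):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class LogicalDeleteSketch {

    enum Op { INSERT, UPDATE, DELETE }

    /** Hypothetical change event: an operation plus the row's columns. */
    record ChangeEvent(Op op, Map<String, Object> row) {}

    // Rewrites a DELETE into an INSERT that carries is_delete = true, so the
    // downstream keeps the row instead of physically removing it.
    static ChangeEvent toLogicalDelete(ChangeEvent event) {
        Map<String, Object> row = new HashMap<>(event.row());
        row.put("is_delete", event.op() == Op.DELETE);
        return new ChangeEvent(event.op() == Op.DELETE ? Op.INSERT : event.op(), row);
    }

    public static void main(String[] args) {
        Map<String, Object> columns = Map.of("id", 1);
        ChangeEvent delete = new ChangeEvent(Op.DELETE, columns);
        // Emits an INSERT whose row contains is_delete=true.
        System.out.println(toLogicalDelete(delete));
    }
}
{code}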



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35303) Support logical deletion of data

2024-05-07 Thread melin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

melin updated FLINK-35303:
--
Summary:  Support logical deletion of data  (was:  Support logical deletion 
of datax)

>  Support logical deletion of data
> -
>
> Key: FLINK-35303
> URL: https://issues.apache.org/jira/browse/FLINK-35303
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
>
> A delete event is a logical deletion. Add a field to the table, for example 
> is_delete: the default is false, and if it is a delete event, is_delete is set 
> to true.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-20628] Port RabbitMQ Connector using unified Source [flink-connector-rabbitmq]

2024-05-07 Thread via GitHub


RocMarshal closed pull request #16: [FLINK-20628] Port RabbitMQ Connector using 
unified Source
URL: https://github.com/apache/flink-connector-rabbitmq/pull/16


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20628] Port RabbitMQ Connector using unified Source [flink-connector-rabbitmq]

2024-05-07 Thread via GitHub


RocMarshal commented on PR #16:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/16#issuecomment-2097772513

   @vahmed-hamdy will take it over (discussed offline).
   Thanks @vahmed-hamdy & @MartijnVisser ~ 
   Looking forward to it!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-05-07 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844215#comment-17844215
 ] 

Rui Fan commented on FLINK-35041:
-

I have asked [~Feifan Wang] offline; he will check and fix it this week with 
high priority. Thanks Feifan in advance.

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Feifan Wang
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35112][python] Fix membership for Row class PyFlink [flink]

2024-05-07 Thread via GitHub


wzorgdrager commented on code in PR #24756:
URL: https://github.com/apache/flink/pull/24756#discussion_r1592097458


##
flink-python/pyflink/common/types.py:
##
@@ -177,7 +177,10 @@ def of_kind(row_kind: RowKind, *args, **kwargs):
 return row
 
     def __contains__(self, item):
-        return item in self._values
+        if hasattr(self, "_fields"):
+            return item in self._fields

Review Comment:
   so the question is: if `_fields` is None, do we want to fall back to a 
membership check on the values, or always just return False?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35304) Mongo ITCase fails due to duplicate records after resuming

2024-05-07 Thread yux (Jira)
yux created FLINK-35304:
---

 Summary: Mongo ITCase fails due to duplicate records after resuming
 Key: FLINK-35304
 URL: https://issues.apache.org/jira/browse/FLINK-35304
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Test case testRemoveAndAddCollectionsOneByOne keeps failing because the 
downstream receives duplicate data rows after resuming from a MongoDB resume 
token.

2024-05-07T08:57:16.4720998Z [ERROR] Tests run: 20, Failures: 1, Errors: 0, 
Skipped: 0, Time elapsed: 498.203 s <<< FAILURE! - in 
org.apache.flink.cdc.connectors.mongodb.source.NewlyAddedTableITCase
2024-05-07T08:57:16.4723517Z [ERROR] 
org.apache.flink.cdc.connectors.mongodb.source.NewlyAddedTableITCase.testRemoveAndAddCollectionsOneByOne
  Time elapsed: 38.114 s  <<< FAILURE!
2024-05-07T08:57:16.4725419Z java.lang.AssertionError: expected:<33> but 
was:<34>
2024-05-07T08:57:16.4726168Z     at org.junit.Assert.fail(Assert.java:89)
2024-05-07T08:57:16.4726828Z     at 
org.junit.Assert.failNotEquals(Assert.java:835)
2024-05-07T08:57:16.4727540Z     at 
org.junit.Assert.assertEquals(Assert.java:647)
2024-05-07T08:57:16.4728301Z     at 
org.junit.Assert.assertEquals(Assert.java:633)
2024-05-07T08:57:16.4729698Z     at 
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInOrder(MongoDBAssertUtils.java:118)
2024-05-07T08:57:16.4731863Z     at 
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder(MongoDBAssertUtils.java:111)
2024-05-07T08:57:16.4734296Z     at 
org.apache.flink.cdc.connectors.mongodb.source.NewlyAddedTableITCase.testRemoveAndAddCollectionsOneByOne(NewlyAddedTableITCase.java:501)
2024-05-07T08:57:16.4736882Z     at 
org.apache.flink.cdc.connectors.mongodb.source.NewlyAddedTableITCase.testRemoveAndAddCollectionsOneByOne(NewlyAddedTableITCase.java:330)
2024-05-07T08:57:16.4738847Z     at 
java.lang.reflect.Method.invoke(Method.java:498)
2024-05-07T08:57:16.4739923Z     at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
2024-05-07T08:57:16.4740790Z     at java.lang.Thread.run(Thread.java:750)
2024-05-07T08:57:16.4741257Z



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager [flink]

2024-05-07 Thread via GitHub


Zakelly closed pull request #24640: [FLINK-32080][FLIP-306][checkpoint] 
Restoration of FileMergingSnapshotManager
URL: https://github.com/apache/flink/pull/24640


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32080) Restoration of FileMergingSnapshotManager

2024-05-07 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844223#comment-17844223
 ] 

Zakelly Lan commented on FLINK-32080:
-

Merged via afe4c79efa15902369d41ef5a6e73d79a2e7d525

> Restoration of FileMergingSnapshotManager
> -
>
> Key: FLINK-32080
> URL: https://issues.apache.org/jira/browse/FLINK-32080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32080) Restoration of FileMergingSnapshotManager

2024-05-07 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan resolved FLINK-32080.
-
Resolution: Fixed

> Restoration of FileMergingSnapshotManager
> -
>
> Key: FLINK-32080
> URL: https://issues.apache.org/jira/browse/FLINK-32080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35303) Support logical deletion of data

2024-05-07 Thread LvYanquan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844224#comment-17844224
 ] 

LvYanquan commented on FLINK-35303:
---

Currently, we can add a metadata column from row_kind 
[https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#available-metadata]
 to represent that. 
However, for a delete event we will still delete this record in the 
downstream; we would need to change the RowKind of those events to Insert to 
achieve this.

>  Support logical deletion of data
> -
>
> Key: FLINK-35303
> URL: https://issues.apache.org/jira/browse/FLINK-35303
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
>
> A delete event is a logical deletion. Add a field to the table, for example 
> is_delete: the default is false, and if it is a delete event, is_delete is set 
> to true.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35262] Bridge between AsyncKeyedStateBackend and AsyncExecutionController [flink]

2024-05-07 Thread via GitHub


masteryhx commented on code in PR #24740:
URL: https://github.com/apache/flink/pull/24740#discussion_r1592119008


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java:
##
@@ -114,6 +116,10 @@ public void setKeyedStateStore(@Nullable KeyedStateStore 
keyedStateStore) {
 this.keyedStateStore = keyedStateStore;
 }
 
+public void setAsyncKeyedStateStore(@Nullable AsyncKeyedStateStore 
asyncKeyedStateStore) {

Review Comment:
   Thanks for the suggestion.
   Renamed to `KeyedStateStoreV2`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: api [flink]

2024-05-07 Thread via GitHub


Jiabao-Sun merged PR #24603:
URL: https://github.com/apache/flink/pull/24603


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25537) [JUnit5 Migration] Module: flink-core

2024-05-07 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844226#comment-17844226
 ] 

Jiabao Sun commented on FLINK-25537:


master: ffa3869c48a68c1dd3126fa949adc6953979711f

> [JUnit5 Migration] Module: flink-core
> -
>
> Key: FLINK-25537
> URL: https://issues.apache.org/jira/browse/FLINK-25537
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Core
>Reporter: Qingsheng Ren
>Assignee: Aiden Gong
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]

2024-05-07 Thread via GitHub


masteryhx commented on code in PR #24739:
URL: https://github.com/apache/flink/pull/24739#discussion_r1592121657


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##
@@ -31,75 +30,50 @@
 /**
  * The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by
  * calling the Get API multiple times with multiple threads.
- *
- * @param <K> The type of key in get access request.
- * @param <V> The type of value in get access request.
  */
-public class ForStGeneralMultiGetOperation<K, V> implements ForStDBOperation<List<V>> {
+public class ForStGeneralMultiGetOperation implements ForStDBOperation {
 
     private static final Logger LOG = LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
 
     private final RocksDB db;
 
-    private final List<GetRequest<K, V>> batchRequest;
+    private final List<ForStDBGetRequest<?, ?>> batchRequest;
 
     private final Executor executor;
 
     ForStGeneralMultiGetOperation(
-            RocksDB db, List<GetRequest<K, V>> batchRequest, Executor executor) {
+            RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor executor) {
         this.db = db;
         this.batchRequest = batchRequest;
         this.executor = executor;
     }
 
     @Override
-    public CompletableFuture<List<V>> process() {
+    public CompletableFuture<Void> process() {
 
-        CompletableFuture<List<V>> future = new CompletableFuture<>();
-        @SuppressWarnings("unchecked")
-        V[] result = (V[]) new Object[batchRequest.size()];
-        Arrays.fill(result, null);
+        CompletableFuture<Void> future = new CompletableFuture<>();
 
         AtomicInteger counter = new AtomicInteger(batchRequest.size());
         for (int i = 0; i < batchRequest.size(); i++) {
-            GetRequest<K, V> request = batchRequest.get(i);
-            final int index = i;
+            ForStDBGetRequest<?, ?> request = batchRequest.get(i);
             executor.execute(
                     () -> {
                         try {
-                            ForStInnerTable<K, V> table = request.table;
-                            byte[] key = table.serializeKey(request.key);
-                            byte[] value = db.get(table.getColumnFamilyHandle(), key);
-                            if (value != null) {
-                                result[index] = table.deserializeValue(value);
-                            }
+                            byte[] key = request.buildSerializedKey();
+                            byte[] value = db.get(request.getColumnFamilyHandle(), key);
+                            request.completeStateFuture(value);
                         } catch (Exception e) {
                             LOG.warn(
                                     "Error when process general multiGet operation for forStDB", e);
                             future.completeExceptionally(e);

Review Comment:
   Okay, I think it's reasonable to consider it together with the handling 
mechanism of the AEC later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-07 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1592128250


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -203,12 +251,29 @@ public void callbackFinished() {
 }
 
 @Override
-public void thenSyncAccept(Consumer<? super T> action) {
-completableFuture.thenAccept(action);
+public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
+completableFuture
+.thenAccept(ThrowingConsumer.unchecked(action))
+.exceptionally(
+(e) -> {
+exceptionHandler.handleException(
+"Caught exception when processing 
completed StateFuture's callback.",
+e);
+return null;
+});
 }
 
 /** The entry for a state future to submit task to mailbox. */
 public interface CallbackRunner {
-void submit(Runnable task);
+void submit(ThrowingRunnable<? extends Exception> task);
+}
+
+/**
+ * Handles an exception thrown by callback framework, borrowed idea from 
{@code
+ * AsyncExceptionHandler}.
+ */
+public interface CallbackExceptionHandler {

Review Comment:
   Thanks for the suggestion, renamed and squashed commits.
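
   For reference, the pattern in the diff boils down to funneling callback 
failures into a single handler. A minimal self-contained sketch with plain 
`CompletableFuture` (the handler interface mirrors the diff but is simplified; 
names here are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackErrorRoutingSketch {

    /** Simplified stand-in for the exception handler in the diff. */
    interface CallbackExceptionHandler {
        void handleException(String message, Throwable t);
    }

    // Mirrors the thenSyncAccept() change: any failure in the callback is
    // routed to the handler instead of being silently dropped.
    static <T> void thenSyncAccept(
            CompletableFuture<T> future, Consumer<T> action, CallbackExceptionHandler handler) {
        future.thenAccept(action)
                .exceptionally(
                        e -> {
                            handler.handleException(
                                    "Caught exception when processing completed StateFuture's callback.",
                                    e);
                            return null;
                        });
    }

    public static void main(String[] args) {
        thenSyncAccept(
                CompletableFuture.completedFuture("ok"),
                v -> {
                    throw new RuntimeException("boom");
                },
                (msg, t) -> System.out.println(msg + " " + t));
    }
}
```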



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]

2024-05-07 Thread via GitHub


dawidwys commented on PR #24365:
URL: https://github.com/apache/flink/pull/24365#issuecomment-2097889690

   > Now we use splitByWholeSeparatorPreserveAllTokens, when delimiter is 
empty, it will return entire string.
   
   Actually it splits on whitespace: 
https://github.com/apache/flink/blob/8e5220b288e49c99333a4bc8ef7e3d5d27193921/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java#L107
   
   > I think we can change the description,
   
   Let's first agree what is the desired behaviour.
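
   For reference, a self-contained sketch of the commons-lang3 behaviour under 
discussion (both empty and null separators fall back to whitespace splitting, 
which is why "returns the entire string" isn't quite accurate):

```java
import java.util.Arrays;

import org.apache.commons.lang3.StringUtils;

public class SplitBehaviourSketch {
    public static void main(String[] args) {
        String input = "a b,c";

        // Empty separator falls back to whitespace splitting -> [a, b,c]
        System.out.println(
                Arrays.toString(StringUtils.splitByWholeSeparatorPreserveAllTokens(input, "")));

        // Normal whole-separator split -> [a b, c]
        System.out.println(
                Arrays.toString(StringUtils.splitByWholeSeparatorPreserveAllTokens(input, ",")));

        // Adjacent separators preserve the empty token -> [a, , b]
        System.out.println(
                Arrays.toString(StringUtils.splitByWholeSeparatorPreserveAllTokens("a,,b", ",")));
    }
}
```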
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35172]DDL statement is added to the Schema Change Event [flink-cdc]

2024-05-07 Thread via GitHub


melin closed pull request #3245: [FLINK-35172]DDL statement is added to the 
Schema Change Event
URL: https://github.com/apache/flink-cdc/pull/3245


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35172]DDL statement is added to the Schema Change Event [flink-cdc]

2024-05-07 Thread via GitHub


melin commented on PR #3245:
URL: https://github.com/apache/flink-cdc/pull/3245#issuecomment-2097893968

   /


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-07 Thread via GitHub


yunfengzhou-hub commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1592131320


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncStateException.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+/**
+ * An exception for wrapping exceptions that are thrown by {@link
+ * org.apache.flink.api.common.state.v2.StateFuture} callback framework.
+ */
+public class AsyncStateException extends RuntimeException {
+private static final long serialVersionUID = 1L;
+
+public AsyncStateException(Throwable cause) {
+super(cause);
+}
+
+public AsyncStateException(String message, Throwable cause) {
+super(message, cause);
+}
+
+@Override
+public String toString() {
+return "StateCallbackException{" + getCause() + "}";

Review Comment:
   nit: StateCallbackException -> AsyncStateException



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34916][table] Support `ALTER CATALOG SET` syntax [flink]

2024-05-07 Thread via GitHub


LadyForest commented on code in PR #24735:
URL: https://github.com/apache/flink/pull/24735#discussion_r1591751966


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##
@@ -102,6 +103,23 @@
 /** Test cases for the DDL statements for {@link 
SqlNodeToOperationConversion}. */
 public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
 
+@Test
+public void testAlterCatalog() {
+// test alter catalog options
+final String sql1 = "ALTER CATALOG cat2 SET ('K1' = 'V1', 'k2' = 'v2', 
'k2' = 'v2_new')";
+Operation operation = parse(sql1);
+assertThat(operation).isInstanceOf(AlterCatalogOptionsOperation.class);
+assertThat(((AlterCatalogOptionsOperation) 
operation).getCatalogName()).isEqualTo("cat2");
+assertThat(operation.asSummaryString())
+.isEqualTo("ALTER CATALOG cat2\n  SET 'K1' = 'V1',\n  SET 'k2' 
= 'v2_new'");
+
+final Map<String, String> expectedOptions = new HashMap<>();
+expectedOptions.put("K1", "V1");
+expectedOptions.put("k2", "v2_new");
+assertThat(((AlterCatalogOptionsOperation) operation).getProperties())
+.isEqualTo(expectedOptions);

Review Comment:
   Nit: can be simplified as
   
   ```java
   // test alter catalog options
   final String sql1 = "ALTER CATALOG cat2 SET ('K1' = 'V1', 'k2' = 
'v2', 'k2' = 'v2_new')";
   final Map<String, String> expectedOptions = new HashMap<>();
   expectedOptions.put("K1", "V1");
   expectedOptions.put("k2", "v2_new");
   
   Operation operation = parse(sql1);
   assertThat(operation)
   .isInstanceOf(AlterCatalogOptionsOperation.class)
   .asInstanceOf(type(AlterCatalogOptionsOperation.class))
   .extracting(
   AlterCatalogOptionsOperation::getCatalogName,
   AlterCatalogOptionsOperation::asSummaryString,
   AlterCatalogOptionsOperation::getProperties)
   .containsExactly(
   "cat2",
   "ALTER CATALOG cat2\n  SET 'K1' = 'V1',\n  SET 'k2' 
= 'v2_new'",
   expectedOptions);
   ```



##
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##
@@ -147,6 +147,28 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
 }
 }
 
+/**
+* Parses a alter catalog statement.

Review Comment:
   Nit: an alter catalog statement



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -321,6 +322,42 @@ public void createCatalog(String catalogName, 
CatalogDescriptor catalogDescripto
 catalogStoreHolder.catalogStore().storeCatalog(catalogName, 
catalogDescriptor);
 }
 
+/**
+ * Alters a catalog under the given name. The catalog name must be unique.
+ *
+ * @param catalogName the given catalog name under which to alter the 
given catalog
+ * @param catalogDescriptor catalog descriptor for altering catalog
+ * @throws CatalogException If the catalog neither exists in the catalog 
store nor in the
+ * initialized catalogs, or if an error occurs while creating the 
catalog or storing the
+ * {@link CatalogDescriptor}
+ */
+public void alterCatalog(String catalogName, CatalogDescriptor 
catalogDescriptor)
+throws CatalogException {
+checkArgument(
+!StringUtils.isNullOrWhitespaceOnly(catalogName),
+"Catalog name cannot be null or empty.");
+checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
+CatalogStore catalogStore = catalogStoreHolder.catalogStore();
+Optional<CatalogDescriptor> oldCatalogDescriptor = getCatalogDescriptor(catalogName);
+if (catalogStore.contains(catalogName) && 
oldCatalogDescriptor.isPresent()) {
+Map<String, String> props = oldCatalogDescriptor.get().getConfiguration().toMap();

Review Comment:
   Nit: no need to convert to map back and forth.
   
   ```java
   Configuration conf = oldCatalogDescriptor.get().getConfiguration();
   conf.addAll(catalogDescriptor.getConfiguration());
   CatalogDescriptor newCatalogDescriptor = CatalogDescriptor.of(catalogName, 
conf);
   ```



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -321,6 +322,42 @@ public void createCatalog(String catalogName, 
CatalogDescriptor catalogDescripto
 catalogStoreHolder.catalogStore().storeCatalog(catalogName, 
catalogDescriptor);
 }
 
+/**
+ * Alters a catalog under the given name. The catalog name must be unique.
+ *
+ * @param catalogName the given catalog name under which to alter the 
given catalog
+ * @param catalogDescriptor catalog descriptor for altering cata

Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]

2024-05-07 Thread via GitHub


Jiabao-Sun commented on code in PR #24670:
URL: https://github.com/apache/flink/pull/24670#discussion_r1592131889


##
flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java:
##
@@ -17,48 +17,48 @@
 
 package org.apache.flink.util;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import static java.util.Arrays.asList;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link CloseableIterator} test. */
 @SuppressWarnings("unchecked")
-public class CloseableIteratorTest {
+class CloseableIteratorTest {
 
 private static final String[] ELEMENTS = new String[] {"element-1", 
"element-2"};
 
 @Test
-public void testFlattenEmpty() throws Exception {
+void testFlattenEmpty() throws Exception {
 List<CloseableIterator<?>> iterators =
 asList(
 CloseableIterator.flatten(),
 CloseableIterator.flatten(CloseableIterator.empty()),
 CloseableIterator.flatten(CloseableIterator.flatten()));
 for (CloseableIterator<?> i : iterators) {
-assertFalse(i.hasNext());
+assertThat(i.hasNext()).isFalse();

Review Comment:
   ```suggestion
   assertThat(i).isExhausted();
   ```



##
flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java:
##
@@ -20,44 +20,40 @@
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.InputStream;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** This class contains tests for the {@link org.apache.flink.util.AbstractID} 
class. */
-public class AbstractIDTest extends TestLogger {
+class AbstractIDTest {
 
 /** Tests the serialization/deserialization of an abstract ID. */
 @Test
-public void testSerialization() throws Exception {
+void testSerialization() throws Exception {
 final AbstractID origID = new AbstractID();
 final AbstractID copyID = 
CommonTestUtils.createCopySerializable(origID);
 
-assertEquals(origID.hashCode(), copyID.hashCode());
-assertEquals(origID, copyID);
+assertThat(copyID.hashCode()).isEqualTo(origID.hashCode());

Review Comment:
   ```suggestion
   assertThat(copyID).hasSameHashCodeAs(origID);
   ```



##
flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java:
##
@@ -99,39 +97,40 @@ private String createProxyDefinition(String proxyName) {
 }
 
 @Test
-public void testInstantiationOfStringValue() {
+void testInstantiationOfStringValue() {
 StringValue stringValue = 
InstantiationUtil.instantiate(StringValue.class, null);
-assertNotNull(stringValue);
+assertThat(Optional.of(stringValue)).isNotNull();

Review Comment:
   ```java
   Object stringValue = 
InstantiationUtil.instantiate(StringValue.class, null);
   assertThat(stringValue).isNotNull();
   ```



##
flink-core/src/test/java/org/apache/flink/util/CollectionUtilTest.java:
##
@@ -138,15 +129,12 @@ public void testCheckedSubTypeCast() {
 list.add(null);
 Collection<B> castSuccess = CollectionUtil.checkedSubTypeCast(list, B.class);
 Iterator<B> iterator = castSuccess.iterator();
-Assertions.assertEquals(b, iterator.next());
-Assertions.assertEquals(c, iterator.next());
-Assertions.assertNull(iterator.next());
-Assertions.assertFalse(iterator.hasNext());
-try {
-Collection<C> castFail = CollectionUtil.checkedSubTypeCast(list, C.class);
-fail("Expected ClassCastException");
-} catch (ClassCastException expected) {
-}
+assertThat(iterator.next()).isEqualTo(b);
+assertThat(iterator.next()).isEqualTo(c);
+assertThat(iterator.next()).isNull();
+assertThat(iterator.hasNext()).isFalse();

Review Comment:
   ```suggestion
   assertThat(iterator).isExhausted();
   ```



##
flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java:
##
@@ -572,8 +566,6 @@ public void testExpandDirWithForbiddenEscape() {
  * @param outputFile the path of the output file
  * @param length the size of content to generate
  * @return MD5 of the output file
- * @throws IOException
- * @throws NoSuchAlgorithmException
  */
 private static String generateTestFile(String outputFile, int length)

Review Comment:
   We don't have to delete it just becau

Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]

2024-05-07 Thread via GitHub


Zakelly closed pull request #24739: [FLINK-35161][state] Implement 
StateExecutor for ForStStateBackend
URL: https://github.com/apache/flink/pull/24739


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35161) Implement StateExecutor for ForStStateBackend

2024-05-07 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844232#comment-17844232
 ] 

Zakelly Lan commented on FLINK-35161:
-

Merged via ea4112aefa72e9d15525a72157ced3e3da3650ff

> Implement StateExecutor for ForStStateBackend
> -
>
> Key: FLINK-35161
> URL: https://issues.apache.org/jira/browse/FLINK-35161
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35161) Implement StateExecutor for ForStStateBackend

2024-05-07 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan resolved FLINK-35161.
-
Resolution: Fixed

> Implement StateExecutor for ForStStateBackend
> -
>
> Key: FLINK-35161
> URL: https://issues.apache.org/jira/browse/FLINK-35161
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34916][table] Support `ALTER CATALOG SET` syntax [flink]

2024-05-07 Thread via GitHub


liyubin117 commented on code in PR #24735:
URL: https://github.com/apache/flink/pull/24735#discussion_r1592191396


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -321,6 +322,42 @@ public void createCatalog(String catalogName, 
CatalogDescriptor catalogDescripto
 catalogStoreHolder.catalogStore().storeCatalog(catalogName, 
catalogDescriptor);
 }
 
+/**
+ * Alters a catalog under the given name. The catalog name must be unique.
+ *
+ * @param catalogName the given catalog name under which to alter the 
given catalog
+ * @param catalogDescriptor catalog descriptor for altering catalog
+ * @throws CatalogException If the catalog neither exists in the catalog 
store nor in the
+ * initialized catalogs, or if an error occurs while altering the catalog or storing the
+ * {@link CatalogDescriptor}
+ */
+public void alterCatalog(String catalogName, CatalogDescriptor 
catalogDescriptor)
+throws CatalogException {
+checkArgument(
+!StringUtils.isNullOrWhitespaceOnly(catalogName),
+"Catalog name cannot be null or empty.");
+checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
+CatalogStore catalogStore = catalogStoreHolder.catalogStore();
+Optional<CatalogDescriptor> oldCatalogDescriptor = getCatalogDescriptor(catalogName);
+if (catalogStore.contains(catalogName) && 
oldCatalogDescriptor.isPresent()) {
+Map<String, String> props = oldCatalogDescriptor.get().getConfiguration().toMap();
+props.putAll(catalogDescriptor.getConfiguration().toMap());
+CatalogDescriptor newCatalogDescriptor =
+CatalogDescriptor.of(catalogName, 
Configuration.fromMap(props));
+Catalog catalog = initCatalog(catalogName, newCatalogDescriptor);

Review Comment:
   Good idea :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33462) Sort out the document page about the new Jdbc source.

2024-05-07 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844238#comment-17844238
 ] 

RocMarshal commented on FLINK-33462:


Hi, [~martijnvisser] 
There's a minor item to confirm: should we synchronize the doc-page files 
into the docs folder of the Flink main repository, or just keep the new 
change in the flink-connector-jdbc repo?
I'd appreciate your confirmation~ :)

> Sort out the document page about the new Jdbc source.
> -
>
> Key: FLINK-33462
> URL: https://issues.apache.org/jira/browse/FLINK-33462
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34916][table] Support `ALTER CATALOG SET` syntax [flink]

2024-05-07 Thread via GitHub


liyubin117 commented on code in PR #24735:
URL: https://github.com/apache/flink/pull/24735#discussion_r1592237072


##
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##
@@ -147,6 +147,28 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
 }
 }
 
+/**
* Parses an alter catalog statement.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34916][table] Support `ALTER CATALOG SET` syntax [flink]

2024-05-07 Thread via GitHub


liyubin117 commented on code in PR #24735:
URL: https://github.com/apache/flink/pull/24735#discussion_r1592242102


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##
@@ -102,6 +103,23 @@
 /** Test cases for the DDL statements for {@link 
SqlNodeToOperationConversion}. */
 public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
 
+@Test
+public void testAlterCatalog() {
+// test alter catalog options
+final String sql1 = "ALTER CATALOG cat2 SET ('K1' = 'V1', 'k2' = 'v2', 
'k2' = 'v2_new')";
+Operation operation = parse(sql1);
+assertThat(operation).isInstanceOf(AlterCatalogOptionsOperation.class);
+assertThat(((AlterCatalogOptionsOperation) 
operation).getCatalogName()).isEqualTo("cat2");
+assertThat(operation.asSummaryString())
+.isEqualTo("ALTER CATALOG cat2\n  SET 'K1' = 'V1',\n  SET 'k2' 
= 'v2_new'");
+
+final Map<String, String> expectedOptions = new HashMap<>();
+expectedOptions.put("K1", "V1");
+expectedOptions.put("k2", "v2_new");
+assertThat(((AlterCatalogOptionsOperation) operation).getProperties())
+.isEqualTo(expectedOptions);

Review Comment:
   Got it ! it is more concise now :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34916][table] Support `ALTER CATALOG SET` syntax [flink]

2024-05-07 Thread via GitHub


liyubin117 commented on code in PR #24735:
URL: https://github.com/apache/flink/pull/24735#discussion_r1592243049


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -321,6 +322,42 @@ public void createCatalog(String catalogName, 
CatalogDescriptor catalogDescripto
 catalogStoreHolder.catalogStore().storeCatalog(catalogName, 
catalogDescriptor);
 }
 
+/**
+ * Alters a catalog under the given name. The catalog name must be unique.
+ *
+ * @param catalogName the given catalog name under which to alter the 
given catalog
+ * @param catalogDescriptor catalog descriptor for altering catalog
+ * @throws CatalogException If the catalog neither exists in the catalog 
store nor in the
+ * initialized catalogs, or if an error occurs while altering the catalog or storing the
+ * {@link CatalogDescriptor}
+ */
+public void alterCatalog(String catalogName, CatalogDescriptor 
catalogDescriptor)
+throws CatalogException {
+checkArgument(
+!StringUtils.isNullOrWhitespaceOnly(catalogName),
+"Catalog name cannot be null or empty.");
+checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");
+CatalogStore catalogStore = catalogStoreHolder.catalogStore();
+Optional<CatalogDescriptor> oldCatalogDescriptor = getCatalogDescriptor(catalogName);
+if (catalogStore.contains(catalogName) && 
oldCatalogDescriptor.isPresent()) {
+Map<String, String> props = oldCatalogDescriptor.get().getConfiguration().toMap();

Review Comment:
   Good idea



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33463][Connector/JDBC] Support the implementation of dynamic source tables based on the new source [flink-connector-jdbc]

2024-05-07 Thread via GitHub


eskabetxe commented on code in PR #117:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/117#discussion_r1592285942


##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -144,7 +146,8 @@ void testJdbcReadProperties() {
 readOptions,
 LookupOptions.MAX_RETRIES.defaultValue(),
 null,
-SCHEMA.toPhysicalRowDataType());
+SCHEMA.toPhysicalRowDataType(),
+FactoryMocks.IDENTIFIER.asSummaryString());

Review Comment:
   could we test the source naming in some way?



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -179,13 +205,34 @@ public ScanRuntimeProvider 
getScanRuntimeProvider(ScanContext runtimeProviderCon
 
 LOG.debug("Query generated for JDBC scan: " + query);
 
-builder.setQuery(query);
+builder.setSql(query);
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
-builder.setRowConverter(dialect.getRowConverter(rowType));
-builder.setRowDataTypeInfo(
-
runtimeProviderContext.createTypeInformation(physicalRowDataType));
+builder.setResultExtractor(new 
RowDataResultExtractor(dialect.getRowConverter(rowType)));
+
builder.setTypeInformation(scanContext.createTypeInformation(physicalRowDataType));
+options.getProperties()
+.forEach(
+(key, value) ->
+builder.setConnectionProperty(key.toString(), 
value.toString()));
+JdbcSource<RowData> source = builder.build();
+return new DataStreamScanProvider() {

Review Comment:
   should we create a static class (inside this file) for this?
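
   A minimal sketch of that suggestion (the class name, constructor, and boundedness flag are assumptions for illustration, not code from this PR; imports omitted):

   ```java
   /** Hypothetical static nested class replacing the anonymous DataStreamScanProvider. */
   private static class JdbcDataStreamScanProvider implements DataStreamScanProvider {

       private final JdbcSource<RowData> source;
       private final String sourceName;

       JdbcDataStreamScanProvider(JdbcSource<RowData> source, String sourceName) {
           this.source = source;
           this.sourceName = sourceName;
       }

       @Override
       public DataStream<RowData> produceDataStream(
               ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
           // Build the stream from the already-configured source.
           return execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), sourceName);
       }

       @Override
       public boolean isBounded() {
           return true; // a JDBC scan is a bounded read
       }
   }
   ```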



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] FLINK-35299: Add logic to respect initial position for new streams [flink-connector-aws]

2024-05-07 Thread via GitHub


antssilva96 opened a new pull request, #140:
URL: https://github.com/apache/flink-connector-aws/pull/140

   
   
   ## Purpose of the change
   
   According to the javadoc, the `STREAM_INITIAL_POSITION` property defines 
   where to start reading Kinesis streams from. However, in the current 
   implementation this is only true if there is no restore state at all for any 
   stream of that KinesisConsumer; otherwise a new stream is handled the same 
   way as a new shard of an existing stream: consumption starts from EARLIEST 
   (equivalent to the TRIM_HORIZON initial position).
   
   This MR changes that by making `FlinkKinesisConsumer` use the 
   `STREAM_INITIAL_POSITION` config for new streams, which aligns with what is 
   documented.
   
   This behavior is disabled by default so as not to introduce a breaking change, 
   but it can be enabled by setting `flink.stream.initpos-for-new-streams` to true.
   
   Additionally, a second config was created - `flink.stream.initpos-streams` - 
   to allow specific streams to be "reset" to whatever `STREAM_INITIAL_POSITION` 
   defines. This is important because users who notice this bug and want to 
   enable the correct behaviour will want to reset the offset already recorded 
   for the new stream. A usage sketch is shown below.
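   
   A minimal usage sketch (hedged: the two `flink.stream.initpos-*` keys are the 
   ones proposed in this PR and are not part of any released connector; stream 
   names are placeholders):
   
   ```java
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.util.Arrays;
   import java.util.Properties;
   
   Properties props = new Properties();
   props.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-west-1");
   props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   // Proposed in this PR: apply STREAM_INITIAL_POSITION to streams without restore state.
   props.setProperty("flink.stream.initpos-for-new-streams", "true");
   // Proposed in this PR: force these specific streams back to the initial position.
   props.setProperty("flink.stream.initpos-streams", "new-stream-1,new-stream-2");
   
   FlinkKinesisConsumer<String> consumer =
           new FlinkKinesisConsumer<>(
                   Arrays.asList("existing-stream", "new-stream-1", "new-stream-2"),
                   new SimpleStringSchema(),
                   props);
   ```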
   
   
   ## Verifying this change
   
   **TODO**
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   - *Added integration tests for end-to-end deployment*
   - *Added unit tests*
   - *Manually verified by running the Kinesis connector on a local Flink 
cluster.*
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish 
the PR and check them afterwards, for convenience.)*
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [x] New feature has been introduced
  - If yes, how is this documented? (not applicable / docs / JavaDocs / not 
documented) docs and JavaDocs where appropriate
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] FLINK-35299: Add logic to respect initial position for new streams [flink-connector-aws]

2024-05-07 Thread via GitHub


boring-cyborg[bot] commented on PR #140:
URL: 
https://github.com/apache/flink-connector-aws/pull/140#issuecomment-2098181150

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35232] Add retry settings for GCS connector [flink]

2024-05-07 Thread via GitHub


xintongsong commented on code in PR #24753:
URL: https://github.com/apache/flink/pull/24753#discussion_r1592308269


##
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java:
##
@@ -80,6 +84,74 @@ public class GSFileSystemOptions {
 .withDescription(
 "This option sets the timeout in milliseconds to 
read data from an established connection.");
 
+/**
+ * Flink config option to set the maximum number of retry attempts. It will be
+ * used by the cloud-storage library.
+ */
+public static final ConfigOption<Integer> GCS_RETRY_MAX_ATTEMPT =
+ConfigOptions.key("gs.retry.max-attempt")
+.intType()
+.defaultValue(6)

Review Comment:
   I'm not sure about setting default values for these options. The default 
values are ignored anyway because these options are only used in 
`getOptional()`, which returns `empty()` if the option is not explicitly 
configured by the user. We can mention in the description that, if not configured, 
the GCS default values will be used; a small illustration follows below.
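   
   A small illustration of that point (standard `Configuration` semantics, not 
   code from this PR):
   
   ```java
   ConfigOption<Integer> maxAttempt =
           ConfigOptions.key("gs.retry.max-attempt").intType().defaultValue(6);
   Configuration conf = new Configuration(); // user did not set the option
   
   conf.getOptional(maxAttempt); // Optional.empty() -- defaultValue(6) is never consulted
   conf.get(maxAttempt);         // 6 -- only get() falls back to the declared default
   ```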



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35299) FlinkKinesisConsumer does not respect StreamInitialPosition for new Kinesis Stream when restoring from snapshot

2024-05-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35299:
---
Labels: pull-request-available  (was: )

> FlinkKinesisConsumer does not respect StreamInitialPosition for new Kinesis 
> Stream when restoring from snapshot
> ---
>
> Key: FLINK-35299
> URL: https://issues.apache.org/jira/browse/FLINK-35299
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0
>Reporter: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.4.0
>
>
> h3. What
> The FlinkKinesisConsumer allows users to read from [multiple Kinesis 
> Streams|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L224].
> Users can also specify a STREAM_INITIAL_POSITION, which configures if the 
> consumer starts reading the stream from TRIM_HORIZON / LATEST / AT_TIMESTAMP.
> When restoring the Kinesis Consumer from an existing snapshot, users can 
> configure the consumer to read from additional Kinesis Streams. The expected 
> behavior would be for the FlinkKinesisConsumer to start reading from the 
> additional Kinesis Streams respecting the STREAM_INITIAL_POSITION 
> configuration. However, we find that it currently reads from TRIM_HORIZON.
> This is surprising behavior and should be corrected.
> h3. Why
> Principle of Least Astonishment
> h3. How
> We recommend that we reconstruct the previously seen streams by iterating 
> through the [sequenceNumsStateForCheckpoint in 
> FlinkKinesisConsumer#initializeState()|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L454].
> h3. Risks
> This might increase the state restore time. We can consider adding a feature 
> flag for users to turn this check off.
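
A hedged sketch of that recommendation (field and element types as in FlinkKinesisConsumer; the helper set and loop are illustrative, not the actual fix):
{code:java}
// Rebuild the set of streams that already have restore state; any configured
// stream missing from this set is "new" and should honor STREAM_INITIAL_POSITION.
Set<String> streamsWithRestoreState = new HashSet<>();
for (Tuple2<StreamShardMetadata, SequenceNumber> entry :
        sequenceNumsStateForCheckpoint.get()) {
    streamsWithRestoreState.add(entry.f0.getStreamName());
}
{code}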



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35195][table] Support the execution of create materialized table in continuous refresh mode [flink]

2024-05-07 Thread via GitHub


lsyldliu merged PR #24750:
URL: https://github.com/apache/flink/pull/24750


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35195) Support the execution of create materialized table in continuous refresh mode

2024-05-07 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844266#comment-17844266
 ] 

dalongliu commented on FLINK-35195:
---

Merged in master: 29736b8c01924b7da03d4bcbfd9c812a8e5a08b4

> Support the execution of create materialized table in continuous refresh mode
> -
>
> Key: FLINK-35195
> URL: https://issues.apache.org/jira/browse/FLINK-35195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In continuous refresh mode, support creating a materialized table and its 
> background refresh job:
> {code:SQL}
> CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
>  
> [ ([  ]) ]
>  
> [COMMENT table_comment]
>  
> [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
>  
> [WITH (key1=val1, key2=val2, ...)]
>  
> FRESHNESS = INTERVAL '' { SECOND | MINUTE | HOUR | DAY }
>  
> [REFRESH_MODE = { CONTINUOUS | FULL }]
>  
> AS 
>  
> :
>   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35195) Support the execution of create materialized table in continuous refresh mode

2024-05-07 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu resolved FLINK-35195.
---
Resolution: Fixed

> Support the execution of create materialized table in continuous refresh mode
> -
>
> Key: FLINK-35195
> URL: https://issues.apache.org/jira/browse/FLINK-35195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In continuous refresh mode, support creating a materialized table and its 
> background refresh job:
> {code:SQL}
> CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
>  
> [ ([  ]) ]
>  
> [COMMENT table_comment]
>  
> [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
>  
> [WITH (key1=val1, key2=val2, ...)]
>  
> FRESHNESS = INTERVAL '' { SECOND | MINUTE | HOUR | DAY }
>  
> [REFRESH_MODE = { CONTINUOUS | FULL }]
>  
> AS 
>  
> :
>   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Flink module 1 [flink-training]

2024-05-07 Thread via GitHub


Grub3rMS opened a new pull request, #77:
URL: https://github.com/apache/flink-training/pull/77

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35193][table] Support the execution of refresh materialized table [flink]

2024-05-07 Thread via GitHub


xuyangzhong opened a new pull request, #24760:
URL: https://github.com/apache/flink/pull/24760

   ## What is the purpose of the change
   
   Support the execution of refresh materialized table.
   
   ## Brief change log
   
 - *Add execution in sql gateway*
 - *Add operation in table-api&table-planner*
 - *Add tests*
   
   
   ## Verifying this change
   
   Some tests are added.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35193) Support drop materialized table syntax and execution in continuous refresh mode

2024-05-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35193:
---
Labels: pull-request-available  (was: )

> Support drop materialized table syntax and execution in continuous refresh 
> mode
> ---
>
> Key: FLINK-35193
> URL: https://issues.apache.org/jira/browse/FLINK-35193
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In continuous refresh mode, support dropping a materialized table and its 
> background refresh job.
> {code:SQL}
> DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34916][table] Support `ALTER CATALOG SET` syntax [flink]

2024-05-07 Thread via GitHub


liyubin117 commented on PR #24735:
URL: https://github.com/apache/flink/pull/24735#issuecomment-2098565516

   @LadyForest Hi, Thanks for your review, I have updated as you said :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35193][table] Support the execution of refresh materialized table [flink]

2024-05-07 Thread via GitHub


flinkbot commented on PR #24760:
URL: https://github.com/apache/flink/pull/24760#issuecomment-2098577523

   
   ## CI report:
   
   * 7e2b5f7bc2a6137445c6fbca41fe705d96599932 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core [flink]

2024-05-07 Thread via GitHub


GOODBOY008 closed pull request #24523: [FLINK-25537] [JUnit5 Migration] Module: 
flink-core
URL: https://github.com/apache/flink/pull/24523


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Flink module 1 [flink-training]

2024-05-07 Thread via GitHub


Grub3rMS closed pull request #77: Flink module 1
URL: https://github.com/apache/flink-training/pull/77


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35299] Respect initial position for new streams [flink-connector-aws]

2024-05-07 Thread via GitHub


antssilva96 commented on code in PR #140:
URL: 
https://github.com/apache/flink-connector-aws/pull/140#discussion_r1592721925


##
flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java:
##
@@ -320,12 +320,35 @@ public enum EFORegistrationType {
 public static final String EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS =
 "flink.stream.efo.http-client.read-timeout";
 
+/**
+ * Flag to configure whether {@link #STREAM_INITIAL_POSITION} should be 
considered for new
+ * streams, when the app is already consuming from other streams.
+ */
+public static final String APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS =
+"flink.stream.initpos-for-new-streams";
+
+/**
+ * Property that can be used to ignore the restore state for a particular 
stream and instead use
+ * the initial position. This is useful to reset a specific stream to 
consume from TRIM_HORIZON
+ * or LATEST if needed. Values must be passed in a comma-separated list.
+ *
+ * If a stream is in this list, it will use the initial position regardless of the value of the
+ * {@link #APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} property.
+ */
+public static final String STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO =
+"flink.stream.initpos-streams";
+

Review Comment:
   Not really sure about these property names... any suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33699) Verify the snapshot migration on Java21

2024-05-07 Thread Alexander Filipchik (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844372#comment-17844372
 ] 

Alexander Filipchik commented on FLINK-33699:
-

hey folks, is anyone actively working on this one? 

> Verify the snapshot migration on Java21
> ---
>
> Key: FLINK-33699
> URL: https://issues.apache.org/jira/browse/FLINK-33699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Yun Tang
>Priority: Major
>
> In Java 21 builds, Scala is being bumped to 2.12.18, which causes 
> incompatibilities within Flink.
> This could affect loading savepoints from a Java 8/11/17 build. We already 
> have tests extending {{SnapshotMigrationTestBase}} to verify the logic of 
> migrating snapshots generated by the older Flink version. I think we can also 
> introduce similar tests to verify the logic across different Java versions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

2024-05-07 Thread via GitHub


klam-shop commented on PR #24482:
URL: https://github.com/apache/flink/pull/24482#issuecomment-2099270127

   👋 Thanks for working on this Anupam. Any updates on this PR? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Lab 1 - Filter New York only rides [flink-training]

2024-05-07 Thread via GitHub


manoellins opened a new pull request, #78:
URL: https://github.com/apache/flink-training/pull/78

   Logic to filter and return New York only rides


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Lab 1 - Filter New York only rides [flink-training]

2024-05-07 Thread via GitHub


manoellins closed pull request #78: Lab 1 - Filter New York only rides
URL: https://github.com/apache/flink-training/pull/78


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35305) FLIP-438: Amazon SQS Sink Connector

2024-05-07 Thread Priya Dhingra (Jira)
Priya Dhingra created FLINK-35305:
-

 Summary: FLIP-438: Amazon SQS Sink Connector
 Key: FLINK-35305
 URL: https://issues.apache.org/jira/browse/FLINK-35305
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / AWS
Reporter: Priya Dhingra


This is an umbrella task for FLIP-438. FLIP-438: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Lab 1 with filtering and enrichment [flink-training]

2024-05-07 Thread via GitHub


manoellins opened a new pull request, #79:
URL: https://github.com/apache/flink-training/pull/79

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]

2024-05-07 Thread via GitHub


Jiabao-Sun commented on code in PR #24670:
URL: https://github.com/apache/flink/pull/24670#discussion_r1593260020


##
flink-core/src/test/java/org/apache/flink/util/LinkedOptionalMapTest.java:
##
@@ -124,42 +118,50 @@ public void mergingToEmpty() {
 
 first.putAll(second);
 
-assertThat(first.keyNames(), contains("a", "b", "c", "d"));
+assertThat(first.keyNames()).contains("a", "b", "c", "d");
 }
 
-@Test(expected = IllegalStateException.class)
-public void unwrapOptionalsWithMissingValueThrows() {
-LinkedOptionalMap<Class<?>, String> map = new LinkedOptionalMap<>();
+@Test
+void unwrapOptionalsWithMissingValueThrows() {
+assertThatThrownBy(
+() -> {
+LinkedOptionalMap<Class<?>, String> map = new LinkedOptionalMap<>();
 
-map.put("a", String.class, null);
+map.put("a", String.class, null);
 
-map.unwrapOptionals();
+map.unwrapOptionals();
+})
+.isInstanceOf(IllegalStateException.class);
 }
 
-@Test(expected = IllegalStateException.class)
-public void unwrapOptionalsWithMissingKeyThrows() {
-LinkedOptionalMap<Class<?>, String> map = new LinkedOptionalMap<>();
+@Test
+void unwrapOptionalsWithMissingKeyThrows() {
+assertThatThrownBy(
+() -> {
+LinkedOptionalMap<Class<?>, String> map = new LinkedOptionalMap<>();
 
-map.put("a", null, "blabla");
+map.put("a", null, "blabla");
 
-map.unwrapOptionals();
+map.unwrapOptionals();
+})
+.isInstanceOf(IllegalStateException.class);
 }
 
 @Test
-public void unwrapOptionalsPreservesOrder() {
+void unwrapOptionalsPreservesOrder() {
 LinkedOptionalMap<Class<?>, String> map = new LinkedOptionalMap<>();
 
 map.put("a", String.class, "aaa");
 map.put("b", Boolean.class, "bbb");
 
 LinkedHashMap<Class<?>, String> m = map.unwrapOptionals();
 
-assertThat(m.keySet(), contains(String.class, Boolean.class));
-assertThat(m.values(), contains("aaa", "bbb"));
+assertThat(m).containsKey(String.class);

Review Comment:
   ```suggestion
   assertThat(m).containsKeys(String.class, Boolean.class);
   ```



##
flink-core/src/test/java/org/apache/flink/util/concurrent/ConjunctFutureTest.java:
##
@@ -37,43 +37,41 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for the {@link ConjunctFuture} and its sub classes. */
-@RunWith(Parameterized.class)
-public class ConjunctFutureTest extends TestLogger {
+/** Tests for the {@link ConjunctFuture} and its subclasses. */
+@ExtendWith(ParameterizedTestExtension.class)
+class ConjunctFutureTest {
 
-@Parameterized.Parameters
+@Parameters
 public static Collection parameters() {

Review Comment:
   ```suggestion
   private static Collection parameters() {
   ```



##
flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java:
##
@@ -20,62 +20,57 @@
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link SerializedValue}. */
-public class SerializedValueTest {
+class SerializedValueTest {
 
 @Test
-public void testSimpleValue() {
-try {
-final String value = "teststring";
-
-SerializedValue<String> v = new SerializedValue<>(value);
-SerializedValue<String> copy = CommonTestUtils.createCopySerializable(v);
-
-assertEquals(value, 
v.deserializeValue(getClass().getClassLoader()));
-assertEquals(value, 
copy.deserializeValue(getClass().getClassLoader()));
-
-assertEquals(v, copy);
-assertEquals(v.hashCode(), copy.hashCode());
-
-assertNotNull(v.toString());
-assertNotNull(copy.toString());
-
-assertNotEquals(0, v.getByteArray().length);
-assertArrayEquals(v.getByteArray()

Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-07 Thread via GitHub


fredia commented on PR #24698:
URL: https://github.com/apache/flink/pull/24698#issuecomment-2099597032

   Thanks for the detailed review, rebased to master.  Will merge after CI 
green.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35112][python] Fix membership for Row class PyFlink [flink]

2024-05-07 Thread via GitHub


dianfu commented on code in PR #24756:
URL: https://github.com/apache/flink/pull/24756#discussion_r1593302200


##
flink-python/pyflink/common/types.py:
##
@@ -177,7 +177,10 @@ def of_kind(row_kind: RowKind, *args, **kwargs):
 return row
 
 def __contains__(self, item):
-return item in self._values
+if hasattr(self, "_fields"):
+return item in self._fields

Review Comment:
   @wzorgdrager Thanks a lot for the PR. Personally I prefer to fall back to 
checking the values if `_fields` is None.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35234][hotfix][cdc-common] Fix NullPointerException of org.apache.flink.cdc.common.configuration.ConfigurationUtils#convertToString [flink-cdc]

2024-05-07 Thread via GitHub


loserwang1024 commented on code in PR #3255:
URL: https://github.com/apache/flink-cdc/pull/3255#discussion_r1593302893


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java:
##
@@ -130,7 +130,9 @@ static Duration convertToDuration(Object o) {
 }
 
 static String convertToString(Object o) {
-if (o.getClass() == String.class) {
+if (o == null) {

Review Comment:
   you are right, just done it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35281][hotfix][cdc-common] FlinkEnvironmentUtils#addJar add each jar only once [flink-cdc]

2024-05-07 Thread via GitHub


loserwang1024 opened a new pull request, #3301:
URL: https://github.com/apache/flink-cdc/pull/3301

   Currently, org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils#addJar 
is invoked for each source and sink.
   
   ```java
   public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
       try {
           Class<StreamExecutionEnvironment> envClass = StreamExecutionEnvironment.class;
           Field field = envClass.getDeclaredField("configuration");
           field.setAccessible(true);
           Configuration configuration = ((Configuration) field.get(env));
           List<String> jars =
                   configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>());
           jars.add(jarUrl.toString());
           configuration.set(PipelineOptions.JARS, jars);
       } catch (Exception e) {
           throw new RuntimeException("Failed to add JAR to Flink execution environment", e);
       }
   }
   
   ```
   If multiple sources or sinks share the same jar, the jar path will be added 
repeatedly; a sketch of the dedup fix follows below.
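   
   A minimal sketch of the intended fix (same reflective access as the snippet 
above; only the contains-check is new, and the exact shape of the final change 
may differ):
   
   ```java
   public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
       try {
           Field field = StreamExecutionEnvironment.class.getDeclaredField("configuration");
           field.setAccessible(true);
           Configuration configuration = (Configuration) field.get(env);
           List<String> jars =
                   configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>());
           String jar = jarUrl.toString();
           if (!jars.contains(jar)) { // add each jar only once
               jars.add(jar);
               configuration.set(PipelineOptions.JARS, jars);
           }
       } catch (Exception e) {
           throw new RuntimeException("Failed to add JAR to Flink execution environment", e);
       }
   }
   ```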


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35281][hotfix][cdc-common] FlinkEnvironmentUtils#addJar add each jar only once [flink-cdc]

2024-05-07 Thread via GitHub


loserwang1024 commented on PR #3301:
URL: https://github.com/apache/flink-cdc/pull/3301#issuecomment-2099626220

   @yuxiqian , @PatrickRen , CC


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35281) FlinkEnvironmentUtils#addJar add each jar only once

2024-05-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35281:
---
Labels: pull-request-available  (was: )

> FlinkEnvironmentUtils#addJar add each jar only once
> ---
>
> Key: FLINK-35281
> URL: https://issues.apache.org/jira/browse/FLINK-35281
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Not a Priority
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Current org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils#addJar will 
> be invoked for each source and sink.
> {code:java}
> public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
> try {
> Class<StreamExecutionEnvironment> envClass = StreamExecutionEnvironment.class;
> Field field = envClass.getDeclaredField("configuration");
> field.setAccessible(true);
> Configuration configuration = ((Configuration) field.get(env));
> List<String> jars =
> configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>());
> jars.add(jarUrl.toString());
> configuration.set(PipelineOptions.JARS, jars);
> } catch (Exception e) {
> throw new RuntimeException("Failed to add JAR to Flink execution environment", e);
> }
> } {code}
> If multiple sources or sinks share the same jar, the jar path will be added 
> repeatedly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34666) Keep assigned splits in order to fix wrong meta group calculation

2024-05-07 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844495#comment-17844495
 ] 

Hongshun Wang commented on FLINK-34666:
---

This was done in FLINK-34634, please close it. [~renqs] 

> Keep assigned splits in order to fix wrong meta group calculation
> -
>
> Key: FLINK-34666
> URL: https://issues.apache.org/jira/browse/FLINK-34666
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.1.0
>
>
> h3. Reason
> When tables are newly added and the job is then restarted, 
> IncrementalSourceEnumerator#sendStreamMetaRequestEvent -> 
> SplitAssigner#getFinishedSplitInfos may return unordered 
> finishedSplitInfos (snapshot infos of the newly added tables are ahead of 
> older ones). When the reader requests the newly added table's infos, it gets 
> older ones instead, so it never reads all the infos and never resumes the 
> changelog read.
>  
> h3. How to reproduced it?
> Add chunk-meta.group.size = 2 in 
> getCreateTableStatement, then run the test 
> org.apache.flink.cdc.connectors.postgres.source.NewlyAddedTableITCase#testNewlyAddedTableForExistsPipelineTwiceWithAheadWalLog
> {code:java}
> // code placeholder
> private String getCreateTableStatement(
> Map otherOptions, String... captureTableNames) {
> return String.format(
> "CREATE TABLE address ("
> + " table_name STRING METADATA VIRTUAL,"
> + " id BIGINT NOT NULL,"
> + " country STRING,"
> + " city STRING,"
> + " detail_address STRING,"
> + " primary key (id) not enforced"
> + ") WITH ("
> + " 'connector' = 'postgres-cdc',"
> + " 'scan.incremental.snapshot.enabled' = 'true',"
> + " 'hostname' = '%s',"
> + " 'port' = '%s',"
> + " 'username' = '%s',"
> + " 'password' = '%s',"
> + " 'database-name' = '%s',"
> + " 'schema-name' = '%s',"
> + " 'table-name' = '%s',"
> + " 'slot.name' = '%s', "
> + " 'scan.incremental.snapshot.chunk.size' = '2',"
> + " 'chunk-meta.group.size' = '2',"
> + " 'scan.newly-added-table.enabled' = 'true'"
> + " %s"
> + ")",
> customDatabase.getHost(),
> customDatabase.getDatabasePort(),
> customDatabase.getUsername(),
> customDatabase.getPassword(),
> customDatabase.getDatabaseName(),
> SCHEMA_NAME,
> PostgresTestUtils.getTableNameRegex(captureTableNames),
> slotName,
> otherOptions.isEmpty()
> ? ""
> : ","
> + otherOptions.entrySet().stream()
> .map(
> e ->
> String.format(
> "'%s'='%s'",
> e.getKey(), 
> e.getValue()))
> .collect(Collectors.joining(",")));
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS start up mode from an existed replication slot.

2024-05-07 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35093:
--
Fix Version/s: cdc-3.2.0

> Postgres source connector support SPECIFIC_OFFSETS start up mode from an 
> existed replication slot.
> --
>
> Key: FLINK-35093
> URL: https://issues.apache.org/jira/browse/FLINK-35093
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> Currently, the Postgres source connector only supports the INITIAL and LATEST modes.
> However, sometimes users want to restart from an existing replication slot's 
> confirmed_lsn.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35121) CDC pipeline connector should verify requiredOptions and optionalOptions

2024-05-07 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35121:
--
Affects Version/s: cdc-3.1.0

> CDC pipeline connector should verify requiredOptions and optionalOptions
> 
>
> Key: FLINK-35121
> URL: https://issues.apache.org/jira/browse/FLINK-35121
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Major
>
> At present, although we provide 
> org.apache.flink.cdc.common.factories.Factory#requiredOptions and 
> org.apache.flink.cdc.common.factories.Factory#optionalOptions, neither is 
> used anywhere, which means requiredOptions and optionalOptions are never 
> verified.
> Thus, like what DynamicTableFactory does, provide a 
> FactoryHelper to help verify requiredOptions and optionalOptions; a 
> validation sketch is shown below.
> 
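
A hedged sketch of the validation such a FactoryHelper could perform (all names are illustrative, mirroring what FactoryUtil does for DynamicTableFactory; this is not the eventual CDC implementation):
{code:java}
public static void validateFactoryOptions(Factory factory, Configuration config) {
    // Fail fast if a required option is absent.
    for (ConfigOption<?> option : factory.requiredOptions()) {
        if (!config.getOptional(option).isPresent()) {
            throw new IllegalArgumentException("Missing required option: " + option.key());
        }
    }
    // Collect all supported keys so unknown ones can be flagged.
    Set<String> supportedKeys = new HashSet<>();
    factory.requiredOptions().forEach(o -> supportedKeys.add(o.key()));
    factory.optionalOptions().forEach(o -> supportedKeys.add(o.key()));
    for (String key : config.toMap().keySet()) {
        if (!supportedKeys.contains(key)) {
            throw new IllegalArgumentException("Unsupported option: " + key);
        }
    }
}
{code}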



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35067) Support metadata 'op_type' virtual column for Postgres CDC Connector.

2024-05-07 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35067:
--
Affects Version/s: cdc-3.1.0

>  Support metadata 'op_type' virtual column for Postgres CDC Connector. 
> ---
>
> Key: FLINK-35067
> URL: https://issues.apache.org/jira/browse/FLINK-35067
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Major
>
> Like [https://github.com/apache/flink-cdc/pull/2913,] Support metadata 
> 'op_type' virtual column for Postgres CDC Connector. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35121) CDC pipeline connector should verify requiredOptions and optionalOptions

2024-05-07 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35121:
--
Fix Version/s: cdc-3.2.0

> CDC pipeline connector should verify requiredOptions and optionalOptions
> 
>
> Key: FLINK-35121
> URL: https://issues.apache.org/jira/browse/FLINK-35121
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> At present, although we provide 
> org.apache.flink.cdc.common.factories.Factory#requiredOptions and 
> org.apache.flink.cdc.common.factories.Factory#optionalOptions, neither is 
> used anywhere, which means requiredOptions and optionalOptions are never 
> verified.
> Thus, like what DynamicTableFactory does, provide a 
> FactoryHelper to help verify requiredOptions and optionalOptions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS start up mode from an existed replication slot.

2024-05-07 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35093:
--
Affects Version/s: cdc-3.1.0

> Postgres source connector support SPECIFIC_OFFSETS start up mode from an 
> existed replication slot.
> --
>
> Key: FLINK-35093
> URL: https://issues.apache.org/jira/browse/FLINK-35093
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
>
> Currently, the Postgres source connector only supports the INITIAL and LATEST modes.
> However, sometimes users want to restart from an existing replication slot's 
> confirmed_lsn.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34688) CDC framework split snapshot chunks asynchronously

2024-05-07 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-34688:
--
Fix Version/s: cdc-3.2.0
   (was: cdc-3.1.0)
Affects Version/s: cdc-3.1.0

> CDC framework split snapshot chunks asynchronously
> --
>
> Key: FLINK-34688
> URL: https://issues.apache.org/jira/browse/FLINK-34688
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> In MySQL CDC, MysqlSnapshotSplitAssigner splits snapshot chunks 
> asynchronously ([https://github.com/apache/flink-cdc/pull/931]), but the CDC 
> framework lacks this.
> If a table is too big to split, the enumerator will be stuck and checkpointing 
> will be affected (sometimes checkpoint timeouts occur).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35067) Support metadata 'op_type' virtual column for Postgres CDC Connector.

2024-05-07 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35067:
--
Fix Version/s: cdc-3.2.0

>  Support metadata 'op_type' virtual column for Postgres CDC Connector. 
> ---
>
> Key: FLINK-35067
> URL: https://issues.apache.org/jira/browse/FLINK-35067
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> Like [https://github.com/apache/flink-cdc/pull/2913,] Support metadata 
> 'op_type' virtual column for Postgres CDC Connector. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35129) Postgres source commits the offset after every multiple checkpoint cycles.

2024-05-07 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35129:
--
Fix Version/s: cdc-3.2.0
Affects Version/s: cdc-3.1.0

> Postgres source commits the offset after every multiple checkpoint cycles.
> --
>
> Key: FLINK-35129
> URL: https://issues.apache.org/jira/browse/FLINK-35129
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Assignee: Muhammet Orazov
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> After entering the stream phase, the offset consumed by the global slot is 
> committed upon the completion of each checkpoint, so that WAL files can be 
> recycled continuously and do not accumulate until disk space runs out.
> However, the job can only restart from the latest checkpoint or savepoint. If 
> it is restored from an earlier state, the WAL may already have been recycled.
>  
> The way to solve this is to commit the offset only after every several 
> checkpoint cycles. The number of checkpoint cycles is determined by a connector 
> option, and the default value is 3; a counting sketch is shown below.
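
A minimal sketch of the cycle-counting idea (names are illustrative, not the actual connector code):
{code:java}
// Commit the replication-slot offset only every N completed checkpoints.
private static final int CHECKPOINT_CYCLES_PER_COMMIT = 3; // proposed default
private int completedCheckpoints;

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    completedCheckpoints++;
    if (completedCheckpoints % CHECKPOINT_CYCLES_PER_COMMIT == 0) {
        // Hypothetical helper: acknowledges the confirmed_lsn for this checkpoint.
        commitOffsetForCheckpoint(checkpointId);
    }
}
{code}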



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35143) Expose newly added tables capture in mysql pipeline connector

2024-05-07 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35143:
--
Fix Version/s: cdc-3.2.0
Affects Version/s: cdc-3.1.0

> Expose newly added tables capture in mysql pipeline connector
> -
>
> Key: FLINK-35143
> URL: https://issues.apache.org/jira/browse/FLINK-35143
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> Currently, the MySQL pipeline connector still does not allow capturing newly 
> added tables.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35215][core] Fix the bug when Kryo serialize length is 0 [flink]

2024-05-07 Thread via GitHub


1996fanrui commented on code in PR #24717:
URL: https://github.com/apache/flink/pull/24717#discussion_r1579104254


##
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java:
##
@@ -65,6 +65,10 @@ public boolean canReadLong() throws KryoException {
  */
 @Override
 protected int require(int required) throws KryoException {
+if (required == 0) {
+return 0;
+}

Review Comment:
   Hi @qinghui-xu , I'm not sure whether this change fixes the bug that FLINK-34954 
mentioned. Would you mind helping double-check? Thank you in advance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35215][core] Fix the bug when Kryo serialize length is 0 [flink]

2024-05-07 Thread via GitHub


1996fanrui commented on code in PR #24717:
URL: https://github.com/apache/flink/pull/24717#discussion_r1593316335


##
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java:
##
@@ -114,18 +117,26 @@ public void readBytes(byte[] bytes, int offset, int 
count) throws KryoException
 throw new IllegalArgumentException("bytes cannot be null.");
 }
 
+if (count == 0) {
+return;
+}
+

Review Comment:
   This `if (count == 0) {` is an alternative change for FLINK-34954. I ran the 
demo provided in FLINK-34954, and it works well. So I think the change is fine.
   
   Also, I ran the benchmark, and the performance of serializerKryo and 
serializerKryoWithoutRegistration has recovered. A zero-length round-trip 
sketch is shown below for reference.
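   
   For reference, a hedged sketch of the zero-length round-trip being exercised 
here (not the exact demo from FLINK-34954):
   
   ```java
   import org.apache.flink.api.common.ExecutionConfig;
   import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
   import org.apache.flink.core.memory.DataInputDeserializer;
   import org.apache.flink.core.memory.DataOutputSerializer;
   
   KryoSerializer<byte[]> serializer =
           new KryoSerializer<>(byte[].class, new ExecutionConfig());
   
   DataOutputSerializer out = new DataOutputSerializer(64);
   serializer.serialize(new byte[0], out); // zero-length payload
   DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
   
   byte[] copy = serializer.deserialize(in); // must not fail in require(0)/readBytes(.., 0)
   assert copy.length == 0;
   ```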
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35215][core] Fix the bug when Kryo serialize length is 0 [flink]

2024-05-07 Thread via GitHub


1996fanrui commented on code in PR #24717:
URL: https://github.com/apache/flink/pull/24717#discussion_r1593318508


##
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java:
##
@@ -114,18 +117,26 @@ public void readBytes(byte[] bytes, int offset, int count) throws KryoException
             throw new IllegalArgumentException("bytes cannot be null.");
         }
 
+        if (count == 0) {
+            return;
+        }
+

Review Comment:
   Hi @dannycranmer , this issue is a blocker for Flink 1.20, and you were the 
reviewer of https://github.com/apache/flink/pull/24586. Would you mind 
reviewing this PR as well? Thanks in advance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35215) The performance of serializerKryo and serializerKryoWithoutRegistration are regressed

2024-05-07 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844504#comment-17844504
 ] 

Rui Fan commented on FLINK-35215:
-

Thanks [~kkrugler] for the feedback!
{quote} * I was surprised that Case2 (above) didn't cause a test to fail, until 
I realized that the previous fix hadn't added a test for failure with 0 
serialized bytes. I would recommend adding this, so that your changes don't 
accidentally re-introduce this bug.{quote}
Good catch, I will add a test later. Also, would you mind helping review the 
new PR in your free time? Thanks a lot.

 
{quote} * I still am suspicious of the change in performance. The initial fix 
changes a while(true) loop to one that has a simple comparison, and inside the 
loop there are calls to methods that are going to be doing significant work. So 
I really don't see how that change could have caused a significant performance 
regression, unless I'm missing something.{quote}
See this comment: 
https://github.com/apache/flink/pull/24717#discussion_r1579063045

 

> The performance of serializerKryo and serializerKryoWithoutRegistration are 
> regressed
> -
>
> Key: FLINK-35215
> URL: https://issues.apache.org/jira/browse/FLINK-35215
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-25-14-57-55-231.png, 
> image-2024-04-25-15-00-32-410.png
>
>
> The performance of serializerKryo and serializerKryoWithoutRegistration are 
> regressed[1][2], I checked recent commits, and found FLINK-34954 changed 
> related logic.
>  
> [1] 
> [http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerKryo&extr=on&quarts=on&equid=off&env=3&revs=50]
> [2] 
> http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerKryoWithoutRegistration&extr=on&quarts=on&equid=off&env=3&revs=50
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]

2024-05-07 Thread via GitHub


Jiabao-Sun commented on code in PR #24670:
URL: https://github.com/apache/flink/pull/24670#discussion_r1593325219


##
flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java:
##
@@ -20,62 +20,57 @@
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link SerializedValue}. */
-public class SerializedValueTest {
+class SerializedValueTest {
 
     @Test
-    public void testSimpleValue() {
-        try {
-            final String value = "teststring";
-
-            SerializedValue<String> v = new SerializedValue<>(value);
-            SerializedValue<String> copy = CommonTestUtils.createCopySerializable(v);
-
-            assertEquals(value, v.deserializeValue(getClass().getClassLoader()));
-            assertEquals(value, copy.deserializeValue(getClass().getClassLoader()));
-
-            assertEquals(v, copy);
-            assertEquals(v.hashCode(), copy.hashCode());
-
-            assertNotNull(v.toString());
-            assertNotNull(copy.toString());
-
-            assertNotEquals(0, v.getByteArray().length);
-            assertArrayEquals(v.getByteArray(), copy.getByteArray());
-
-            byte[] bytes = v.getByteArray();
-            SerializedValue<String> saved =
-                    SerializedValue.fromBytes(Arrays.copyOf(bytes, bytes.length));
-            assertEquals(v, saved);
-            assertArrayEquals(v.getByteArray(), saved.getByteArray());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+    void testSimpleValue() throws Exception {
+        final String value = "teststring";
+
+        SerializedValue<String> v = new SerializedValue<>(value);
+        SerializedValue<String> copy = CommonTestUtils.createCopySerializable(v);
+
+        assertThat(v.deserializeValue(getClass().getClassLoader())).isEqualTo(value);
+        assertThat(copy.deserializeValue(getClass().getClassLoader())).isEqualTo(value);
+
+        assertThat(copy).isEqualTo(v);
+        assertThat(copy).hasSameHashCodeAs(v.hashCode());

Review Comment:
   ```suggestion
   assertThat(copy).hasSameHashCodeAs(v);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] repair a snapshot-split bug: [flink-cdc]

2024-05-07 Thread via GitHub


AidenPerce commented on PR #2968:
URL: https://github.com/apache/flink-cdc/pull/2968#issuecomment-2099656171

   @yuxiqian could you help merge it, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-34666) Keep assigned splits in order to fix wrong meta group calculation

2024-05-07 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren closed FLINK-34666.
-
Resolution: Fixed

> Keep assigned splits in order to fix wrong meta group calculation
> -
>
> Key: FLINK-34666
> URL: https://issues.apache.org/jira/browse/FLINK-34666
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.1.0
>
>
> h3. Reason
> When tables are newly added and the job is then restarted, 
> IncrementalSourceEnumerator#sendStreamMetaRequestEvent -> 
> SplitAssigner#getFinishedSplitInfos may return unordered 
> finishedSplitInfos (the new tables' snapshot infos are ahead of the older 
> ones). When the reader requests the new table's infos, it gets the older 
> ones, so it can never read all the infos and resume the changelog read.
>  
> h3. How to reproduce it?
> Add chunk-meta.group.size = 2 in 
> getCreateTableStatement, then run the test 
> org.apache.flink.cdc.connectors.postgres.source.NewlyAddedTableITCase#testNewlyAddedTableForExistsPipelineTwiceWithAheadWalLog
> {code:java}
> // code placeholder
> private String getCreateTableStatement(
>         Map<String, String> otherOptions, String... captureTableNames) {
> return String.format(
> "CREATE TABLE address ("
> + " table_name STRING METADATA VIRTUAL,"
> + " id BIGINT NOT NULL,"
> + " country STRING,"
> + " city STRING,"
> + " detail_address STRING,"
> + " primary key (id) not enforced"
> + ") WITH ("
> + " 'connector' = 'postgres-cdc',"
> + " 'scan.incremental.snapshot.enabled' = 'true',"
> + " 'hostname' = '%s',"
> + " 'port' = '%s',"
> + " 'username' = '%s',"
> + " 'password' = '%s',"
> + " 'database-name' = '%s',"
> + " 'schema-name' = '%s',"
> + " 'table-name' = '%s',"
> + " 'slot.name' = '%s', "
> + " 'scan.incremental.snapshot.chunk.size' = '2',"
> + " 'chunk-meta.group.size' = '2',"
> + " 'scan.newly-added-table.enabled' = 'true'"
> + " %s"
> + ")",
> customDatabase.getHost(),
> customDatabase.getDatabasePort(),
> customDatabase.getUsername(),
> customDatabase.getPassword(),
> customDatabase.getDatabaseName(),
> SCHEMA_NAME,
> PostgresTestUtils.getTableNameRegex(captureTableNames),
> slotName,
> otherOptions.isEmpty()
> ? ""
> : ","
> + otherOptions.entrySet().stream()
> .map(
> e ->
> String.format(
> "'%s'='%s'",
> e.getKey(), 
> e.getValue()))
> .collect(Collectors.joining(",")));
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35030][runtime] Introduce Epoch Manager under async execution [flink]

2024-05-07 Thread via GitHub


Zakelly commented on code in PR #24748:
URL: https://github.com/apache/flink/pull/24748#discussion_r1593310127


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ParallelEpochManager.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+/** An implementation of {@link AbstractEpochManager} for parallel epoch execution. */
+public class ParallelEpochManager extends AbstractEpochManager {
+
+    public ParallelEpochManager() {
+        super();
+    }
+
+    @Override
+    public void onNonRecord(Runnable action) {
+        Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1);
+        lastEpoch.action = action;
+        lastEpoch.close();
+        if (outputQueue.size() == 1) { // which means the first epoch
+            if (lastEpoch.tryFinish()) {
+                outputQueue.remove(0);
+            }
+        }
+        Epoch epoch = new Epoch(0, action);

Review Comment:
   This action should not be passed into the new epoch.



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractEpochManager.java:
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import javax.annotation.Nonnull;
+
+import java.util.LinkedList;
+
+/**
+ * Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records (e.g.
+ * watermark, record attributes). Records are assigned to a unique epoch based on their arrival;
+ * records within an epoch are allowed to be parallelized, while the non-record of an epoch can
+ * only be executed when all records in this epoch have finished.
+ *
+ * <p>For more details please refer to FLIP-425.
+ */
+public abstract class AbstractEpochManager {
+
+    /**
+     * This enum defines whether parallel execution between epochs is allowed. We should keep this
+     * internal and away from the API module for now, until we see a concrete need for {@link
+     * #PARALLEL_BETWEEN_EPOCH} from average users.
+     */
+    public enum ParallelMode {
+        /**
+         * Subsequent epochs must wait until the previous epoch is completed before they can start.
+         */
+        SERIAL_BETWEEN_EPOCH,
+        /**
+         * Subsequent epochs can begin execution even if the previous epoch has not yet completed.
+         * Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}.
+         */
+        PARALLEL_BETWEEN_EPOCH
+    }
+
+    /** The output queue to hold ongoing epochs. */
+    protected LinkedList<Epoch> outputQueue;
+
+    public AbstractEpochManager() {
+        this.outputQueue = new LinkedList<>();
+        // preset an empty epoch; the epoch action will be updated when a non-record is received.
+        this.outputQueue.add(new Epoch(0, () -> {}));
+    }
+
+    /**
+     * Add a record to the current epoch and return the current open epoch; the epoch will be
+     * associated with the {@link RecordContext} of this record. Must be invoked within the task
+     * thread.
+     *
+     * @return the current open epoch.
+     */
+    public Epoch onRecord() {
+        Epoch lastEpoch = outputQueue.get(outputQueue.size() - 1);

Review Comment:
   How about holding a reference `Epoch activeEpoch` instead of getting the 
last one from the queue?
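   
   For illustration, a minimal sketch of this suggestion with a stub `Epoch` 
type (field and constructor shapes assumed from the diff above; not the PR's 
actual code):
   
   ```java
   import java.util.LinkedList;
   
   class Epoch {
       final long id;
       Runnable action;
   
       Epoch(long id, Runnable action) {
           this.id = id;
           this.action = action;
       }
   }
   
   abstract class EpochManagerSketch {
       protected final LinkedList<Epoch> outputQueue = new LinkedList<>();
       // Direct reference to the open (last) epoch, so hot paths avoid
       // outputQueue.get(outputQueue.size() - 1) lookups.
       protected Epoch activeEpoch;
   
       protected EpochManagerSketch() {
           activeEpoch = new Epoch(0, () -> {});
           outputQueue.add(activeEpoch);
       }
   
       public Epoch onRecord() {
           return activeEpoch;
       }
   }
   ```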



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing

[jira] [Assigned] (FLINK-35215) The performance of serializerKryo and serializerKryoWithoutRegistration are regressed

2024-05-07 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-35215:
---

Assignee: Rui Fan

> The performance of serializerKryo and serializerKryoWithoutRegistration are 
> regressed
> -
>
> Key: FLINK-35215
> URL: https://issues.apache.org/jira/browse/FLINK-35215
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: image-2024-04-25-14-57-55-231.png, 
> image-2024-04-25-15-00-32-410.png
>
>
> The performance of serializerKryo and serializerKryoWithoutRegistration are 
> regressed[1][2], I checked recent commits, and found FLINK-34954 changed 
> related logic.
>  
> [1] 
> [http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerKryo&extr=on&quarts=on&equid=off&env=3&revs=50]
> [2] 
> http://flink-speed.xyz/timeline/#/?exe=1,6,12&ben=serializerKryoWithoutRegistration&extr=on&quarts=on&equid=off&env=3&revs=50
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34954) Kryo input implementation NoFetchingInput fails to handle zero length bytes

2024-05-07 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844507#comment-17844507
 ] 

Rui Fan commented on FLINK-34954:
-

Hi [~q.xu] , I want to check with you: did you encounter this issue in a 
production job, or does it only happen in your test code?

After reading the related Flink code in detail, I suspect this issue never 
happens in a real Flink job.
h1. Reason:

This exception happens inside NoFetchingInput.readBytes when inputStream.read 
returns -1.
h2. Why doesn't a Flink job throw this exception?

In Flink code, the inputStream is a DataInputViewStream; see the 
KryoSerializer#deserialize method.
{code:java}
DataInputViewStream inputStream = new DataInputViewStream(source);
input = new NoFetchingInput(inputStream);{code}
DataInputViewStream#read calls 
org.apache.flink.core.memory.DataInputView#read(byte[], int, int). From its 
Javadoc: if len is zero, then no bytes are read and 0 is returned.

So when the length is 0, inputStream.read will return 0, and 
NoFetchingInput.readBytes won't throw an exception.
h2. Why does your test demo throw an exception?

Your demo uses java.io.ByteArrayInputStream as the inputStream. From its 
Javadoc:
{code:java}
* @return  the total number of bytes read into the buffer, or
*          -1 if there is no more data because the end of
*          the stream has been reached. {code}
It returns -1 even if the length is 0.

When the return value is -1, NoFetchingInput.readBytes will throw an exception.

Please correct me if anything is wrong, thanks~
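To make the difference concrete, a small runnable check of the JDK behavior 
described above (plain JDK, no Flink classes):
{code:java}
import java.io.ByteArrayInputStream;

public class ZeroLengthContractDemo {
    public static void main(String[] args) {
        byte[] buf = new byte[8];
        // An exhausted ByteArrayInputStream reports -1 even for len == 0,
        // unlike the general InputStream contract of returning 0.
        int r = new ByteArrayInputStream(new byte[0]).read(buf, 0, 0);
        System.out.println(r); // prints -1
    }
}
{code}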

> Kryo input implementation NoFetchingInput fails to handle zero length bytes
> ---
>
> Key: FLINK-34954
> URL: https://issues.apache.org/jira/browse/FLINK-34954
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.19.0
>Reporter: Qinghui Xu
>Assignee: Qinghui Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> If the serialized bytes are empty, `NoFetchingInput` will run into an error 
> when Kryo tries to deserialize them.
> Example: a protobuf 3 object that contains only default values will be 
> serialized as a 0-length byte array, and the later deserialization will fail. 
> Illustration:
> {noformat}
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, Input, Output}
> import com.google.protobuf.{DescriptorProtos, Message}
> import com.twitter.chill.protobuf.ProtobufSerializer
> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
> import java.io.ByteArrayInputStream
>
> object ProtoSerializationTest {
>   def main(args: Array[String]) = {
>     val chillProtoSerializer = new ProtobufSerializer
>     val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
>     val output: Output = new ByteBufferOutput(1000)
>     chillProtoSerializer.write(null, output, protomessage)
>     val serialized: Array[Byte] = output.toBytes
>     println(s"Serialized : $serialized")
>     val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
>     val deserialized = chillProtoSerializer.read(null, input, classOf[BillableClick].asInstanceOf[Class[Message]])
>     println(deserialized)
>   }
> }
> {noformat}
>  
> Error
> {noformat}
> Exception in thread "main" java.lang.RuntimeException: Could not create class 
> com.criteo.glup.BillableClickProto$BillableClick
>     at 
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
>     at 
> com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
>     at ProtoSerialization.main(ProtoSerialization.scala)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
> more bytes left.
>     at 
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
>     at 
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
>     ... 2 more
> Caused by: java.io.EOFException: No more bytes left.
>     ... 5 more{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35306) Flink cannot compile with jdk17

2024-05-07 Thread Rui Fan (Jira)
Rui Fan created FLINK-35306:
---

 Summary: Flink cannot compile with jdk17
 Key: FLINK-35306
 URL: https://issues.apache.org/jira/browse/FLINK-35306
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.20.0
Reporter: Rui Fan
Assignee: Rui Fan
 Fix For: 1.20.0
 Attachments: image-2024-05-08-11-48-04-161.png

!image-2024-05-08-11-48-04-161.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35306) Flink cannot compile with jdk17

2024-05-07 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-35306:

Description: 
Reason: TypeSerializerUpgradeTestBase.UpgradeVerifier renamed the 
schemaCompatibilityMatcher method to schemaCompatibilityCondition, but some 
subclasses weren't updated accordingly, such as 
PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier.

 

The affected code belongs to the flink-tests-java17 module, which is not 
compiled by default.

 

It's caused by
 * https://issues.apache.org/jira/browse/FLINK-25537
 * https://github.com/apache/flink/pull/24603

 

!image-2024-05-08-11-48-04-161.png!

  was:!image-2024-05-08-11-48-04-161.png!


> Flink cannot compile with jdk17
> ---
>
> Key: FLINK-35306
> URL: https://issues.apache.org/jira/browse/FLINK-35306
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
> Fix For: 1.20.0
>
> Attachments: image-2024-05-08-11-48-04-161.png
>
>
> Reason: TypeSerializerUpgradeTestBase.UpgradeVerifier renamed the 
> schemaCompatibilityMatcher method to schemaCompatibilityCondition, but some 
> subclasses weren't updated accordingly, such as 
> PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier.
>  
> The affected code belongs to the flink-tests-java17 module, which is not 
> compiled by default.
>  
> It's caused by
>  * https://issues.apache.org/jira/browse/FLINK-25537
>  * https://github.com/apache/flink/pull/24603
>  
> !image-2024-05-08-11-48-04-161.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35306][test] Update some methods in flink-tests-java17 to solve jdk17 compile fails [flink]

2024-05-07 Thread via GitHub


1996fanrui opened a new pull request, #24761:
URL: https://github.com/apache/flink/pull/24761

   ## What is the purpose of the change
   
   Flink cannot compile with jdk17
   
   Reason: TypeSerializerUpgradeTestBase.UpgradeVerifier renamed the 
schemaCompatibilityMatcher method to schemaCompatibilityCondition, but some 
subclasses weren't updated accordingly, such as 
PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier.
   
   The affected code belongs to the flink-tests-java17 module, which is not 
compiled by default.
   
   It's related to https://github.com/apache/flink/pull/24603
   
   
   
   ## Brief change log
   
   [FLINK-35306][test] Update some methods in flink-tests-java17 to solve jdk17 
compile fails
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35306) Flink cannot compile with jdk17

2024-05-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35306:
---
Labels: pull-request-available  (was: )

> Flink cannot compile with jdk17
> ---
>
> Key: FLINK-35306
> URL: https://issues.apache.org/jira/browse/FLINK-35306
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-05-08-11-48-04-161.png
>
>
> Reason: TypeSerializerUpgradeTestBase.UpgradeVerifier renamed the 
> schemaCompatibilityMatcher method to schemaCompatibilityCondition, but some 
> subclasses weren't updated accordingly, such as 
> PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier.
>  
> The affected code belongs to the flink-tests-java17 module, which is not 
> compiled by default.
>  
> It's caused by
>  * https://issues.apache.org/jira/browse/FLINK-25537
>  * https://github.com/apache/flink/pull/24603
>  
> !image-2024-05-08-11-48-04-161.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35306][test] Update some methods in flink-tests-java17 to solve jdk17 compile fails [flink]

2024-05-07 Thread via GitHub


flinkbot commented on PR #24761:
URL: https://github.com/apache/flink/pull/24761#issuecomment-2099697858

   
   ## CI report:
   
   * 94df53390b48dc820d8147a0eb17f84743f7850d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35192] support jemalloc in image [flink-kubernetes-operator]

2024-05-07 Thread via GitHub


chenyuzhi459 opened a new pull request, #825:
URL: https://github.com/apache/flink-kubernetes-operator/pull/825

   
   
   ## What is the purpose of the change
   
   According to 
[FLINK-35192](https://issues.apache.org/jira/browse/FLINK-35192), the current 
operator image uses glibc as the memory allocator, which may cause memory leaks 
due to memory fragmentation.
   
   This PR introduces jemalloc as an optional memory allocator for the operator 
image.
   
   ## Brief change log
   
   Introduces the `DISABLE_JEMALLOC` parameter to control whether the operator 
image uses jemalloc as the memory allocator.
   
   
   ## Verifying this change
   
   
   
   This change is already covered by existing CI tests.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented?  not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35192) Kubernetes operator oom

2024-05-07 Thread chenyuzhi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenyuzhi updated FLINK-35192:
--
Attachment: screenshot-4.png

> Kubernetes operator oom
> ---
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, 
> image-2024-04-30-17-11-24-974.png, image-2024-04-30-20-38-25-195.png, 
> image-2024-04-30-20-39-05-109.png, image-2024-04-30-20-39-34-396.png, 
> image-2024-04-30-20-41-51-660.png, image-2024-04-30-20-43-20-125.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png
>
>
> The Kubernetes operator Docker process was killed by the kernel due to out of 
> memory (at 2024-04-03 18:16).
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> The pod memory (RSS) has been increasing slowly over the past 7 days:
>  !screenshot-1.png! 
> However, the JVM memory metrics of the operator show no obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

