Re: [PR] [FLINK-35400][checkpoint] Release FileMergingSnapshotManager if all tasks finished [flink]

2024-05-20 Thread via GitHub


flinkbot commented on PR #24817:
URL: https://github.com/apache/flink/pull/24817#issuecomment-2121766002

   
   ## CI report:
   
   * 99713100d9b0ca11356ae2628edebc7429bedb46 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35400) Rebuild FileMergingSnapshotManager in failover

2024-05-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35400:
---
Labels: pull-request-available  (was: )

> Rebuild FileMergingSnapshotManager in failover
> --
>
> Key: FLINK-35400
> URL: https://issues.apache.org/jira/browse/FLINK-35400
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the {{FileMergingSnapshotManager}} is released within 
> {{{}releaseJobResources{}}}, which will not be invoked during failover and 
> restore. However, the manager should be created again to clear all internal 
> states in a new job attempt.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35400][checkpoint] Release FileMergingSnapshotManager if all tasks finished [flink]

2024-05-20 Thread via GitHub


Zakelly opened a new pull request, #24817:
URL: https://github.com/apache/flink/pull/24817

   ## What is the purpose of the change
   
   Currently, the `FileMergingSnapshotManager` is created for each job, only 
when the corresponding job released, the manager is released and disposed. For 
failover scenario, the tasks quit but job is still there, leading a reuse of 
`FileMergingSnapshotManager`, which violates the design of 
`FileMergingSnapshotManager`.
   
   
   ## Brief change log
   
- Make `TaskExecutorFileMergingManager` record reference from 
`ExecutionAttemptID` to `FileMergingSnapshotManager`, and release the reference 
when task resources release. 
   
   ## Verifying this change
   
   This change is already covered by modified existing tests, such as 
`TaskExecutorFileMergingManagerTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35395) Fix KeyedStateStore class annotation error

2024-05-20 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35395:
---
Priority: Minor  (was: Major)

> Fix KeyedStateStore class annotation error
> --
>
> Key: FLINK-35395
> URL: https://issues.apache.org/jira/browse/FLINK-35395
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: bradley
>Assignee: bradley
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The KeyedStateStore class is annotated incorrectly, and the examples there 
> are obviously wrong and will mislead users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35395) Fix KeyedStateStore class annotation error

2024-05-20 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo resolved FLINK-35395.

Resolution: Fixed

master(1.20) via 5f5722fe916bafc00ec05539d6c19ac28ac1c5c4.

> Fix KeyedStateStore class annotation error
> --
>
> Key: FLINK-35395
> URL: https://issues.apache.org/jira/browse/FLINK-35395
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: bradley
>Assignee: bradley
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The KeyedStateStore class is annotated incorrectly, and the examples there 
> are obviously wrong and will mislead users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35395) Fix KeyedStateStore class annotation error

2024-05-20 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35395:
---
Issue Type: Bug  (was: Improvement)

> Fix KeyedStateStore class annotation error
> --
>
> Key: FLINK-35395
> URL: https://issues.apache.org/jira/browse/FLINK-35395
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: bradley
>Assignee: bradley
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The KeyedStateStore class is annotated incorrectly, and the examples there 
> are obviously wrong and will mislead users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35395) Fix KeyedStateStore class annotation error

2024-05-20 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-35395.
--

> Fix KeyedStateStore class annotation error
> --
>
> Key: FLINK-35395
> URL: https://issues.apache.org/jira/browse/FLINK-35395
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: bradley
>Assignee: bradley
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The KeyedStateStore class is annotated incorrectly, and the examples there 
> are obviously wrong and will mislead users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35395) Fix KeyedStateStore class annotation error

2024-05-20 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-35395:
--

Assignee: bradley

> Fix KeyedStateStore class annotation error
> --
>
> Key: FLINK-35395
> URL: https://issues.apache.org/jira/browse/FLINK-35395
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: bradley
>Assignee: bradley
>Priority: Major
>  Labels: pull-request-available
>
> The KeyedStateStore class is annotated incorrectly, and the examples there 
> are obviously wrong and will mislead users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35395][core] Fix KeyedStateStore class annotation error [flink]

2024-05-20 Thread via GitHub


reswqa commented on PR #24808:
URL: https://github.com/apache/flink/pull/24808#issuecomment-2121760418

   Thanks @usberkeley  , merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35395) Fix KeyedStateStore class annotation error

2024-05-20 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35395:
---
Fix Version/s: 1.20.0

> Fix KeyedStateStore class annotation error
> --
>
> Key: FLINK-35395
> URL: https://issues.apache.org/jira/browse/FLINK-35395
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: bradley
>Assignee: bradley
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The KeyedStateStore class is annotated incorrectly, and the examples there 
> are obviously wrong and will mislead users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35395][core] Fix KeyedStateStore class annotation error [flink]

2024-05-20 Thread via GitHub


reswqa merged PR #24808:
URL: https://github.com/apache/flink/pull/24808


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34944] Use Incremental Source Framework in Flink CDC OceanBase Source Connector [flink-cdc]

2024-05-20 Thread via GitHub


whhe commented on PR #3211:
URL: https://github.com/apache/flink-cdc/pull/3211#issuecomment-2121749049

   > hello,When can this feature be merged into version 3.2?
   
   I will add test cases for this PR in the next two days, and after that we 
can ask the committers to review it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35405) Add buffer size/in-flight records metrics for AsyncExecutionController

2024-05-20 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35405:
--

 Summary: Add buffer size/in-flight records metrics for 
AsyncExecutionController
 Key: FLINK-35405
 URL: https://issues.apache.org/jira/browse/FLINK-35405
 Project: Flink
  Issue Type: Sub-task
Reporter: Yanfei Lei






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35030) Introduce Epoch Manager for async execution

2024-05-20 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei resolved FLINK-35030.

Resolution: Resolved

> Introduce Epoch Manager for async execution
> ---
>
> Key: FLINK-35030
> URL: https://issues.apache.org/jira/browse/FLINK-35030
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35030) Introduce Epoch Manager for async execution

2024-05-20 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848041#comment-17848041
 ] 

Yanfei Lei commented on FLINK-35030:


Merged into master via f1ecb9e

> Introduce Epoch Manager for async execution
> ---
>
> Key: FLINK-35030
> URL: https://issues.apache.org/jira/browse/FLINK-35030
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][ForStStateBackend] Create new copy for each SerializedCompositeKeyBuilder in ForStStateBackend [flink]

2024-05-20 Thread via GitHub


flinkbot commented on PR #24816:
URL: https://github.com/apache/flink/pull/24816#issuecomment-2121707146

   
   ## CI report:
   
   * 6a92726158cb6a53b1fb0960a242d66973d9d447 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]

2024-05-20 Thread via GitHub


mohitjain2504 commented on code in PR #24365:
URL: https://github.com/apache/flink/pull/24365#discussion_r1607603000


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/SplitFunction.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#SPLIT}. */
+@Internal
+public class SplitFunction extends BuiltInScalarFunction {
+public SplitFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.SPLIT, context);
+}
+
+public @Nullable ArrayData eval(@Nullable StringData string, @Nullable 
StringData delimiter) {
+try {
+if (string == null || delimiter == null) {
+return null;
+}
+if (delimiter.toString().isEmpty()) {
+List res = new ArrayList<>();
+for (int i = 0; i < string.toString().length(); ++i) {

Review Comment:
   can convert the for loop into foreach loop for better readability
   ```
   String str = string.toString();
   for (char c : str.toCharArray()) {
   res.add(StringData.fromString(String.valueOf(c)));
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]

2024-05-20 Thread via GitHub


mohitjain2504 commented on code in PR #24365:
URL: https://github.com/apache/flink/pull/24365#discussion_r1607597612


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/SplitFunction.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#SPLIT}. */
+@Internal
+public class SplitFunction extends BuiltInScalarFunction {
+public SplitFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.SPLIT, context);
+}
+
+public @Nullable ArrayData eval(@Nullable StringData string, @Nullable 
StringData delimiter) {
+try {
+if (string == null || delimiter == null) {
+return null;
+}
+if (delimiter.toString().isEmpty()) {
+List res = new ArrayList<>();
+for (int i = 0; i < string.toString().length(); ++i) {
+
res.add(StringData.fromString(String.valueOf(string.toString().charAt(i;
+}
+return new GenericArrayData(res.toArray());
+}
+BinaryStringData[] binaryStringData =

Review Comment:
   Just a thought: Do we need a try-catch block over the whole function or just 
capturing the below statement will be enough? If we don't expect any exception 
in the above lines, can keep the statements out of try-catch block. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]

2024-05-20 Thread via GitHub


mohitjain2504 commented on code in PR #24365:
URL: https://github.com/apache/flink/pull/24365#discussion_r1607592570


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/SplitFunction.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#SPLIT}. */
+@Internal
+public class SplitFunction extends BuiltInScalarFunction {
+public SplitFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.SPLIT, context);
+}
+
+public @Nullable ArrayData eval(@Nullable StringData string, @Nullable 
StringData delimiter) {
+try {
+if (string == null || delimiter == null) {
+return null;
+}
+if (delimiter.toString().isEmpty()) {
+List res = new ArrayList<>();

Review Comment:
   It can be initialised directly to str.length()
   ```
   String str = string.toString();
   List res = new ArrayList<>(str.length());
   ```
   This will also help us reduce the redundant calls to `string.toString()`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35037][table-planner]Optimize uniqueKeys and upsertKeys inference of windows with ROW_NUMBER. [flink]

2024-05-20 Thread via GitHub


libenchao commented on code in PR #24638:
URL: https://github.com/apache/flink/pull/24638#discussion_r1607579249


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala:
##
@@ -188,7 +188,7 @@ class FlinkRelMdUpsertKeysTest extends 
FlinkRelMdHandlerTestBase {
   rank => assertEquals(toBitSet(Array(0)), mq.getUpsertKeys(rank).toSet)
 }
 
-Array(logicalRowNumber, flinkLogicalRowNumber, streamRowNumber)
+Array(logicalWindow, logicalRowNumber, flinkLogicalRowNumber, 
streamRowNumber)

Review Comment:
   I'm wondering why the existing tests seem already have some ability to infer 
`row_number` as a part of unique key?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##
@@ -186,28 +186,34 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] 
= {
-getUpsertKeysOnOver(rel, mq, rel.groups.map(_.keys): _*)
+getUpsertKeysOnOver(rel, mq)
   }
 
   def getUpsertKeys(
   rel: BatchPhysicalOverAggregate,
   mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
-getUpsertKeysOnOver(rel, mq, ImmutableBitSet.of(rel.partitionKeyIndices: 
_*))
+getUpsertKeysOnOver(rel, mq)
   }
 
   def getUpsertKeys(
   rel: StreamPhysicalOverAggregate,
   mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
-getUpsertKeysOnOver(rel, mq, rel.logicWindow.groups.map(_.keys): _*)
+getUpsertKeysOnOver(rel, mq)
   }
 
   private def getUpsertKeysOnOver(
-  rel: SingleRel,
-  mq: RelMetadataQuery,
-  distributionKeys: ImmutableBitSet*): JSet[ImmutableBitSet] = {
-var inputKeys = 
FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput)
-for (distributionKey <- distributionKeys) {
+  window: SingleRel,
+  mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
+var (groups, aggStartPos) = 
FlinkRelMdUniqueKeys.INSTANCE.getGroupsAndStartPos(window)
+var inputKeys = 
FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(window.getInput)
+for (group <- groups) {
+  val distributionKey = group.keys
   inputKeys = filterKeys(inputKeys, distributionKey)
+  FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfWindowGroup(group, 
aggStartPos) match {
+case Some(upsertKeys) => inputKeys.addAll(upsertKeys)
+case _ =>
+  }
+  aggStartPos = aggStartPos + group.aggCalls.length

Review Comment:
   filtering the input keys, and adding the keys inferred via 'partition key 
and row_number' is orthogonal, can we just use two different for loops, which 
is more clear?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala:
##
@@ -439,6 +439,18 @@ class FlinkRelMdColumnUniqueness private extends 
MetadataHandler[BuiltInMetadata
   mq: RelMetadataQuery,
   columns: ImmutableBitSet,
   ignoreNulls: Boolean): JBoolean = {
+var (groups, aggStartPos) = 
FlinkRelMdUniqueKeys.INSTANCE.getGroupsAndStartPos(overAgg)
+for (group <- groups) {
+  FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfWindowGroup(group, 
aggStartPos) match {
+case Some(upsertKeys) =>

Review Comment:
   the name should be `uniqueKeys`?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##
@@ -455,11 +455,56 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
 getUniqueKeysOfOverAgg(rel, mq, ignoreNulls)
   }
 
+  def getUniqueKeysOfWindowGroup(
+  group: Window.Group,
+  startPos: Int): Option[JSet[ImmutableBitSet]] = {
+val retSet = new JHashSet[ImmutableBitSet]
+val aggCalls = group.aggCalls
+for ((aggCall, offset) <- aggCalls.zipWithIndex) {
+  // If it's a ROW_NUMBER window, then the unique keys are partition by 
key and row number.
+  if (aggCall.getOperator.equals(SqlStdOperatorTable.ROW_NUMBER)) {
+val rowNumberColumnIndex = startPos + offset
+retSet.add(group.keys.union(ImmutableBitSet.of(rowNumberColumnIndex)))
+  }
+}
+if (retSet.isEmpty) {
+  None
+} else {
+  Some(retSet)
+}
+  }
+
+  def getGroupsAndStartPos(window: SingleRel): Tuple2[JList[Window.Group], 
Int] = {
+val groups: JList[Window.Group] = window match {
+  case window: Window => window.groups
+  case streamOverAggregate: StreamPhysicalOverAggregate =>
+streamOverAggregate.logicWindow.groups
+  case batchOverAggregate: BatchPhysicalOverAggregate => 
batchOverAggregate.windowGroups
+  case _ => throw new IllegalArgumentException("Illegal window type.")
+}
+val aggCounts = groups.map(_.aggCalls.length).sum
+val aggStartIndex = window.getRowType.getFieldCount - aggCounts
+   

Re: [PR] [hotfix][ForStStateBackend] Create new copy for each SerializedCompositeKeyBuilder in ForStStateBackend [flink]

2024-05-20 Thread via GitHub


zoltar9264 commented on PR #24816:
URL: https://github.com/apache/flink/pull/24816#issuecomment-2121678039

   Please have a check @masteryhx .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix][ForStStateBackend] Create new copy for each SerializedCompositeKeyBuilder in ForStStateBackend [flink]

2024-05-20 Thread via GitHub


zoltar9264 opened a new pull request, #24816:
URL: https://github.com/apache/flink/pull/24816

   ## What is the purpose of the change
   
   This is a internal hotfix within development of ForStStateBackend (see: 
FLIP-427).
   
   ## Brief change log
   
 - As title, create new copy for each SerializedCompositeKeyBuilder in 
ForStStateBackend.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848037#comment-17848037
 ] 

Gang Yang commented on FLINK-35296:
---

Hi,[~ruanhang1993] ,Below is a screenshot of the thread dump
!image-2024-05-21-11-16-01-719.png!

 

The parallelism of my job is 2:

!image-2024-05-21-11-17-13-441.png!

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, 
> image-2024-05-21-10-52-52-827.png, image-2024-05-21-10-53-05-634.png, 
> image-2024-05-21-10-53-54-946.png, image-2024-05-21-11-16-01-719.png, 
> image-2024-05-21-11-17-13-441.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35167][cdc-connector] Introduce MaxCompute pipeline DataSink [flink-cdc]

2024-05-20 Thread via GitHub


dingxin-tech commented on PR #3254:
URL: https://github.com/apache/flink-cdc/pull/3254#issuecomment-2121641714

   > I am wondering how a commercial database sink like MaxCompute to do e2e 
test?
   
   I will soon be working on creating a Docker image for a `MaxCompute 
Emulator` that launches a mocked version of MaxCompute. This will allow for 
end-to-end testing to be performed by initializing this image prior to 
regression testing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35296:
--
Attachment: image-2024-05-21-11-17-13-441.png

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, 
> image-2024-05-21-10-52-52-827.png, image-2024-05-21-10-53-05-634.png, 
> image-2024-05-21-10-53-54-946.png, image-2024-05-21-11-16-01-719.png, 
> image-2024-05-21-11-17-13-441.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35296:
--
Attachment: image-2024-05-21-11-16-01-719.png

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, 
> image-2024-05-21-10-52-52-827.png, image-2024-05-21-10-53-05-634.png, 
> image-2024-05-21-10-53-54-946.png, image-2024-05-21-11-16-01-719.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848033#comment-17848033
 ] 

Gang Yang commented on FLINK-35296:
---

Hi,[~ruanhang1993] ,The following is a screenshot of my customized log
com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader#pollSplitRecords

!image-2024-05-21-10-51-17-196.png!

!image-2024-05-21-10-53-54-946.png!

com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader#fetch
!image-2024-05-21-10-52-02-530.png!

!image-2024-05-21-10-53-05-634.png!

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, 
> image-2024-05-21-10-52-52-827.png, image-2024-05-21-10-53-05-634.png, 
> image-2024-05-21-10-53-54-946.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35296:
--
Attachment: image-2024-05-21-10-53-54-946.png

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, 
> image-2024-05-21-10-52-52-827.png, image-2024-05-21-10-53-05-634.png, 
> image-2024-05-21-10-53-54-946.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35296:
--
Attachment: image-2024-05-21-10-53-05-634.png

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, 
> image-2024-05-21-10-52-52-827.png, image-2024-05-21-10-53-05-634.png, 
> image-2024-05-21-10-53-54-946.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35296:
--
Attachment: image-2024-05-21-10-52-52-827.png

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, 
> image-2024-05-21-10-52-52-827.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35296:
--
Attachment: image-2024-05-21-10-52-02-530.png

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png, image-2024-05-21-10-52-02-530.png, 
> image-2024-05-21-10-52-52-827.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35296:
--
Attachment: image-2024-05-21-10-51-17-196.png

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35296:
--
Attachment: image-2024-05-21-10-51-08-452.png

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png, image-2024-05-21-10-51-08-452.png, 
> image-2024-05-21-10-51-17-196.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35382) ChangelogCompatibilityITCase.testRestore fails with an NPE

2024-05-20 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848030#comment-17848030
 ] 

Yanfei Lei edited comment on FLINK-35382 at 5/21/24 2:35 AM:
-

Merged into master via 26b149a.

 

Let's observe for a while and then close it.


was (Author: yanfei lei):
Merged into master via 26b149a.

> ChangelogCompatibilityITCase.testRestore fails with an NPE
> --
>
> Key: FLINK-35382
> URL: https://issues.apache.org/jira/browse/FLINK-35382
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Assignee: Jinzhong Li
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> * 1.20 Java 8 / Test (module: tests) 
> https://github.com/apache/flink/actions/runs/9110398985/job/25045798401#step:10:8192
> It looks like there can be a [NullPointerException at this 
> line|https://github.com/apache/flink/blob/9a5a99b1a30054268bbde36d565cbb1b81018890/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java#L666]
>  causing a test failure:
> {code}
> Error: 10:36:23 10:36:23.312 [ERROR] Tests run: 9, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 19.31 s <<< FAILURE! -- in 
> org.apache.flink.test.state.ChangelogCompatibilityITCase
> Error: 10:36:23 10:36:23.313 [ERROR] 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.testRestore[startWithChangelog=false,
>  restoreWithChangelog=true, restoreFrom=CHECKPOINT, allowStore=true, 
> allowRestore=true] -- Time elapsed: 1.492 s <<< ERROR!
> May 16 10:36:23 java.lang.RuntimeException: 
> org.opentest4j.AssertionFailedError: Graph is in globally terminal state 
> (FAILED)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.tryRun(ChangelogCompatibilityITCase.java:204)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.restoreAndValidate(ChangelogCompatibilityITCase.java:190)
> May 16 10:36:23   at java.util.Optional.ifPresent(Optional.java:159)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.testRestore(ChangelogCompatibilityITCase.java:118)
> May 16 10:36:23   at java.lang.reflect.Method.invoke(Method.java:498)
> May 16 10:36:23 Caused by: org.opentest4j.AssertionFailedError: Graph is in 
> globally terminal state (FAILED)
> May 16 10:36:23   at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:42)
> May 16 10:36:23   at 
> org.junit.jupiter.api.Assertions.fail(Assertions.java:150)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.lambda$waitForAllTaskRunning$3(CommonTestUtils.java:214)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:151)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:209)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:182)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.submit(ChangelogCompatibilityITCase.java:284)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.tryRun(ChangelogCompatibilityITCase.java:197)
> May 16 10:36:23   ... 4 more
> May 16 10:36:23 Caused by: org.apache.flink.runtime.JobException: 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> May 16 10:36:23   at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
> May 16 10:36:23   at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
> May 16 10:36:23   at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:279)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:270)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:263)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:788)
> May 16 10:36:23   at 
> 

[jira] [Resolved] (FLINK-35382) ChangelogCompatibilityITCase.testRestore fails with an NPE

2024-05-20 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei resolved FLINK-35382.

  Assignee: Jinzhong Li
Resolution: Fixed

> ChangelogCompatibilityITCase.testRestore fails with an NPE
> --
>
> Key: FLINK-35382
> URL: https://issues.apache.org/jira/browse/FLINK-35382
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Assignee: Jinzhong Li
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> * 1.20 Java 8 / Test (module: tests) 
> https://github.com/apache/flink/actions/runs/9110398985/job/25045798401#step:10:8192
> It looks like there can be a [NullPointerException at this 
> line|https://github.com/apache/flink/blob/9a5a99b1a30054268bbde36d565cbb1b81018890/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java#L666]
>  causing a test failure:
> {code}
> Error: 10:36:23 10:36:23.312 [ERROR] Tests run: 9, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 19.31 s <<< FAILURE! -- in 
> org.apache.flink.test.state.ChangelogCompatibilityITCase
> Error: 10:36:23 10:36:23.313 [ERROR] 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.testRestore[startWithChangelog=false,
>  restoreWithChangelog=true, restoreFrom=CHECKPOINT, allowStore=true, 
> allowRestore=true] -- Time elapsed: 1.492 s <<< ERROR!
> May 16 10:36:23 java.lang.RuntimeException: 
> org.opentest4j.AssertionFailedError: Graph is in globally terminal state 
> (FAILED)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.tryRun(ChangelogCompatibilityITCase.java:204)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.restoreAndValidate(ChangelogCompatibilityITCase.java:190)
> May 16 10:36:23   at java.util.Optional.ifPresent(Optional.java:159)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.testRestore(ChangelogCompatibilityITCase.java:118)
> May 16 10:36:23   at java.lang.reflect.Method.invoke(Method.java:498)
> May 16 10:36:23 Caused by: org.opentest4j.AssertionFailedError: Graph is in 
> globally terminal state (FAILED)
> May 16 10:36:23   at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:42)
> May 16 10:36:23   at 
> org.junit.jupiter.api.Assertions.fail(Assertions.java:150)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.lambda$waitForAllTaskRunning$3(CommonTestUtils.java:214)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:151)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:209)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:182)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.submit(ChangelogCompatibilityITCase.java:284)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.tryRun(ChangelogCompatibilityITCase.java:197)
> May 16 10:36:23   ... 4 more
> May 16 10:36:23 Caused by: org.apache.flink.runtime.JobException: 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> May 16 10:36:23   at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
> May 16 10:36:23   at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
> May 16 10:36:23   at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:279)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:270)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:263)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:788)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:765)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> May 16 10:36:23   at 
> 

Re: [PR] [FLINK-31109][yarn] Support Hadoop proxy user when delegation token f… [flink]

2024-05-20 Thread via GitHub


wForget commented on PR #22009:
URL: https://github.com/apache/flink/pull/22009#issuecomment-2121596615

   I've tried specifying proxyUser using `HADOOP_PROXY_USER` and maintaining 
the delegation tokens with an external service and sending them into 
jobmanager, but I can't seem to update delegation tokens of taskmanager.
   
   Flink seems to use `DefaultDelegationTokenManager#listener` to update the 
delegation tokens of taskmanager. Can we provide a custom 
`DelegationTokenManager`?
   
   @venkata91 @gaborgsomogyi Could you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35382) ChangelogCompatibilityITCase.testRestore fails with an NPE

2024-05-20 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848030#comment-17848030
 ] 

Yanfei Lei commented on FLINK-35382:


Merged into master via 26b149a.

> ChangelogCompatibilityITCase.testRestore fails with an NPE
> --
>
> Key: FLINK-35382
> URL: https://issues.apache.org/jira/browse/FLINK-35382
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> * 1.20 Java 8 / Test (module: tests) 
> https://github.com/apache/flink/actions/runs/9110398985/job/25045798401#step:10:8192
> It looks like there can be a [NullPointerException at this 
> line|https://github.com/apache/flink/blob/9a5a99b1a30054268bbde36d565cbb1b81018890/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java#L666]
>  causing a test failure:
> {code}
> Error: 10:36:23 10:36:23.312 [ERROR] Tests run: 9, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 19.31 s <<< FAILURE! -- in 
> org.apache.flink.test.state.ChangelogCompatibilityITCase
> Error: 10:36:23 10:36:23.313 [ERROR] 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.testRestore[startWithChangelog=false,
>  restoreWithChangelog=true, restoreFrom=CHECKPOINT, allowStore=true, 
> allowRestore=true] -- Time elapsed: 1.492 s <<< ERROR!
> May 16 10:36:23 java.lang.RuntimeException: 
> org.opentest4j.AssertionFailedError: Graph is in globally terminal state 
> (FAILED)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.tryRun(ChangelogCompatibilityITCase.java:204)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.restoreAndValidate(ChangelogCompatibilityITCase.java:190)
> May 16 10:36:23   at java.util.Optional.ifPresent(Optional.java:159)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.testRestore(ChangelogCompatibilityITCase.java:118)
> May 16 10:36:23   at java.lang.reflect.Method.invoke(Method.java:498)
> May 16 10:36:23 Caused by: org.opentest4j.AssertionFailedError: Graph is in 
> globally terminal state (FAILED)
> May 16 10:36:23   at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:42)
> May 16 10:36:23   at 
> org.junit.jupiter.api.Assertions.fail(Assertions.java:150)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.lambda$waitForAllTaskRunning$3(CommonTestUtils.java:214)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:151)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:209)
> May 16 10:36:23   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning(CommonTestUtils.java:182)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.submit(ChangelogCompatibilityITCase.java:284)
> May 16 10:36:23   at 
> org.apache.flink.test.state.ChangelogCompatibilityITCase.tryRun(ChangelogCompatibilityITCase.java:197)
> May 16 10:36:23   ... 4 more
> May 16 10:36:23 Caused by: org.apache.flink.runtime.JobException: 
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> May 16 10:36:23   at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
> May 16 10:36:23   at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
> May 16 10:36:23   at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:279)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:270)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:263)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:788)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:765)
> May 16 10:36:23   at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> May 16 10:36:23   at 
> 

Re: [PR] [FLINK-35382][test] Disable snapshot-file-merging in ChangelogCompabilityITCase [flink]

2024-05-20 Thread via GitHub


fredia merged PR #24813:
URL: https://github.com/apache/flink/pull/24813


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. [flink-cdc]

2024-05-20 Thread via GitHub


gong commented on PR #3286:
URL: https://github.com/apache/flink-cdc/pull/3286#issuecomment-2121589464

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848028#comment-17848028
 ] 

Gang Yang edited comment on FLINK-35296 at 5/21/24 2:19 AM:


Hi,[~ruanhang1993] 

At present, I found that I am stuck 
in:com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close

mainly here:
if (statefulTaskContext.getBinaryLogClient() != null)

{ statefulTaskContext.getBinaryLogClient().disconnect(); }


was (Author: 清月):
 

At present, I found that I am stuck 
in:com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close



mainly here:
if (statefulTaskContext.getBinaryLogClient() != null) {
statefulTaskContext.getBinaryLogClient().disconnect();
}

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848028#comment-17848028
 ] 

Gang Yang edited comment on FLINK-35296 at 5/21/24 2:18 AM:


 

At present, I found that I am stuck 
in:com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close



mainly here:
if (statefulTaskContext.getBinaryLogClient() != null) {
statefulTaskContext.getBinaryLogClient().disconnect();
}


was (Author: 清月):
!https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5c76d9b669a1468175eb12bc271ba88b9ac1d964c3b7cbddd8fd9558551b5fe3aec177c308ebd5304aa134dd402f30bcfbd0496da3fc6ae06de22bfffe0648d0c6f0fb69a5e40eefc547fcc4b7fabaaa44fb4c8ed7016461c?tmpCode=a51ae03b-b271-4e26-82eb-741ca6a184e9!

At present, I found that I am stuck 
in:com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-20 Thread Gang Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848028#comment-17848028
 ] 

Gang Yang commented on FLINK-35296:
---

!https://alidocs.dingtalk.com/core/api/resources/img/5eecdaf48460cde5c76d9b669a1468175eb12bc271ba88b9ac1d964c3b7cbddd8fd9558551b5fe3aec177c308ebd5304aa134dd402f30bcfbd0496da3fc6ae06de22bfffe0648d0c6f0fb69a5e40eefc547fcc4b7fabaaa44fb4c8ed7016461c?tmpCode=a51ae03b-b271-4e26-82eb-741ca6a184e9!

At present, I found that I am stuck 
in:com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader#close

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [hotfix][cdc][docs] Supplementary answer to the Mysql CDC Q15 question [flink-cdc]

2024-05-20 Thread via GitHub


PONYLEE opened a new pull request, #3337:
URL: https://github.com/apache/flink-cdc/pull/3337

   Supplementary answer to the [Mysql CDC Q15] question


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35379][Checkpoint] Fix incorrect checkpoint notification handling in file merging [flink]

2024-05-20 Thread via GitHub


Zakelly merged PR #24806:
URL: https://github.com/apache/flink/pull/24806


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34456][configuration]Move all checkpoint-related options into CheckpointingOptions [flink]

2024-05-20 Thread via GitHub


spoon-lz commented on PR #24374:
URL: https://github.com/apache/flink/pull/24374#issuecomment-2121565086

   @Zakelly New code has been submitted.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-20 Thread via GitHub


HuangZhenQiu commented on PR #24754:
URL: https://github.com/apache/flink/pull/24754#issuecomment-2121563878

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is closed [flink]

2024-05-20 Thread via GitHub


masteryhx commented on code in PR #24768:
URL: https://github.com/apache/flink/pull/24768#discussion_r1607490839


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java:
##
@@ -147,15 +160,30 @@ public  S createState(@Nonnull 
StateDescriptor stateDes
 @Override
 @Nonnull
 public StateExecutor createStateExecutor() {
-// TODO: Make io parallelism configurable
-return new ForStStateExecutor(4, db, 
optionsContainer.getWriteOptions());
+synchronized (lock) {

Review Comment:
   Thanks for the update.
   It makes sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35354] Support host mapping in Flink tikv cdc [flink-cdc]

2024-05-20 Thread via GitHub


Mrart opened a new pull request, #3336:
URL: https://github.com/apache/flink-cdc/pull/3336

   In tidb production environment deployment, there are usually two kinds of 
network: internal network and public network. When we use pd mode in tikv, we 
need to do network mapping, such as `spark.tispark.host_mapping` in 
https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md. So I 
think we need support `host_mapping` in our Flink tikv cdc connector.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35354) Support host mapping in Flink tikv cdc

2024-05-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35354:
---
Labels: pull-request-available  (was: )

> Support host mapping in Flink tikv cdc
> --
>
> Key: FLINK-35354
> URL: https://issues.apache.org/jira/browse/FLINK-35354
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0
>Reporter: ouyangwulin
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> In tidb production environment deployment, there are usually two kinds of 
> network: internal network and public network. When we use pd mode in tikv, we 
> need to do network mapping, such as `spark.tispark.host_mapping` in 
> [https://github.com/pingcap/tispark/blob/master/docs/userguide_3.0.md]. So I 
> think we need support `host_mapping` in our Flink tikv cdc connector.
>  
> Add param:
>  tikv.host_mapping:192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-25537) [JUnit5 Migration] Module: flink-core

2024-05-20 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845561#comment-17845561
 ] 

Rui Fan edited comment on FLINK-25537 at 5/21/24 1:29 AM:
--

Merged to master(1.20.0) via:
 * 5af9acba5b9fbcdf9aadf62310cd337d508158c3
 * e5398e1025ec4312bac74a8b32b98d03cb254667


was (Author: fanrui):
Merged to master(1.20.0) via: 5af9acba5b9fbcdf9aadf62310cd337d508158c3

> [JUnit5 Migration] Module: flink-core
> -
>
> Key: FLINK-25537
> URL: https://issues.apache.org/jira/browse/FLINK-25537
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Core
>Reporter: Qingsheng Ren
>Assignee: Aiden Gong
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: types [flink]

2024-05-20 Thread via GitHub


1996fanrui merged PR #24613:
URL: https://github.com/apache/flink/pull/24613


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35359][config] General Improvement to Configuration for Flink 2.0 [flink]

2024-05-20 Thread via GitHub


flinkbot commented on PR #24815:
URL: https://github.com/apache/flink/pull/24815#issuecomment-2121526672

   
   ## CI report:
   
   * 3fc215385500d8eb69d85c860e6e936ab103eae4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35359) General Improvement to Configuration for Flink 2.0

2024-05-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35359:
---
Labels: pull-request-available  (was: )

> General Improvement to Configuration for Flink 2.0
> --
>
> Key: FLINK-35359
> URL: https://issues.apache.org/jira/browse/FLINK-35359
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> As Flink moves toward version 2.0, we want to provide users with a better 
> experience with the existing configuration. In this FLIP, we outline several 
> general improvements to the current configuration:
>  * Ensure all the ConfigOptions are properly annotated
>  * Ensure all user-facing configurations are included in the documentation 
> generation process
>  * Make the existing ConfigOptions use the proper type
>  * Mark all internally used ConfigOptions with the @Internal annotation
>  
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-25537][JUnit5 Migration] Module: flink-core with,Package: types [flink]

2024-05-20 Thread via GitHub


1996fanrui commented on code in PR #24613:
URL: https://github.com/apache/flink/pull/24613#discussion_r1598105369


##
flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java:
##
@@ -51,24 +49,24 @@ public class BasicTypeInfoTest extends TestLogger {
 };
 
 @Test
-public void testBasicTypeInfoEquality() {
+void testBasicTypeInfoEquality() {
 for (Class clazz : classes) {
 BasicTypeInfo tpeInfo1 = BasicTypeInfo.getInfoFor(clazz);
 BasicTypeInfo tpeInfo2 = BasicTypeInfo.getInfoFor(clazz);
 
-assertEquals(tpeInfo1, tpeInfo2);
-assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+assertThat(tpeInfo2).isEqualTo(tpeInfo1);
+assertThat(tpeInfo2.hashCode()).isEqualTo(tpeInfo1.hashCode());

Review Comment:
   hasSameHashCode



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35359][config] General Improvement to Configuration for Flink 2.0 [flink]

2024-05-20 Thread via GitHub


Sxnan opened a new pull request, #24815:
URL: https://github.com/apache/flink/pull/24815

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35403] Allow Skipping Invocation of Function Calls While Constant-folding [flink]

2024-05-20 Thread via GitHub


flinkbot commented on PR #24814:
URL: https://github.com/apache/flink/pull/24814#issuecomment-2121448911

   
   ## CI report:
   
   * 78c4501b833f0860ce931ef91c1efaae71d90e09 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35403) FLIP-452: Allow Skipping Invocation of Function Calls While Constant-folding

2024-05-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35403:
---
Labels: pull-request-available  (was: )

> FLIP-452: Allow Skipping Invocation of Function Calls While Constant-folding
> 
>
> Key: FLINK-35403
> URL: https://issues.apache.org/jira/browse/FLINK-35403
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alan Sheinberg
>Assignee: Alan Sheinberg
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35403] Allow Skipping Invocation of Function Calls While Constant-folding [flink]

2024-05-20 Thread via GitHub


AlanConfluent opened a new pull request, #24814:
URL: https://github.com/apache/flink/pull/24814

   
   
   ## What is the purpose of the change
   
   This PR adds a new method to `FunctionDefinition`, `supportsConstantFolding` 
which indicates of a function call should have constant expression reduction 
done during planning time.  For example, `AddOne(10)` could be reduced to `11` 
by invoking `AddOne` during planning time.  This is the default and current 
Flink functionality.  
   
   This addition allows overriding the default behavior and returning false, 
which would keep the expression intact, to be invoked at runtime instead.
   
   This functionality and motivation is covered in more detail in [FLIP 
452](https://cwiki.apache.org/confluence/display/FLINK/FLIP-452%3A+Allow+Skipping+Invocation+of+Function+Calls+While+Constant-folding).
   
   
   ## Verifying this change
   
   - Added new test cases to `ExpressionReductionRulesTest`.
 - These test that existing plans are unchanged
 - New tests show that disabling constant folding also keeps plans without 
reducing away the invocation
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
   - It shouldn't affect default behavior, which is to constant fold
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35404] Report all metrics of KafkaConsumer in new Kafka source [flink-connector-kafka]

2024-05-20 Thread via GitHub


boring-cyborg[bot] commented on PR #103:
URL: 
https://github.com/apache/flink-connector-kafka/pull/103#issuecomment-2121252403

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35404] Report all metrics of KafkaConsumer in new Kafka source [flink-connector-kafka]

2024-05-20 Thread via GitHub


hmit opened a new pull request, #103:
URL: https://github.com/apache/flink-connector-kafka/pull/103

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source

2024-05-20 Thread Harshit Mittal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847993#comment-17847993
 ] 

Harshit Mittal edited comment on FLINK-35404 at 5/20/24 8:55 PM:
-

A better way to solve this problem is to implement interface `MetricsReporter` 
from kafka metrics using a MetricGroup from apache-flink.


was (Author: hmittal83):
A better way to solve this problem is to implement KafkaMetrics reporter 
interface using a MetricGroup from apache-flink.

> Report metrics of KafkaConsumer in new Kafka source
> ---
>
> Key: FLINK-35404
> URL: https://issues.apache.org/jira/browse/FLINK-35404
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Harshit Mittal
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
> Currently new Kafka source only registers metrics that are available on 
> KafkaPartitionSplitReader initialization. However, there are metrics added 
> later in the lifecycle like consumer lag metrics that are missing from the 
> KafkaConsumer metric group.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source

2024-05-20 Thread Harshit Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Harshit Mittal updated FLINK-35404:
---
Description: 
Currently new Kafka source only registers metrics that are available on 
KafkaPartitionSplitReader initialization. However, there are metrics added 
later in the lifecycle like consumer lag metrics that are missing from the 
KafkaConsumer metric group.

 

  was:Currently Kafka new source doesn't register metrics of KafkaConsumer in 
KafkaPartitionSplitReader. These metrics should be added for debugging and 
monitoring purpose. 


> Report metrics of KafkaConsumer in new Kafka source
> ---
>
> Key: FLINK-35404
> URL: https://issues.apache.org/jira/browse/FLINK-35404
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Harshit Mittal
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
> Currently new Kafka source only registers metrics that are available on 
> KafkaPartitionSplitReader initialization. However, there are metrics added 
> later in the lifecycle like consumer lag metrics that are missing from the 
> KafkaConsumer metric group.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source

2024-05-20 Thread Harshit Mittal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847993#comment-17847993
 ] 

Harshit Mittal commented on FLINK-35404:


A better way to solve this problem is to implement KafkaMetrics reporter 
interface using a MetricGroup from apache-flink.

> Report metrics of KafkaConsumer in new Kafka source
> ---
>
> Key: FLINK-35404
> URL: https://issues.apache.org/jira/browse/FLINK-35404
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Harshit Mittal
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
> Currently new Kafka source only registers metrics that are available on 
> KafkaPartitionSplitReader initialization. However, there are metrics added 
> later in the lifecycle like consumer lag metrics that are missing from the 
> KafkaConsumer metric group.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source

2024-05-20 Thread Harshit Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Harshit Mittal updated FLINK-35404:
---
Fix Version/s: (was: 1.14.0)
   (was: 1.13.2)

> Report metrics of KafkaConsumer in new Kafka source
> ---
>
> Key: FLINK-35404
> URL: https://issues.apache.org/jira/browse/FLINK-35404
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0
>Reporter: Harshit Mittal
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available
>
> Currently Kafka new source doesn't register metrics of KafkaConsumer in 
> KafkaPartitionSplitReader. These metrics should be added for debugging and 
> monitoring purpose. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35404) Report metrics of KafkaConsumer in new Kafka source

2024-05-20 Thread Harshit Mittal (Jira)
Harshit Mittal created FLINK-35404:
--

 Summary: Report metrics of KafkaConsumer in new Kafka source
 Key: FLINK-35404
 URL: https://issues.apache.org/jira/browse/FLINK-35404
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.13.0
Reporter: Harshit Mittal
Assignee: Qingsheng Ren
 Fix For: 1.14.0, 1.13.2


Currently Kafka new source doesn't register metrics of KafkaConsumer in 
KafkaPartitionSplitReader. These metrics should be added for debugging and 
monitoring purpose. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35403) FLIP-452: Allow Skipping Invocation of Function Calls While Constant-folding

2024-05-20 Thread Alan Sheinberg (Jira)
Alan Sheinberg created FLINK-35403:
--

 Summary: FLIP-452: Allow Skipping Invocation of Function Calls 
While Constant-folding
 Key: FLINK-35403
 URL: https://issues.apache.org/jira/browse/FLINK-35403
 Project: Flink
  Issue Type: Improvement
Reporter: Alan Sheinberg
Assignee: Alan Sheinberg






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


ldadima commented on PR #24784:
URL: https://github.com/apache/flink/pull/24784#issuecomment-2120974575

   Sorry to bother you again, but I made a separate one [Pull request with 
hotfix](https://github.com/apache/flink/pull/24811)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1607065866


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
 final List> inputOperatorState =
 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+boolean noNeedRescale =
+
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+.map(JobEdge::getDownstreamSubtaskStateMapper)
+.anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+&& 
stateAssignment.executionJobVertex.getInputs().stream()
+.map(IntermediateResult::getProducer)
+.map(vertexAssignments::get)
+.anyMatch(
+taskStateAssignment -> {
+final int oldParallelism =
+stateAssignment
+.oldState
+
.get(stateAssignment.inputOperatorID)
+.getParallelism();
+return oldParallelism
+== 
taskStateAssignment.executionJobVertex
+.getParallelism();
+});

Review Comment:
   Thanks for the analysis.  I think you can change a test with incomplete 
FULL, you are right.  I hope I didn't make a mistake with the mistake note last 
time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1607065866


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
 final List> inputOperatorState =
 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+boolean noNeedRescale =
+
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+.map(JobEdge::getDownstreamSubtaskStateMapper)
+.anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+&& 
stateAssignment.executionJobVertex.getInputs().stream()
+.map(IntermediateResult::getProducer)
+.map(vertexAssignments::get)
+.anyMatch(
+taskStateAssignment -> {
+final int oldParallelism =
+stateAssignment
+.oldState
+
.get(stateAssignment.inputOperatorID)
+.getParallelism();
+return oldParallelism
+== 
taskStateAssignment.executionJobVertex
+.getParallelism();
+});

Review Comment:
   Thanks for the analysis.  I think you can add a test with incomplete FULL, 
you are right.  I hope I didn't make a mistake with the mistake note last time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1607065866


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
 final List> inputOperatorState =
 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+boolean noNeedRescale =
+
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+.map(JobEdge::getDownstreamSubtaskStateMapper)
+.anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+&& 
stateAssignment.executionJobVertex.getInputs().stream()
+.map(IntermediateResult::getProducer)
+.map(vertexAssignments::get)
+.anyMatch(
+taskStateAssignment -> {
+final int oldParallelism =
+stateAssignment
+.oldState
+
.get(stateAssignment.inputOperatorID)
+.getParallelism();
+return oldParallelism
+== 
taskStateAssignment.executionJobVertex
+.getParallelism();
+});

Review Comment:
   Thanks for the analysis.  I think you can add a test with incomplete FULL, 
you are right.  I hope I didn't make a mistake with the error note last time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


ldadima commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1607061596


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with 
custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   A difficult way out of the situation, but quite feasible, ok.  Thank you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35402) [flink-operator][Deployment] add labels to metadata

2024-05-20 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-35402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luís Costa updated FLINK-35402:
---
Description: 
Greetings dear team,

I would like to add labels to flink-operator Deployment metadata - 
https://github.com/apache/flink-kubernetes-operator/pull/829

Best regards,
Luís Costa

  was:
Greetings dear team,

I would like to add labels to flink-operator Deployment metadata


> [flink-operator][Deployment] add labels to metadata
> ---
>
> Key: FLINK-35402
> URL: https://issues.apache.org/jira/browse/FLINK-35402
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Reporter: Luís Costa
>Priority: Minor
>  Labels: pull-request-available
>
> Greetings dear team,
> I would like to add labels to flink-operator Deployment metadata - 
> https://github.com/apache/flink-kubernetes-operator/pull/829
> Best regards,
> Luís Costa



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] FLINK-35402 [flink-operator][Deployment] add labels to metadata [flink-kubernetes-operator]

2024-05-20 Thread via GitHub


luismacosta opened a new pull request, #829:
URL: https://github.com/apache/flink-kubernetes-operator/pull/829

   FLINK-35402 [flink-operator][Deployment] add labels to metadata - 
https://issues.apache.org/jira/browse/FLINK-35402


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35402) [flink-operator][Deployment] add labels to metadata

2024-05-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35402:
---
Labels: pull-request-available  (was: )

> [flink-operator][Deployment] add labels to metadata
> ---
>
> Key: FLINK-35402
> URL: https://issues.apache.org/jira/browse/FLINK-35402
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Reporter: Luís Costa
>Priority: Minor
>  Labels: pull-request-available
>
> Greetings dear team,
> I would like to add labels to flink-operator Deployment metadata



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35402) [flink-operator][Deployment] add labels to metadata

2024-05-20 Thread Jira
Luís Costa created FLINK-35402:
--

 Summary: [flink-operator][Deployment] add labels to metadata
 Key: FLINK-35402
 URL: https://issues.apache.org/jira/browse/FLINK-35402
 Project: Flink
  Issue Type: Improvement
  Components: flink-contrib
Reporter: Luís Costa


Greetings dear team,

I would like to add labels to flink-operator Deployment metadata



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]

2024-05-20 Thread via GitHub


hanyuzheng7 commented on code in PR #24365:
URL: https://github.com/apache/flink/pull/24365#discussion_r1606922419


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/SplitFunction.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#SPLIT}. */
+@Internal
+public class SplitFunction extends BuiltInScalarFunction {
+public SplitFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.SPLIT, context);
+}
+
+public @Nullable ArrayData eval(@Nullable StringData string, @Nullable 
StringData delimiter) {
+try {
+if (string == null || delimiter == null) {
+return null;
+}
+if (delimiter.toString().isEmpty()) {
+List res = new ArrayList<>();
+for (int i = 0; i < string.toString().length(); ++i) {
+
res.add(StringData.fromString(String.valueOf(string.toString().charAt(i;
+}
+return new GenericArrayData(res.toArray());

Review Comment:
   handle empty delimiter at first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]

2024-05-20 Thread via GitHub


hanyuzheng7 commented on PR #24365:
URL: https://github.com/apache/flink/pull/24365#issuecomment-2120677719

   > > When the delimiter is empty, do we need to handle this situation 
specially?/
   > > Yes, we do. Do you mind updating the PR?
   
   I have already updated the PR.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31223][sqlgateway] Introduce getFlinkConfigurationOptions to [flink]

2024-05-20 Thread via GitHub


reswqa commented on PR #24741:
URL: https://github.com/apache/flink/pull/24741#issuecomment-2120663487

   We should include it as flink has nightly ci that check jdk17 build also.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606882746


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCustomRescaleITCase.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment;
+import static org.junit.Assert.fail;
+
+/** Integration test for performing rescale of unaligned checkpoint with 
custom partitioner. */
+public class UnalignedCheckpointCustomRescaleITCase {

Review Comment:
   > You could always most likely change the record type for 
UnalignedCheckpointRescaleITCase. String there should work just as fine, but 
that's probably more work vs option 1.
   
   I've just realised that I don't see any place in the 
`UnalignedCheckpointRescaleITCase` that hardcodes the type of the record. You 
can add a new `Topology` (`UnalignedCheckpointRescaleITCase.Topology`) that 
creates any JobGraph, so you can keep your proposed records format. Also I 
wouldn't mind if you re-used the same `LongSource`, but changed the record type 
from `Long` to some POJO of `Long` and `String payload`
   ```
   public static class Record {
 private final long value;
 private final @Nullable String payload;
 (...)
   }
   ```
   where `payload` length/size would be configurable, between 0 (`payload == 
null`) to whatever value you would configured (`3713`?).
   
   This way you won't be duplicating all of the setup code and you will 
leverage the same 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35401) Add SQS Table API support

2024-05-20 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy updated FLINK-35401:

Fix Version/s: aws-connector-4.4.0

> Add SQS Table API support
> -
>
> Key: FLINK-35401
> URL: https://issues.apache.org/jira/browse/FLINK-35401
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Priority: Major
> Fix For: aws-connector-4.4.0
>
>
> # Add Table API support for Amazon SQS sink as per 
> [FLIP-481|https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35401) Add SQS Table API support

2024-05-20 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy updated FLINK-35401:

Description: Add Table API support for Amazon SQS  (was: This is an 
umbrella task for FLIP-438. FLIP-438: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector)

> Add SQS Table API support
> -
>
> Key: FLINK-35401
> URL: https://issues.apache.org/jira/browse/FLINK-35401
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
>
> Add Table API support for Amazon SQS



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35401) Add SQS Table API support

2024-05-20 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy updated FLINK-35401:

Labels:   (was: pull-request-available)

> Add SQS Table API support
> -
>
> Key: FLINK-35401
> URL: https://issues.apache.org/jira/browse/FLINK-35401
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Priority: Major
>
> Add Table API support for Amazon SQS



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35401) Add SQS Table API support

2024-05-20 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy updated FLINK-35401:

Description: # Add Table API support for Amazon SQS sink as per 
[FLIP-481|https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]
  (was: # Add Table API support for Amazon SQS sink as per 
[FLIP-481|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]])

> Add SQS Table API support
> -
>
> Key: FLINK-35401
> URL: https://issues.apache.org/jira/browse/FLINK-35401
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Priority: Major
>
> # Add Table API support for Amazon SQS sink as per 
> [FLIP-481|https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35401) Add SQS Table API support

2024-05-20 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy updated FLINK-35401:

Description: # Add Table API support for Amazon SQS sink as per 
[FLIP-481|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]]
  (was: Add Table API support for Amazon SQS sink as per FLIP-481)

> Add SQS Table API support
> -
>
> Key: FLINK-35401
> URL: https://issues.apache.org/jira/browse/FLINK-35401
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Priority: Major
>
> # Add Table API support for Amazon SQS sink as per 
> [FLIP-481|[https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector]]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35401) Add SQS Table API support

2024-05-20 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy updated FLINK-35401:

Description: Add Table API support for Amazon SQS sink as per FLIP-481  
(was: Add Table API support for Amazon SQS)

> Add SQS Table API support
> -
>
> Key: FLINK-35401
> URL: https://issues.apache.org/jira/browse/FLINK-35401
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Priority: Major
>
> Add Table API support for Amazon SQS sink as per FLIP-481



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35401) Add SQS Table API support

2024-05-20 Thread Ahmed Hamdy (Jira)
Ahmed Hamdy created FLINK-35401:
---

 Summary: Add SQS Table API support
 Key: FLINK-35401
 URL: https://issues.apache.org/jira/browse/FLINK-35401
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / AWS
Reporter: Ahmed Hamdy


This is an umbrella task for FLIP-438. FLIP-438: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31223][sqlgateway] Introduce getFlinkConfigurationOptions to [flink]

2024-05-20 Thread via GitHub


davidradl commented on PR #24741:
URL: https://github.com/apache/flink/pull/24741#issuecomment-2120541110

   @reswqa If you have strong views I can include it in this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35395][core] Fix KeyedStateStore class annotation error [flink]

2024-05-20 Thread via GitHub


usberkeley commented on PR #24808:
URL: https://github.com/apache/flink/pull/24808#issuecomment-2120534284

   > Oh, `/doc` means flink documentation, while java doc means the 
doc/comments on java class/interface/method/field.
   > 
   > But It doesn't matter, this fix is good enough. Would you mind changing 
the commit message to `[FLINK-35395][core] Fix the example code of 
KeyedStateStore#getState`?
   
   Yes, it's just a java doc error, thank you.
   
   Okay, the commit message has been modified.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35395][core] Fix KeyedStateStore class annotation error [flink]

2024-05-20 Thread via GitHub


usberkeley commented on PR #24808:
URL: https://github.com/apache/flink/pull/24808#issuecomment-2120526422

   > Oh, `/doc` means flink documentation, while java doc means the 
doc/comments on java class/interface/method/field.
   > 
   > But It doesn't matter, this fix is good enough. Would you mind changing 
the commit message to `[FLINK-35395][core] Fix the example code of 
KeyedStateStore#getState`?
   
   I understand, it's just a java doc error, thank you.
   
   Okay, the commit message has been modified.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606820986


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
 final List> inputOperatorState =
 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+boolean noNeedRescale =
+
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+.map(JobEdge::getDownstreamSubtaskStateMapper)
+.anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+&& 
stateAssignment.executionJobVertex.getInputs().stream()
+.map(IntermediateResult::getProducer)
+.map(vertexAssignments::get)
+.anyMatch(
+taskStateAssignment -> {
+final int oldParallelism =
+stateAssignment
+.oldState
+
.get(stateAssignment.inputOperatorID)
+.getParallelism();
+return oldParallelism
+== 
taskStateAssignment.executionJobVertex
+.getParallelism();
+});

Review Comment:
   Bumping:
   
   > Maybe in StateAssignmentOperationTest create a unit test that has one FULL 
and one something else, and assert that the assigned states are as they should 
be?
   
   > Does this bug have a test coverage? I mean, either was there some test 
failing or have you added a new test to cover for a future regression?
   
   sorry to bother you again, but the unit test that you have added still 
doesn't have test coverage. When I try running your previous version of the 
code:
   ```
   boolean noNeedRescale =
   
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
   
.map(JobEdge::getDownstreamSubtaskStateMapper)
   .anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
   && 
stateAssignment.executionJobVertex.getInputs().stream()
   .map(IntermediateResult::getProducer)
   .map(vertexAssignments::get)
   .anyMatch(
   taskStateAssignment -> {
   final int oldParallelism =
   stateAssignment
   .oldState
   
.get(stateAssignment.inputOperatorID)
   
.getParallelism();
   return oldParallelism
   == 
taskStateAssignment.executionJobVertex
   
.getParallelism();
   });
   
   if (inputState.getParallelism() == 
executionJobVertex.getParallelism() && !noNeedRescale) {
   stateAssignment.inputChannelStates.putAll(
   toInstanceMap(stateAssignment.inputOperatorID, 
inputOperatorState));
   return;
   }
   ```
   
   the tests in `StateAssignmentOperationTest` are still green.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35351][checkpoint] Fix fail during restore from unaligned chec… [flink]

2024-05-20 Thread via GitHub


pnowojski commented on code in PR #24784:
URL: https://github.com/apache/flink/pull/24784#discussion_r1606820986


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##
@@ -421,7 +423,27 @@ public void 
reDistributeInputChannelStates(TaskStateAssignment stateAssignment)
 stateAssignment.oldState.get(stateAssignment.inputOperatorID);
 final List> inputOperatorState =
 splitBySubtasks(inputState, 
OperatorSubtaskState::getInputChannelState);
-if (inputState.getParallelism() == 
executionJobVertex.getParallelism()) {
+
+boolean noNeedRescale =
+
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
+.map(JobEdge::getDownstreamSubtaskStateMapper)
+.anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
+&& 
stateAssignment.executionJobVertex.getInputs().stream()
+.map(IntermediateResult::getProducer)
+.map(vertexAssignments::get)
+.anyMatch(
+taskStateAssignment -> {
+final int oldParallelism =
+stateAssignment
+.oldState
+
.get(stateAssignment.inputOperatorID)
+.getParallelism();
+return oldParallelism
+== 
taskStateAssignment.executionJobVertex
+.getParallelism();
+});

Review Comment:
   > Does this bug have a test coverage? I mean, either was there some test 
failing or have you added a new test to cover for a future regression?
   
   
   Hey, sorry to bother you again, but the unit test that you have added still 
doesn't have test coverage. When I try running your previous version of the 
code:
   ```
   boolean noNeedRescale =
   
stateAssignment.executionJobVertex.getJobVertex().getInputs().stream()
   
.map(JobEdge::getDownstreamSubtaskStateMapper)
   .anyMatch(m -> 
!m.equals(SubtaskStateMapper.FULL))
   && 
stateAssignment.executionJobVertex.getInputs().stream()
   .map(IntermediateResult::getProducer)
   .map(vertexAssignments::get)
   .anyMatch(
   taskStateAssignment -> {
   final int oldParallelism =
   stateAssignment
   .oldState
   
.get(stateAssignment.inputOperatorID)
   
.getParallelism();
   return oldParallelism
   == 
taskStateAssignment.executionJobVertex
   
.getParallelism();
   });
   
   if (inputState.getParallelism() == 
executionJobVertex.getParallelism() && !noNeedRescale) {
   stateAssignment.inputChannelStates.putAll(
   toInstanceMap(stateAssignment.inputOperatorID, 
inputOperatorState));
   return;
   }
   ```
   
   the tests in `StateAssignmentOperationTest` are still green.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-20 Thread via GitHub


superdiaodiao commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606797248


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. */
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_DECODE, context);
+}
+
+public @Nullable StringData eval(StringData value) {
+final Charset charset = StandardCharsets.UTF_8;
+try {
+return StringData.fromString(URLDecoder.decode(value.toString(), 
charset.name()));
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(
+"Failed to decode value: " + value + " with charset: " + 
charset.name(), e);
+} catch (RuntimeException e) {
+return value;
+}

Review Comment:
   @davidradl 
   In the Calcite, the author didn't demonstrate the reason and the origin text 
is: https://github.com/apache/calcite/pull/3318#discussion_r1372603727
   
   While in the Spark, it is different from the Calcite, although the Calcite 
said that it can be served for Spark(which made me think they have the same 
action at the first time). 
   In fact, I read its code and tested, Spark will throw an exception instead 
of returning the origin value in this case.
   
   So what's Flink team's expectation? I will follow your guide.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-20 Thread via GitHub


superdiaodiao commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606797248


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. */
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_DECODE, context);
+}
+
+public @Nullable StringData eval(StringData value) {
+final Charset charset = StandardCharsets.UTF_8;
+try {
+return StringData.fromString(URLDecoder.decode(value.toString(), 
charset.name()));
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(
+"Failed to decode value: " + value + " with charset: " + 
charset.name(), e);
+} catch (RuntimeException e) {
+return value;
+}

Review Comment:
   @davidradl 
   In the Calcite, the author didn't demonstrate the reason and the origin text 
is: https://github.com/apache/calcite/pull/3318#discussion_r1372603727
   
   While in the Spark, it is different from the Calcite, although the Calcite 
said that it can be served for Spark(which made me think they have the same 
action at the first time). In fact, I read its code and tested, Spark will 
throw an exception instead of returning the origin value in this case.
   
   So what's Flink team's expectation? I will follow your guide.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Fix ingress example in docs [flink-kubernetes-operator]

2024-05-20 Thread via GitHub


yardenshoham opened a new pull request, #828:
URL: https://github.com/apache/flink-kubernetes-operator/pull/828

   In the "Simple domain based routing" example, there's no `/` so the ingress 
doesn't work by default


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-20 Thread via GitHub


superdiaodiao commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606797248


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. */
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_DECODE, context);
+}
+
+public @Nullable StringData eval(StringData value) {
+final Charset charset = StandardCharsets.UTF_8;
+try {
+return StringData.fromString(URLDecoder.decode(value.toString(), 
charset.name()));
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(
+"Failed to decode value: " + value + " with charset: " + 
charset.name(), e);
+} catch (RuntimeException e) {
+return value;
+}

Review Comment:
   @davidradl 
   In the Calcite, the author didn't demonstrate the reason and the origin text 
is: https://github.com/apache/calcite/pull/3318#discussion_r1372603727
   
   While in the Spark, it is different with the Calcite, although the Calcite 
said that it can be served for Spark. In fact, I read its code and tested, 
Spark will throw an exception instead of returning the origin value in this 
case.
   
   So what's Flink team's expectation? I will follow your guide.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-20 Thread via GitHub


superdiaodiao commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606797248


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. */
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_DECODE, context);
+}
+
+public @Nullable StringData eval(StringData value) {
+final Charset charset = StandardCharsets.UTF_8;
+try {
+return StringData.fromString(URLDecoder.decode(value.toString(), 
charset.name()));
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(
+"Failed to decode value: " + value + " with charset: " + 
charset.name(), e);
+} catch (RuntimeException e) {
+return value;
+}

Review Comment:
   @davidradl 
   In the Calcite, the author didn't demonstrate the reason and the origin text 
is: https://github.com/apache/calcite/pull/3318#discussion_r1372603727
   
   While in the Spark, it is different with the Calcite, although the Calcite 
said that it can be served for Spark. In fact, Spark will throw an exception 
instead of returning the origin value in this case.
   
   So what's Flink team's expectation? I will follow your guide.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-20 Thread via GitHub


superdiaodiao commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606797248


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. */
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_DECODE, context);
+}
+
+public @Nullable StringData eval(StringData value) {
+final Charset charset = StandardCharsets.UTF_8;
+try {
+return StringData.fromString(URLDecoder.decode(value.toString(), 
charset.name()));
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(
+"Failed to decode value: " + value + " with charset: " + 
charset.name(), e);
+} catch (RuntimeException e) {
+return value;
+}

Review Comment:
   In the Calcite, the author didn't demonstrate the reason and the origin text 
is: https://github.com/apache/calcite/pull/3318#discussion_r1372603727
   
   While in the Spark, it is different with the Calcite, although the Calcite 
said that it can be served for Spark. In fact, Spark will throw an exception 
instead of returning the origin value in this case.
   
   So what's Flink team's expectation? I will follow your guide.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-20 Thread via GitHub


superdiaodiao commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606776660


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlEncodeFunction.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_ENCODE}. */
+@Internal
+public class UrlEncodeFunction extends BuiltInScalarFunction {
+
+public UrlEncodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_ENCODE, context);
+}
+
+public @Nullable StringData eval(StringData url) {
+if (url == null) {
+return null;
+}
+final Charset charset = StandardCharsets.UTF_8;
+String value;
+
+try {
+value = URLEncoder.encode(url.toString(), charset.name());
+} catch (UnsupportedEncodingException e) {
+throw new FlinkRuntimeException(
+"Failed to encode URL: " + url + " with charset: " + 
charset.name(), e);

Review Comment:
   ok, I will add



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-20 Thread via GitHub


davidradl commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606752091


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlEncodeFunction.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_ENCODE}. */
+@Internal
+public class UrlEncodeFunction extends BuiltInScalarFunction {
+
+public UrlEncodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_ENCODE, context);
+}
+
+public @Nullable StringData eval(StringData url) {
+if (url == null) {
+return null;
+}
+final Charset charset = StandardCharsets.UTF_8;
+String value;
+
+try {
+value = URLEncoder.encode(url.toString(), charset.name());
+} catch (UnsupportedEncodingException e) {
+throw new FlinkRuntimeException(
+"Failed to encode URL: " + url + " with charset: " + 
charset.name(), e);

Review Comment:
   Nit:
   I suggest changing the Exception message to a little more explicit  "...with 
charset:" to .. due to an unsupported encoding with charset:"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-20 Thread via GitHub


davidradl commented on code in PR #24773:
URL: https://github.com/apache/flink/pull/24773#discussion_r1606749183


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/UrlDecodeFunction.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#URL_DECODE}. */
+@Internal
+public class UrlDecodeFunction extends BuiltInScalarFunction {
+
+public UrlDecodeFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.URL_DECODE, context);
+}
+
+public @Nullable StringData eval(StringData value) {
+final Charset charset = StandardCharsets.UTF_8;
+try {
+return StringData.fromString(URLDecoder.decode(value.toString(), 
charset.name()));
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(
+"Failed to decode value: " + value + " with charset: " + 
charset.name(), e);
+} catch (RuntimeException e) {
+return value;
+}

Review Comment:
   @superdiaodiao
   Do we know why Calcite/Spark do this? It seems to be a strange design choice 
as we will go forward with this string not decoded, but the subsequent code 
would expect it to be decoded and is likely to get unexpected results; which 
would be tricky to debug. 
   I notice that encode does not do this.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >