Re: [PR] Dev 0306 2025 [flink]

2024-03-09 Thread via GitHub


WencongLiu closed pull request #24468: Dev 0306 2025
URL: https://github.com/apache/flink/pull/24468


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32679) Filter conditions cannot be pushed to JOIN in some case

2024-03-09 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824995#comment-17824995
 ] 

Jeyhun Karimov commented on FLINK-32679:


Hi [~grandfisher], the mentioned query does push the filter conditions down below the join operators (verified below with the current master, d6a4eb966fbc47277e07b79e7c64939a62eb1d54). Or am I missing something? 

 
{code:java}
Calc(select=[CAST(0 AS INTEGER) AS id, b, CAST(0 AS INTEGER) AS id0, b0, CAST(0 AS INTEGER) AS id1, b1, CAST(0 AS INTEGER) AS id2, b2, CAST(0 AS INTEGER) AS id3, b3])
+- Join(joinType=[InnerJoin], where=[true], select=[b, b0, b1, b2, b3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[single])
   :  +- Join(joinType=[InnerJoin], where=[true], select=[b, b0, b1, b2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :     :- Exchange(distribution=[single])
   :     :  +- Join(joinType=[InnerJoin], where=[true], select=[b, b0, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :     :     :- Exchange(distribution=[single])
   :     :     :  +- Join(joinType=[InnerJoin], where=[true], select=[b, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :     :     :     :- Exchange(distribution=[single])
   :     :     :     :  +- Calc(select=[b], where=[(id = 0)])
   :     :     :     :     +- TableSourceScan(table=[[default_catalog, default_database, v1]], fields=[id, b])
   :     :     :     +- Exchange(distribution=[single])
   :     :     :        +- Calc(select=[b], where=[(id = 0)])
   :     :     :           +- TableSourceScan(table=[[default_catalog, default_database, v2]], fields=[id, b])
   :     :     +- Exchange(distribution=[single])
   :     :        +- Calc(select=[b], where=[(0 = id)])
   :     :           +- TableSourceScan(table=[[default_catalog, default_database, v3]], fields=[id, b])
   :     +- Exchange(distribution=[single])
   :        +- Calc(select=[b], where=[(0 = id)])
   :           +- TableSourceScan(table=[[default_catalog, default_database, v4]], fields=[id, b])
   +- Exchange(distribution=[single])
      +- Calc(select=[b], where=[(0 = id)])
         +- TableSourceScan(table=[[default_catalog, default_database, v5]], fields=[id, b]){code}

> Filter conditions cannot be pushed to JOIN in some case
> ---
>
> Key: FLINK-32679
> URL: https://issues.apache.org/jira/browse/FLINK-32679
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: grandfisher
>Priority: Major
>
> Consider the following case:
> {code:java}
> SELECT a.id, b.id, c.id, d.id, e.id
>   , f.id
> FROM `table-v1` a
>   INNER JOIN `table-v2` b ON a.id = b.id
>   INNER JOIN `table-v3` c ON b.id = c.id
>   INNER JOIN `table-v4` d ON c.id = d.id
>   INNER JOIN `table-v5` e ON d.id = e.id
>   INNER JOIN `table-v6` f ON a.id = f.id
> WHERE f.id = 0
> {code}
> In this SQL, each table should have the condition {*}id=0{*}, but actually only 
> tables *f* and *a* have this condition.
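The expectation in the issue rests on transitive inference: the inner-join conditions put all six {{id}} columns into one equality class, so the constant filter {{f.id = 0}} implies {{id = 0}} for every table. The plan in the comment above (a five-table variant) shows a pushed-down {{id = 0}} filter on every scan, which is what this inference produces. Below is a minimal illustrative Java sketch of the idea using union-find; {{TransitiveConstantSketch}} and its methods are hypothetical names, and this is not how the Flink or Calcite planner implements it.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch only: derives constant filters through equi-join conditions. */
final class TransitiveConstantSketch {
    // Union-find parent pointers keyed by column name, e.g. "a.id".
    private final Map<String, String> parent = new HashMap<>();

    String find(String col) {
        parent.putIfAbsent(col, col);
        String root = parent.get(col);
        if (!root.equals(col)) {
            root = find(root);
            parent.put(col, root); // path compression
        }
        return root;
    }

    /** Records an inner-join equality condition {@code left = right}. */
    void addEquality(String left, String right) {
        parent.put(find(left), find(right));
    }

    /** Returns the constant implied for every known column, sorted by column name. */
    Map<String, Integer> deriveConstants(Map<String, Integer> constantFilters) {
        Map<String, Integer> constantByRoot = new HashMap<>();
        for (Map.Entry<String, Integer> e : constantFilters.entrySet()) {
            // Sketch only: conflicting constants within one class are not detected.
            constantByRoot.put(find(e.getKey()), e.getValue());
        }
        Map<String, Integer> derived = new TreeMap<>();
        // Copy the key set because find() may update parent pointers.
        for (String col : new ArrayList<>(parent.keySet())) {
            Integer value = constantByRoot.get(find(col));
            if (value != null) {
                derived.put(col, value);
            }
        }
        return derived;
    }
}
{code}

Feeding it the five join conditions of the query ({{a.id = b.id}}, {{b.id = c.id}}, {{c.id = d.id}}, {{d.id = e.id}}, {{a.id = f.id}}) and the filter {{f.id = 0}} yields {{id = 0}} for all six columns, i.e. one pushable filter per table.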



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32998) if function result not correct

2024-03-09 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824988#comment-17824988
 ] 

Jeyhun Karimov edited comment on FLINK-32998 at 3/9/24 10:25 PM:
-

[~martijnvisser] [~zhou_yb]  I verified that as of master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54) the issue seems to be fixed. 


was (Author: jeyhunkarimov):
[~martijnvisser] I verified that as of master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54) the issue seems to be fixed. 

> if function result not correct
> --
>
> Key: FLINK-32998
> URL: https://issues.apache.org/jira/browse/FLINK-32998
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.4
>Reporter: zhou
>Priority: Major
> Attachments: image-2023-08-30-18-29-16-277.png, 
> image-2023-08-30-18-30-05-568.png
>
>
> *The IF function returns an incorrect result: instead of the original field 
> value, it returns a truncated (cut-off) field (word) value.* 
> Code:
> !image-2023-08-30-18-29-16-277.png!
> Result:
> !image-2023-08-30-18-30-05-568.png!
>  





[jira] [Commented] (FLINK-32998) if function result not correct

2024-03-09 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824988#comment-17824988
 ] 

Jeyhun Karimov commented on FLINK-32998:


[~martijnvisser] I verified that as of master 
(d6a4eb966fbc47277e07b79e7c64939a62eb1d54) the issue seems to be fixed. 



Re: [PR] [FLINK-32706][table] Add built-in SPLIT_STRING function [flink]

2024-03-09 Thread via GitHub


jeyhunkarimov commented on code in PR #24365:
URL: https://github.com/apache/flink/pull/24365#discussion_r1518655288


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/SplitFunction.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#SPLIT}. */
+@Internal
+public class SplitFunction extends BuiltInScalarFunction {
+    public SplitFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.SPLIT, context);
+    }
+
+    public @Nullable ArrayData eval(@Nullable StringData string, @Nullable StringData delimiter) {
+        try {
+            if (string == null || delimiter == null) {
+                return null;
+            }
+            String string1 = string.toString();
+            String delimiter1 = delimiter.toString();
+            List resultList = new ArrayList<>();
+
+            if (delimiter1.equals("")) {
+                for (char c : string1.toCharArray()) {
+                    resultList.add(BinaryStringData.fromString(String.valueOf(c)));
+                }
+            } else {
+                int start = 0;
+                int end = string1.indexOf(delimiter1);
+                while (end != -1) {
+                    String substring = string1.substring(start, end);
+                    resultList.add(
+                            BinaryStringData.fromString(
+                                    substring.isEmpty()
+                                            ? ""
+                                            : substring)); // Added this check to handle consecutive
+                    // delimiters
+                    start = end + delimiter1.length();
+                    end = string1.indexOf(delimiter1, start);
+                }
+                String remaining = string1.substring(start);

Review Comment:
   What if the delimiter appears at the very end of the string? The remaining 
string will then be empty, but you should still add an empty string to 
resultList to indicate that there is a delimiter at the end of the string. 
Or am I missing something?



##
flink-python/pyflink/table/expression.py:
##
@@ -1609,6 +1609,17 @@ def array_min(self) -> 'Expression':
 """
 return _unary_op("arrayMin")(self)
 
+def split(self, delimiter) -> 'Expression':

Review Comment:
   Please add Python test(s) for `split` as well.



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -1596,4 +1597,83 @@ private Stream arraySortTestCases() {
 },
 DataTypes.ARRAY(DataTypes.DATE(;
     }
+
+    private Stream splitTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.SPLIT)
+                        .onFieldsWithData(

Review Comment:
   Please also include tests for:
   - SPLIT("", "")
   - SPLIT("", ",")
   - SPLIT(",,", ",,")
   - SPLIT(",,", ",,,")
   - SPLIT("s", "ss")






[jira] [Updated] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed

2024-03-09 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-34634:
--
Description: 
h3. What's the problem

I once removed a table from the options and then restarted the job from a 
savepoint, but the job could no longer read the binlog. When I checked the 
logs, I found an ERROR-level log stating:

'The enumerator received invalid request meta group id 6, the valid meta group id range is [0, 4].'

It appears that the Reader is requesting more splits than the Enumerator is 
aware of.

However, the code is supposed to remove redundant split information from the 
Reader, as implemented in 
[https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does this 
issue occur?

 
h3. Why it occurs

!image-2024-03-09-15-25-26-187.png|width=751,height=329!

Upon examining the code, I found the cause: the issue occurs if the job stops 
before all split meta information has been synchronized and is then restarted. 
Suppose that the totalFinishedSplitSize of binlogSplit in the Reader is 6 and 
no meta information has been synchronized yet, so the 
finishedSnapshotSplitInfos of binlogSplit in the Reader is empty. After 
restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals 
6 - (0 - 0), which is still 6, but in the Enumerator it is only 4 (the removed 
table has two splits). This leads to an out-of-range request.
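The arithmetic above can be modeled in a few lines of Java. This sketch assumes, from reading the description, that on restart the Reader subtracts (number of finished split infos before filtering out removed tables minus the number after filtering) from totalFinishedSplitSize; {{FinishedSplitSizeSketch}} and {{totalAfterRestart}} are hypothetical names, not flink-cdc code.

{code:java}
/** Hypothetical model of the FLINK-34634 arithmetic; not the flink-cdc implementation. */
final class FinishedSplitSizeSketch {

    /** Assumed rule: subtract the infos dropped for removed tables from the total. */
    static int totalAfterRestart(int totalFinishedSplitSize, int infosBeforeFilter, int infosAfterFilter) {
        return totalFinishedSplitSize - (infosBeforeFilter - infosAfterFilter);
    }

    public static void main(String[] args) {
        int enumeratorSplits = 4; // 6 finished splits minus the 2 of the removed table

        // Meta fully synchronized: 6 infos, 2 of which belong to the removed table.
        int synced = totalAfterRestart(6, 6, 4);
        // Savepoint taken before any meta was synchronized: nothing to filter out.
        int unsynced = totalAfterRestart(6, 0, 0);

        System.out.println("synced=" + synced + ", unsynced=" + unsynced
                + ", enumerator=" + enumeratorSplits);
    }
}
{code}

Running {{main}} prints {{synced=4, unsynced=6, enumerator=4}}: only in the unsynchronized case does the Reader's total exceed what the Enumerator knows about.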

!image-2024-03-09-15-27-46-073.png|width=755,height=305!
h3. How to reproduce
 * Add Thread.sleep(1000L) in 
com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents
 to delay the synchronization of split meta information.

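{code:java}
public void handleSourceEvents(SourceEvent sourceEvent) {
    // ... preceding branches for other SourceEvent types omitted
    // (in the original method the branch below is an "else if") ...
    if (sourceEvent instanceof BinlogSplitMetaEvent) {
        LOG.debug(
                "Source reader {} receives binlog meta with group id {}.",
                subtaskId,
                ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
        try {
            // Added for reproduction: postpone the split meta synchronization.
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
    }
} {code}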
 * Add Thread.sleep(500L) in 
com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne
 to trigger a savepoint before the meta information synchronization finishes.

 
{code:java}
// step 2: execute insert and trigger savepoint with all tables added
{
    // ... (omitted)

    waitForSinkSize("sink", fetchedDataList.size());
    // Added for reproduction (see the step above).
    Thread.sleep(500L);
    assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
    finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
    jobClient.cancel().get();
}

// test removing tables one by one; note that at least one table must remain
for (int round = 0; round < captureAddressTables.length - 1; round++) {
    // ...
}

{code}
 
 * Add chunk-meta.group.size = 2 to the table options in 
com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement.

Then run the test 
com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable;
 the error log will appear.

 


> Restarting the job will not read the changelog anymore if it stops before the 
> synchronization of meta information is complete and some table is removed
> ---
>
> Key: FLINK-34634
> URL: https://issues.apache.org/jira/browse/FLINK-34634
> Project: Fli

[jira] [Updated] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed

2024-03-09 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-34634:
--
Issue Type: Bug  (was: Improvement)

> Restarting the job will not read the changelog anymore if it stops before the 
> synchronization of meta information is complete and some table is removed
> ---
>
> Key: FLINK-34634
> URL: https://issues.apache.org/jira/browse/FLINK-34634
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.1.0
>
> Attachments: image-2024-03-09-15-25-26-187.png, 
> image-2024-03-09-15-27-46-073.png
>
>


[jira] [Updated] (FLINK-34635) Clear successful records from the batch in JDBC connector

2024-03-09 Thread Sai Sharath Dandi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sai Sharath Dandi updated FLINK-34635:
--
Affects Version/s: 1.18.1

> Clear successful records from the batch in JDBC connector
> -
>
> Key: FLINK-34635
> URL: https://issues.apache.org/jira/browse/FLINK-34635
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.18.1
>Reporter: Sai Sharath Dandi
>Priority: Minor
>
> Currently, when batch execution fails in the JDBC connector, the whole batch 
> is retried, which is unnecessary. We should clear the records that were 
> already executed successfully from the batch in the 
> [SimpleBatchStatementExecutor|https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java]
> {code:java}
>  @Override
>     public void executeBatch() throws SQLException {
>         if (!batch.isEmpty()) {
>             for (T r : batch) {
>                 parameterSetter.accept(st, r);
>                 st.addBatch();
>             }
>             st.executeBatch();
> --> catch the exception and clear successful records from the batch here
>             batch.clear();
>         }
>     }{code}
>  
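One way the proposal could work, sketched in Java under stated assumptions: it relies on the JDBC driver reporting per-statement results through {{BatchUpdateException.getUpdateCounts()}}. Per the JDBC API, a driver either stops at the first failure (returning a shorter array) or continues and marks failed commands with {{Statement.EXECUTE_FAILED}}; successful commands report a count >= 0 or {{Statement.SUCCESS_NO_INFO}}. {{BatchRetrySketch}} and {{recordsToRetry}} are hypothetical names, not part of flink-connector-jdbc.

{code:java}
import java.sql.BatchUpdateException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of flink-connector-jdbc. */
final class BatchRetrySketch {

    private BatchRetrySketch() {}

    /**
     * Returns the records of {@code batch} that still need a retry after executeBatch()
     * failed with {@code e}. Records whose update count reports success are dropped.
     */
    static <T> List<T> recordsToRetry(List<T> batch, BatchUpdateException e) {
        int[] counts = e.getUpdateCounts();
        if (counts == null) {
            // No per-statement information: keep the whole batch.
            return new ArrayList<>(batch);
        }
        List<T> retry = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            // i >= counts.length: the driver stopped at the first failure, so this command
            // either failed or was never executed. EXECUTE_FAILED: this command failed.
            if (i >= counts.length || counts[i] == Statement.EXECUTE_FAILED) {
                retry.add(batch.get(i));
            }
        }
        return retry;
    }
}
{code}

In {{executeBatch()}} this could be called from a {{catch (BatchUpdateException e)}} block around {{st.executeBatch()}}, replacing the contents of {{batch}} with the returned list before rethrowing. Whether dropping the successful records is safe also depends on transaction handling (for example, whether the connection rolled the successful statements back), which this sketch does not model.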



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34635) Clear successful records from the batch in JDBC connector

2024-03-09 Thread Sai Sharath Dandi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sai Sharath Dandi updated FLINK-34635:
--
Priority: Minor  (was: Major)

> Clear successful records from the batch in JDBC connector
> -
>
> Key: FLINK-34635
> URL: https://issues.apache.org/jira/browse/FLINK-34635
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: Sai Sharath Dandi
>Priority: Minor
>


[jira] [Created] (FLINK-34635) Clear successful records from the batch in JDBC connector

2024-03-09 Thread Sai Sharath Dandi (Jira)
Sai Sharath Dandi created FLINK-34635:
-

 Summary: Clear successful records from the batch in JDBC connector
 Key: FLINK-34635
 URL: https://issues.apache.org/jira/browse/FLINK-34635
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: Sai Sharath Dandi

