Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-13 Thread via GitHub


loserwang1024 commented on code in PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1599417206


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtil.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.utils;
+
+import org.apache.flink.table.api.ValidationException;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
+
+/** Utilities to split chunks of table. */
+public class JdbcChunkUtil {
+
+/**
+ * Query the maximum and minimum value of the column in the table. e.g. 
query string 
+ * SELECT MIN(%s) FROM %s WHERE %s > ?
+ *
+ * @param jdbc JDBC connection.
+ * @param tableId table identity.
+ * @param columnName column name.
+ * @return maximum and minimum value.
+ */
+public static Object[] queryMinMax(JdbcConnection jdbc, String tableId, 
String columnName)
+throws SQLException {
+final String minMaxQuery =
+String.format("SELECT MIN(%s), MAX(%s) FROM %s", columnName, 
columnName, tableId);

Review Comment:
   Thanks, it looks better!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33986][runtime] Extend ShuffleMaster to support snapshot and restore state. [flink]

2024-05-13 Thread via GitHub


zhuzhurk commented on code in PR #24774:
URL: https://github.com/apache/flink/pull/24774#discussion_r1599414778


##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java:
##
@@ -121,4 +122,21 @@ default 
CompletableFuture> getAllPartitionWithM
 JobID jobId) {
 return CompletableFuture.completedFuture(Collections.emptyList());
 }
+
+/**
+ * Whether the shuffle master supports taking snapshot in batch scenarios, 
which will be used
+ * when enable Job Recovery. If it returns true, we will call {@link 
#snapshotState} to take

Review Comment:
   which will be used when enable Job Recovery
   -> if `execution.batch.job-recovery.enabled` is true.
   
   we
   -> Flink



##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMasterSnapshotContext.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+/**
+ * Snapshot context used to create {@link ShuffleMasterSnapshot}, which can 
provide necessary
+ * information.

Review Comment:
   > which can provide necessary information
   
   Looks redundant to me. I prefer to remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-13 Thread via GitHub


loserwang1024 commented on code in PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1599410767


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtil.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.utils;
+
+import org.apache.flink.table.api.ValidationException;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
+
+/** Utilities to split chunks of table. */
+public class JdbcChunkUtil {

Review Comment:
   done it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-13 Thread via GitHub


loserwang1024 commented on code in PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1599410084


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -18,104 +18,144 @@
 package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
 
 import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtil;
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
+import static java.math.BigDecimal.ROUND_CEILING;
+import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
 @Experimental
-public interface JdbcSourceChunkSplitter extends ChunkSplitter {

Review Comment:
   1. Interface  has no constructor. Most of the code needs sourceConfig and 
dialect.
   2. JdbcSourceChunkSplitter now is as a  template class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35293) FLIP-445: Support dynamic parallelism inference for HiveSource

2024-05-13 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846177#comment-17846177
 ] 

Zhu Zhu commented on FLINK-35293:
-

The change is merged. Could you add release notes for it and close the ticket? 
[~xiasun].

> FLIP-445: Support dynamic parallelism inference for HiveSource
> --
>
> Key: FLINK-35293
> URL: https://issues.apache.org/jira/browse/FLINK-35293
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.20.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
>
> [FLIP-379|https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs]
>  introduces dynamic source parallelism inference, which, compared to static 
> inference, utilizes runtime information to more accurately determine the 
> source parallelism. The FileSource already possesses the capability for 
> dynamic parallelism inference. As a follow-up task to FLIP-379, this FLIP 
> plans to implement the dynamic parallelism inference interface for 
> HiveSource, and also switches the default static parallelism inference to 
> dynamic parallelism inference.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35348) Implement materialized table refresh rest api

2024-05-13 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu reassigned FLINK-35348:
-

Assignee: Feng Jin

> Implement materialized table refresh rest api 
> --
>
> Key: FLINK-35348
> URL: https://issues.apache.org/jira/browse/FLINK-35348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: Feng Jin
>Priority: Major
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35347) Implement InMemory workflow scheduler service and plugin to support materialized table

2024-05-13 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu reassigned FLINK-35347:
-

Assignee: dalongliu

> Implement InMemory workflow scheduler service and plugin to support 
> materialized table
> --
>
> Key: FLINK-35347
> URL: https://issues.apache.org/jira/browse/FLINK-35347
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35348) Implement materialized table refresh rest api

2024-05-13 Thread dalongliu (Jira)
dalongliu created FLINK-35348:
-

 Summary: Implement materialized table refresh rest api 
 Key: FLINK-35348
 URL: https://issues.apache.org/jira/browse/FLINK-35348
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Gateway
Affects Versions: 1.20.0
Reporter: dalongliu
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35346) Introduce pluggable workflow scheduler interface for materialized table

2024-05-13 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu reassigned FLINK-35346:
-

Assignee: dalongliu

> Introduce pluggable workflow scheduler interface for materialized table
> ---
>
> Key: FLINK-35346
> URL: https://issues.apache.org/jira/browse/FLINK-35346
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35347) Implement InMemory workflow scheduler service and plugin to support materialized table

2024-05-13 Thread dalongliu (Jira)
dalongliu created FLINK-35347:
-

 Summary: Implement InMemory workflow scheduler service and plugin 
to support materialized table
 Key: FLINK-35347
 URL: https://issues.apache.org/jira/browse/FLINK-35347
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Gateway
Affects Versions: 1.20.0
Reporter: dalongliu
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35346) Introduce pluggable workflow scheduler interface for materialized table

2024-05-13 Thread dalongliu (Jira)
dalongliu created FLINK-35346:
-

 Summary: Introduce pluggable workflow scheduler interface for 
materialized table
 Key: FLINK-35346
 URL: https://issues.apache.org/jira/browse/FLINK-35346
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: dalongliu
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35345) FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized Table

2024-05-13 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu reassigned FLINK-35345:
-

Assignee: dalongliu

> FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized 
> Table
> -
>
> Key: FLINK-35345
> URL: https://issues.apache.org/jira/browse/FLINK-35345
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Ecosystem
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>
> This is an umbrella issue for FLIP-448: Introduce Pluggable Workflow 
> Scheduler Interface for Materialized Table, for more detail, please see 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-448%3A+Introduce+Pluggable+Workflow+Scheduler+Interface+for+Materialized+Table.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35153][State] Internal async list/map state and corresponding state descriptor [flink]

2024-05-13 Thread via GitHub


flinkbot commented on PR #24781:
URL: https://github.com/apache/flink/pull/24781#issuecomment-2109314883

   
   ## CI report:
   
   * 2f5f8598627c98bfa1bf88e7a6e0de51319748f2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35153) Internal Async State Implementation and StateDescriptor for Map/List State

2024-05-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35153:
---
Labels: pull-request-available  (was: )

> Internal Async State Implementation and StateDescriptor for Map/List State
> --
>
> Key: FLINK-35153
> URL: https://issues.apache.org/jira/browse/FLINK-35153
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35153][State] Internal async list/map state and corresponding state descriptor [flink]

2024-05-13 Thread via GitHub


Zakelly opened a new pull request, #24781:
URL: https://github.com/apache/flink/pull/24781

   ## What is the purpose of the change
   
   This PR provides the definition of `StateDescriptor` and simple 
implementation for `List` and `Map` state in `State V2`.
   
   ## Brief change log
- Tests for `ValueState` and `ValueStateDescriptor`
- Definition and tests for `MapState` and `MapStateDescriptor`
- Definition and tests for `ListState` and `ListStateDescriptor`
   
   ## Verifying this change
   
   Added UT for each new introduced class.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35334) Code generation: init method exceeds 64 KB when there is a long array field with Table API

2024-05-13 Thread Simon Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Simon Lee updated FLINK-35334:
--
Priority: Blocker  (was: Major)

> Code generation: init method exceeds 64 KB when there is a long array field 
> with Table API
> --
>
> Key: FLINK-35334
> URL: https://issues.apache.org/jira/browse/FLINK-35334
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Runtime
>Affects Versions: 1.17.1
>Reporter: Simon Lee
>Priority: Blocker
>
> Hi team,
> I encountered the following error when trying to execute SQL on a table that 
> has rows with fields that are long arrays (e.g., array length > 200):
> {code:java}
> Caused by: java.lang.RuntimeException: Could not instantiate generated class 
> 'BatchExecCalc$4950'
>  at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:84)
>  ...
> Caused by: org.codehaus.janino.InternalCompilerException: Code of method 
> "([Ljava/lang/Object;Lorg/apache/flink/streaming/runtime/tasks/StreamTask;Lorg/apache/flink/streaming/api/graph/StreamConfig;Lorg/apache/flink/streaming/api/operators/Output;Lorg/apache/flink/streaming/runtime/tasks/ProcessingTimeService;)V"
>  of class "BatchExecCalc$4950" grows beyond 64 KB {code}
> A minimal example that reproduces the situation described above:
> [https://gist.github.com/nlpersimon/df71c0bec93c13667965ce1706099fdb]
> After running the example with Intellij, I got this output:
> [https://gist.github.com/nlpersimon/f741b79c37da7426aeefc7a157cdd124]
> Please let me know if you need any other information. Thank you!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-13 Thread via GitHub


whhe commented on code in PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1599378513


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtil.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.utils;
+
+import org.apache.flink.table.api.ValidationException;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
+
+/** Utilities to split chunks of table. */
+public class JdbcChunkUtil {
+
+/**
+ * Query the maximum and minimum value of the column in the table. e.g. 
query string 
+ * SELECT MIN(%s) FROM %s WHERE %s > ?
+ *
+ * @param jdbc JDBC connection.
+ * @param tableId table identity.
+ * @param columnName column name.
+ * @return maximum and minimum value.
+ */
+public static Object[] queryMinMax(JdbcConnection jdbc, String tableId, 
String columnName)
+throws SQLException {
+final String minMaxQuery =
+String.format("SELECT MIN(%s), MAX(%s) FROM %s", columnName, 
columnName, tableId);

Review Comment:
   How about using `jdbc.quotedTableIdString` and `jdbc.quotedColumnIdString` 
to quote tableId and columnName.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32443) Translate "State Processor API" page into Chinese

2024-05-13 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei closed FLINK-32443.
--

> Translate "State Processor API" page into Chinese
> -
>
> Key: FLINK-32443
> URL: https://issues.apache.org/jira/browse/FLINK-32443
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, chinese-translation, Documentation
>Affects Versions: 1.18.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>
> The page URL is 
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/libs/state_processor_api/]
> The markdown file is located in 
> docs/content.zh/docs/libs/state_processor_api.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32443) Translate "State Processor API" page into Chinese

2024-05-13 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei resolved FLINK-32443.

Resolution: Resolved

Merged into master via 26817092

> Translate "State Processor API" page into Chinese
> -
>
> Key: FLINK-32443
> URL: https://issues.apache.org/jira/browse/FLINK-32443
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, chinese-translation, Documentation
>Affects Versions: 1.18.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>
> The page URL is 
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/libs/state_processor_api/]
> The markdown file is located in 
> docs/content.zh/docs/libs/state_processor_api.md



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-13 Thread via GitHub


yuxiqian commented on code in PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1599352772


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -18,104 +18,144 @@
 package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
 
 import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtil;
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
+import static java.math.BigDecimal.ROUND_CEILING;
+import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
 @Experimental
-public interface JdbcSourceChunkSplitter extends ChunkSplitter {

Review Comment:
   Is there a reason to change `Splitter` interface to an abstract class, 
instead of providing a `default` implementation for `generateSplits`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-13 Thread via GitHub


yuxiqian commented on code in PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1599352772


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -18,104 +18,144 @@
 package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
 
 import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtil;
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
+import static java.math.BigDecimal.ROUND_CEILING;
+import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
 @Experimental
-public interface JdbcSourceChunkSplitter extends ChunkSplitter {

Review Comment:
   Is there a reason to change `Splitter` interface into an abstract class, 
instead of providing a `default` implementation for `generateSplits`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35345) FLIP-448: Introduce Pluggable Workflow Scheduler Interface for Materialized Table

2024-05-13 Thread dalongliu (Jira)
dalongliu created FLINK-35345:
-

 Summary: FLIP-448: Introduce Pluggable Workflow Scheduler 
Interface for Materialized Table
 Key: FLINK-35345
 URL: https://issues.apache.org/jira/browse/FLINK-35345
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Ecosystem
Affects Versions: 1.20.0
Reporter: dalongliu
 Fix For: 1.20.0


This is an umbrella issue for FLIP-448: Introduce Pluggable Workflow Scheduler 
Interface for Materialized Table, for more detail, please see 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-448%3A+Introduce+Pluggable+Workflow+Scheduler+Interface+for+Materialized+Table.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-13 Thread via GitHub


yuxiqian commented on code in PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1599352772


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -18,104 +18,144 @@
 package org.apache.flink.cdc.connectors.base.source.assigner.splitter;
 
 import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtil;
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
+import static java.math.BigDecimal.ROUND_CEILING;
+import static 
org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
 @Experimental
-public interface JdbcSourceChunkSplitter extends ChunkSplitter {

Review Comment:
   Is there any reason to change `Splitter` interface into an abstract class, 
instead of providing a `default` implementation for `generateSplits`?



##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtil.java:
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.utils;
+
+import org.apache.flink.table.api.ValidationException;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.rowToArray;
+
+/** Utilities to split chunks of table. */
+public class JdbcChunkUtil {

Review Comment:
   Seems `Utils` are used more than `Util` in CDC repo.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35344) Move same code from multiple subclasses to JdbcSourceChunkSplitter

2024-05-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35344:
---
Labels: pull-request-available  (was: )

> Move same code from multiple subclasses to JdbcSourceChunkSplitter
> --
>
> Key: FLINK-35344
> URL: https://issues.apache.org/jira/browse/FLINK-35344
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Current, subclasses of JdbcSourceChunkSplitter almost share same code, but 
> each have one copy. It's hard for later maintenance. 
> Thus, this Jira aim to move same code from multiple subclasses to 
> JdbcSourceChunkSplitter, just like what have done in 
> AbstractScanFetchTask(https://github.com/apache/flink-cdc/issues/2690)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-05-13 Thread via GitHub


loserwang1024 opened a new pull request, #3319:
URL: https://github.com/apache/flink-cdc/pull/3319

   Current, subclasses of JdbcSourceChunkSplitter almost share same code, but 
each have one copy. It's hard for later maintenance. 
   
   Thus, this Jira aim to move same code from multiple subclasses to 
JdbcSourceChunkSplitter, just like what have done in 
   AbstractScanFetchTask(https://github.com/apache/flink-cdc/issues/2690)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35344) Move same code from multiple subclasses to JdbcSourceChunkSplitter

2024-05-13 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846167#comment-17846167
 ] 

Hongshun Wang commented on FLINK-35344:
---

I'd like to do it.

> Move same code from multiple subclasses to JdbcSourceChunkSplitter
> --
>
> Key: FLINK-35344
> URL: https://issues.apache.org/jira/browse/FLINK-35344
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> Current, subclasses of JdbcSourceChunkSplitter almost share same code, but 
> each have one copy. It's hard for later maintenance. 
> Thus, this Jira aim to move same code from multiple subclasses to 
> JdbcSourceChunkSplitter, just like what have done in 
> AbstractScanFetchTask(https://github.com/apache/flink-cdc/issues/2690)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35253][docs-zh]Translate "State Processor API "page into Chinese [flink]

2024-05-13 Thread via GitHub


fredia commented on PR #24780:
URL: https://github.com/apache/flink/pull/24780#issuecomment-2109222070

   Hi @drymatini, thanks for your contribution, "State Processor API" was 
already translated in master,  I think documents do not need to be backported 
to the historical release branch, maybe we can find if there are other 
documents on master that need to be translated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35344) Move same code from multiple subclasses to JdbcSourceChunkSplitter

2024-05-13 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35344:
-

 Summary: Move same code from multiple subclasses to 
JdbcSourceChunkSplitter
 Key: FLINK-35344
 URL: https://issues.apache.org/jira/browse/FLINK-35344
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Current, subclasses of JdbcSourceChunkSplitter almost share same code, but each 
have one copy. It's hard for later maintenance. 

Thus, this Jira aim to move same code from multiple subclasses to 
JdbcSourceChunkSplitter, just like what have done in 
AbstractScanFetchTask(https://github.com/apache/flink-cdc/issues/2690)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34756) [MySQL-CDC] serverId parameter out of Integer range

2024-05-13 Thread chenyunde (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846161#comment-17846161
 ] 

chenyunde commented on FLINK-34756:
---

The range of server-id should be between 1 to 2 ^ 32-1

> [MySQL-CDC] serverId parameter out of Integer range
> ---
>
> Key: FLINK-34756
> URL: https://issues.apache.org/jira/browse/FLINK-34756
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Flink CDC Issue Import
>Priority: Major
>  Labels: github-import
>
> **Describe the bug(Please use English)**
> Now the 'server_id' is store as int, but the value of parameter 'server_id' 
> would be larger than INT_MAX, this would cause IntegerParseException
> **Environment :**
>  - Flink version :  1.16
>  - Flink CDC version: 2.3.0
>  - Database and version: mysql 5.7
> **To Reproduce**
> Steps to reproduce the behavior:
> 1. The test data :
> MySQL instance in Aliyun Cloud, with a large server_id (out of INT range)
> ```bash
> mysql> show variables like '%server_id%';
> +++
> | Variable_name  | Value  |
> +++
> | server_id  | 2437466879 |
> | server_id_bits | 32 |
> +++
> ```
> 2. The test code :
> ```java
> public class MySqlSourceExample {
>   public static void main(String[] args) throws Exception {
> MySqlSource mySqlSource = MySqlSource.builder()
>   .hostname("host")
>   .port(3306)
>   .databaseList("dbs") // set captured database
>   .tableList("tables") // set captured table
>   .username("user")
>   .password("password")
>   .serverId("1-2437466879")
>   .deserializer(new JsonDebeziumDeserializationSchema()) // converts 
> SourceRecord to JSON String
>   .build();
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL 
> Source")
>   // set 4 parallel source tasks
>   .setParallelism(4)
>   .print().setParallelism(1); // use parallelism 1 for sink to keep 
> message ordering
>  env.execute("Print MySQL Snapshot + Binlog");
>   }
> }
> ```
> 3. The error :
> ```bash
> Exception in thread "main" java.lang.IllegalStateException: The server id 
> 2437466879 is not a valid numeric.
>   at 
> com.ververica.cdc.connectors.mysql.source.config.ServerIdRange.parseServerId(ServerIdRange.java:108)
>   at 
> com.ververica.cdc.connectors.mysql.source.config.ServerIdRange.from(ServerIdRange.java:96)
>   at 
> com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory.serverId(MySqlSourceConfigFactory.java:130)
>   at 
> com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder.serverId(MySqlSourceBuilder.java:108)
>   at 
> com.bytedance.openplatform.flink.cdc.examples.MySqlSourceExample.main(MySqlSourceExample.java:25)
> Caused by: java.lang.NumberFormatException: For input string: "2437466879"
>   at 
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>   at java.lang.Integer.parseInt(Integer.java:583)
>   at java.lang.Integer.parseInt(Integer.java:615)
>   at 
> com.ververica.cdc.connectors.mysql.source.config.ServerIdRange.parseServerId(ServerIdRange.java:105)
>   ... 4 more
> ```
> **Additional Description**
> If applicable, add screenshots to help explain your problem.
>  Imported from GitHub 
> Url: https://github.com/apache/flink-cdc/issues/1770
> Created by: [legendtkl|https://github.com/legendtkl]
> Labels: bug, 
> Created at: Wed Nov 23 17:36:24 CST 2022
> State: open



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-13 Thread Gang Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846157#comment-17846157
 ] 

Gang Yang edited comment on FLINK-35296 at 5/14/24 3:28 AM:


This abnormal situation occurs not only in sub-databases and sub-tables, but 
also in single tables,As shown below:

!image-2024-05-14-11-25-55-565.png!

diff=max(id)-min(id), which is about 3 times different from the count value. 
[~leonard] 


was (Author: 清月):
This abnormal situation occurs not only in sub-databases and sub-tables, but 
also in single tables,As shown below:

!image-2024-05-14-11-25-55-565.png!

diff=max(id)-min(id), which is about 3 times different from the count value.

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-13 Thread Gang Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846157#comment-17846157
 ] 

Gang Yang commented on FLINK-35296:
---

This abnormal situation occurs not only in sub-databases and sub-tables, but 
also in single tables,As shown below:

!image-2024-05-14-11-25-55-565.png!

diff=max(id)-min(id), which is about 3 times different from the count value.

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status

2024-05-13 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846156#comment-17846156
 ] 

Feng Jin commented on FLINK-35342:
--

{color:#00}This issue will be fixed in this PR.{color}  
https://github.com/apache/flink/pull/24777

> MaterializedTableStatementITCase test can check for wrong status
> 
>
> Key: FLINK-35342
> URL: https://issues.apache.org/jira/browse/FLINK-35342
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> * 1.20 AdaptiveScheduler / Test (module: table) 
> https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490
>  
> It looks like 
> {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}}
>  can be flaky, where the expected status is not yet RUNNING:
> {code}
> Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase
> Error: 03:24:03 03:24:03.902 [ERROR] 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path,
>  RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE!
> May 13 03:24:03 org.opentest4j.AssertionFailedError: 
> May 13 03:24:03 
> May 13 03:24:03 expected: "RUNNING"
> May 13 03:24:03  but was: "CREATED"
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> May 13 03:24:03   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> May 13 03:24:03   at 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650)
> May 13 03:24:03   at java.lang.reflect.Method.invoke(Method.java:498)
> May 13 03:24:03   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> May 13 03:24:03 
> May 13 03:24:04 03:24:04.270 [INFO] 
> May 13 03:24:04 03:24:04.270 [INFO] Results:
> May 13 03:24:04 03:24:04.270 [INFO] 
> Error: 03:24:04 03:24:04.270 [ERROR] Failures: 
> Error: 03:24:04 03:24:04.271 [ERROR]   
> MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650
>  
> May 13 03:24:04 expected: "RUNNING"
> May 13 03:24:04  but was: "CREATED"
> May 13 03:24:04 03:24:04.271 [INFO] 
> Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, 
> Skipped: 0
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-13 Thread Gang Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Yang updated FLINK-35296:
--
Attachment: image-2024-05-14-11-25-55-565.png

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-13 Thread Gang Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846153#comment-17846153
 ] 

Gang Yang commented on FLINK-35296:
---

Yes, I have checked it. The maximum data volume of a sub-table is 1315883 and 
the minimum is 443151. The difference is no more than three times.  [~leonard] 

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-13 Thread Gang Yang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35296 ]


Gang Yang deleted comment on FLINK-35296:
---

was (Author: 清月):
是的,已经检查过,分表最大数据量是1315883,最小是443151,差距不超过三倍

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-13 Thread Gang Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846152#comment-17846152
 ] 

Gang Yang commented on FLINK-35296:
---

是的,已经检查过,分表最大数据量是1315883,最小是443151,差距不超过三倍

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The task occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data will be 
> read. In fact, there is still a lot of data in the upstream Mysql table;
> 3. When the task is restarted from the state, it will read normally for a 
> period of time and then stop reading.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging [flink]

2024-05-13 Thread via GitHub


fredia commented on PR #24762:
URL: https://github.com/apache/flink/pull/24762#issuecomment-2109196919

   > Thanks for the update! Overall LGTM. And will this `SpaceStat` report to 
metrics?
   
   Currently no, will report to metric in FLINK-32091.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging [flink]

2024-05-13 Thread via GitHub


Zakelly commented on code in PR #24762:
URL: https://github.com/apache/flink/pull/24762#discussion_r1599318520


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java:
##
@@ -88,18 +88,34 @@ PhysicalFile perform(
  */
 private boolean deleted = false;
 
+/**
+ * If a physical file is owned by current {@link 
FileMergingSnapshotManager}, the current {@link
+ * FileMergingSnapshotManager} should not delete or count it if not owned.
+ */
+private boolean isOwned;
+
 public PhysicalFile(
 @Nullable FSDataOutputStream outputStream,
 Path filePath,
 @Nullable PhysicalFileDeleter deleter,
 CheckpointedStateScope scope) {
+this(outputStream, filePath, deleter, scope, true);
+}
+
+public PhysicalFile(
+@Nullable FSDataOutputStream outputStream,
+Path filePath,
+@Nullable PhysicalFileDeleter deleter,
+CheckpointedStateScope scope,
+boolean owned) {
 this.filePath = filePath;
 this.outputStream = outputStream;
 this.closed = outputStream == null;
-this.deleter = deleter;
+this.deleter = owned ? deleter : null;

Review Comment:
   ```suggestion
   this.deleter = deleter;
   ```
   
   I'd suggest keep this simple.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35343) NullPointerException in SourceReaderBase

2024-05-13 Thread zyh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zyh updated FLINK-35343:

Environment: 
* flink(1.17.2), local mode and deploy on k8s
 * doris-flink-connector-1.17(1.6.0)
 * Doris(2.1)

h2.  

  was:
flink(1.17.2), local mode and deploy on k8s

doris-flink-connector-1.17(1.6.0)

Doris(2.1)
h2.  


> NullPointerException in SourceReaderBase
> 
>
> Key: FLINK-35343
> URL: https://issues.apache.org/jira/browse/FLINK-35343
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.17.2
> Environment: * flink(1.17.2), local mode and deploy on k8s
>  * doris-flink-connector-1.17(1.6.0)
>  * Doris(2.1)
> h2.  
>Reporter: zyh
>Priority: Blocker
> Attachments: flinktask.png, servicelog.png
>
>
> h2. operation
> I used flink batch to read data from Doris and write to Doris. 
> The flink job include two source task, one table join task and one sink task.
>  
> h2. exception stack
> {code:java}
> Caused by: java.lang.NullPointerException    at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:194)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:208)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:173)
>     at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)    
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)    at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)    at 
> java.lang.Thread.run(Thread.java:748) {code}
>  
> h2. other
> The problem only occur in flink local mode and deploy on k8s.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35343) NullPointerException in SourceReaderBase

2024-05-13 Thread zyh (Jira)
zyh created FLINK-35343:
---

 Summary: NullPointerException in SourceReaderBase
 Key: FLINK-35343
 URL: https://issues.apache.org/jira/browse/FLINK-35343
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.17.2
 Environment: flink(1.17.2), local mode and deploy on k8s

doris-flink-connector-1.17(1.6.0)

Doris(2.1)
h2.  
Reporter: zyh
 Attachments: flinktask.png, servicelog.png

h2. operation

I used flink batch to read data from Doris and write to Doris. 

The flink job include two source task, one table join task and one sink task.

 
h2. exception stack
{code:java}
Caused by: java.lang.NullPointerException    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:194)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:208)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:173)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) 
   at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)    at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)    at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)    at 
java.lang.Thread.run(Thread.java:748) {code}
 
h2. other

The problem only occur in flink local mode and deploy on k8s.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-13 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846141#comment-17846141
 ] 

Roman Boyko commented on FLINK-34380:
-

[~xu_shuai_] , [~xuyangzhong] , could you please take a look?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join

2024-05-13 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846140#comment-17846140
 ] 

Roman Boyko commented on FLINK-34694:
-

[~xu_shuai_] , please take a look.

> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Roman Boyko
>Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png, 
> image-2024-04-26-16-55-19-800.png, image-2024-04-26-16-55-56-994.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the 
> OuterJoinRecordStateView is used to store additional field - the number of 
> associations for every record. This leads to store additional Tuple2 and 
> Integer data for every record in outer state.
> This functionality is used only for sending:
>  * -D[nullPaddingRecord] in case of first Accumulate record
>  * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase 
> the performance up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35293) FLIP-445: Support dynamic parallelism inference for HiveSource

2024-05-13 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846137#comment-17846137
 ] 

Zhu Zhu commented on FLINK-35293:
-

master: ddb5a5355f9aca3d223f1fff6581d83dd317c2de

> FLIP-445: Support dynamic parallelism inference for HiveSource
> --
>
> Key: FLINK-35293
> URL: https://issues.apache.org/jira/browse/FLINK-35293
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.20.0
>Reporter: xingbe
>Assignee: xingbe
>Priority: Major
>  Labels: pull-request-available
>
> [FLIP-379|https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs]
>  introduces dynamic source parallelism inference, which, compared to static 
> inference, utilizes runtime information to more accurately determine the 
> source parallelism. The FileSource already possesses the capability for 
> dynamic parallelism inference. As a follow-up task to FLIP-379, this FLIP 
> plans to implement the dynamic parallelism inference interface for 
> HiveSource, and also switches the default static parallelism inference to 
> dynamic parallelism inference.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35293][hive] Hive source supports dynamic parallelism inference [flink]

2024-05-13 Thread via GitHub


zhuzhurk merged PR #24764:
URL: https://github.com/apache/flink/pull/24764


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status

2024-05-13 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846135#comment-17846135
 ] 

Feng Jin commented on FLINK-35342:
--

[~rskraba] {color:#00}Thank you for reporting this issue, I will follow up 
to resolve it.{color}

> MaterializedTableStatementITCase test can check for wrong status
> 
>
> Key: FLINK-35342
> URL: https://issues.apache.org/jira/browse/FLINK-35342
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> * 1.20 AdaptiveScheduler / Test (module: table) 
> https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490
>  
> It looks like 
> {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}}
>  can be flaky, where the expected status is not yet RUNNING:
> {code}
> Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase
> Error: 03:24:03 03:24:03.902 [ERROR] 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path,
>  RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE!
> May 13 03:24:03 org.opentest4j.AssertionFailedError: 
> May 13 03:24:03 
> May 13 03:24:03 expected: "RUNNING"
> May 13 03:24:03  but was: "CREATED"
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> May 13 03:24:03   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> May 13 03:24:03   at 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650)
> May 13 03:24:03   at java.lang.reflect.Method.invoke(Method.java:498)
> May 13 03:24:03   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> May 13 03:24:03 
> May 13 03:24:04 03:24:04.270 [INFO] 
> May 13 03:24:04 03:24:04.270 [INFO] Results:
> May 13 03:24:04 03:24:04.270 [INFO] 
> Error: 03:24:04 03:24:04.270 [ERROR] Failures: 
> Error: 03:24:04 03:24:04.271 [ERROR]   
> MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650
>  
> May 13 03:24:04 expected: "RUNNING"
> May 13 03:24:04  but was: "CREATED"
> May 13 03:24:04 03:24:04.271 [INFO] 
> Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, 
> Skipped: 0
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18

2024-05-13 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846129#comment-17846129
 ] 

Hang Ruan commented on FLINK-35109:
---

[~fpaul] Thanks for your quick reply. I will work on it later.

> Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support 
> for 1.17 and 1.18
> ---
>
> Key: FLINK-35109
> URL: https://issues.apache.org/jira/browse/FLINK-35109
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Hang Ruan
>Priority: Blocker
> Fix For: kafka-4.0.0
>
>
> The Flink Kafka connector currently can't compile against Flink 
> 1.20-SNAPSHOT. An example failure can be found at 
> https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169
> The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, 
> see FLINK-32455. See also specifically the comment in 
> https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785
> Next to that, there's also FLINK-25509 which can only be supported with Flink 
> 1.19 and higher. 
> So we should:
> * Drop support for 1.17 and 1.18
> * Refactor the Flink Kafka connector to use the new 
> {code:java}MigrationTest{code}
> We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; 
> this change will be a new v4.0 version with support for Flink 1.19 and the 
> upcoming Flink 1.20



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35302) Flink REST server throws exception on unknown fields in RequestBody

2024-05-13 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-35302:

Fix Version/s: 1.20.0
   (was: 1.19.1)

> Flink REST server throws exception on unknown fields in RequestBody
> ---
>
> Key: FLINK-35302
> URL: https://issues.apache.org/jira/browse/FLINK-35302
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Juntao Hu
>Assignee: Juntao Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> As 
> [FLIP-401|https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance]
>  and FLINK-33268 mentioned, when an old version REST client receives response 
> from a new version REST server, with strict JSON mapper, the client will 
> throw exceptions on newly added fields, which is not convenient for 
> situations where a centralized client deals with REST servers of different 
> versions (e.g. k8s operator).
> But this incompatibility can also happens at server side, when a new version 
> REST client sends requests to an old version REST server with additional 
> fields. Making server flexible with unknown fields can save clients from 
> backward compatibility code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35165][runtime/coordination] AdaptiveBatch Scheduler should not restrict the default source parall… [flink]

2024-05-13 Thread via GitHub


JunRuiLee commented on PR #24736:
URL: https://github.com/apache/flink/pull/24736#issuecomment-2109128178

   > > Thanks, @venkata91, for your contribution! After reviewing this PR, I'm 
concerned that it entirely removes limit that source parallelism should lower 
than source jobVertex's max parallelism. And I think the goal of this pr is 
ensure source parallelism isn't limited by config option 
execution.batch.adaptive.auto-parallelism.max-parallelism, but still respects 
the max parallelism of source jobVertex.
   > > WDYT?
   > 
   > I think that makes sense. Basically what you're saying is if `source's max 
parallelism` is determined by the `source` itself which is < 
`default-source-parallelism` config, we should cap it by the `source computed 
max parallelism` correct? If so, I agree with that.
   
   Yes, that's correct.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846007#comment-17846007
 ] 

Ryan Skraba commented on FLINK-35342:
-

[~hackergin] I found this failure in this new test, from a few days ago.  Do 
you think it's likely to happen again?

> MaterializedTableStatementITCase test can check for wrong status
> 
>
> Key: FLINK-35342
> URL: https://issues.apache.org/jira/browse/FLINK-35342
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> * 1.20 AdaptiveScheduler / Test (module: table) 
> https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490
>  
> It looks like 
> {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}}
>  can be flaky, where the expected status is not yet RUNNING:
> {code}
> Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase
> Error: 03:24:03 03:24:03.902 [ERROR] 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path,
>  RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE!
> May 13 03:24:03 org.opentest4j.AssertionFailedError: 
> May 13 03:24:03 
> May 13 03:24:03 expected: "RUNNING"
> May 13 03:24:03  but was: "CREATED"
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> May 13 03:24:03   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> May 13 03:24:03   at 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650)
> May 13 03:24:03   at java.lang.reflect.Method.invoke(Method.java:498)
> May 13 03:24:03   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> May 13 03:24:03 
> May 13 03:24:04 03:24:04.270 [INFO] 
> May 13 03:24:04 03:24:04.270 [INFO] Results:
> May 13 03:24:04 03:24:04.270 [INFO] 
> Error: 03:24:04 03:24:04.270 [ERROR] Failures: 
> Error: 03:24:04 03:24:04.271 [ERROR]   
> MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650
>  
> May 13 03:24:04 expected: "RUNNING"
> May 13 03:24:04  but was: "CREATED"
> May 13 03:24:04 03:24:04.271 [INFO] 
> Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, 
> Skipped: 0
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status

2024-05-13 Thread Ryan Skraba (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Skraba updated FLINK-35342:

Affects Version/s: 1.20.0

> MaterializedTableStatementITCase test can check for wrong status
> 
>
> Key: FLINK-35342
> URL: https://issues.apache.org/jira/browse/FLINK-35342
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> * 1.20 AdaptiveScheduler / Test (module: table) 
> https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490
>  
> It looks like 
> {{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}}
>  can be flaky, where the expected status is not yet RUNNING:
> {code}
> Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase
> Error: 03:24:03 03:24:03.902 [ERROR] 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path,
>  RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE!
> May 13 03:24:03 org.opentest4j.AssertionFailedError: 
> May 13 03:24:03 
> May 13 03:24:03 expected: "RUNNING"
> May 13 03:24:03  but was: "CREATED"
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> May 13 03:24:03   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> May 13 03:24:03   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> May 13 03:24:03   at 
> org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650)
> May 13 03:24:03   at java.lang.reflect.Method.invoke(Method.java:498)
> May 13 03:24:03   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> May 13 03:24:03   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> May 13 03:24:03 
> May 13 03:24:04 03:24:04.270 [INFO] 
> May 13 03:24:04 03:24:04.270 [INFO] Results:
> May 13 03:24:04 03:24:04.270 [INFO] 
> Error: 03:24:04 03:24:04.270 [ERROR] Failures: 
> Error: 03:24:04 03:24:04.271 [ERROR]   
> MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650
>  
> May 13 03:24:04 expected: "RUNNING"
> May 13 03:24:04  but was: "CREATED"
> May 13 03:24:04 03:24:04.271 [INFO] 
> Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, 
> Skipped: 0
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35342) MaterializedTableStatementITCase test can check for wrong status

2024-05-13 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35342:
---

 Summary: MaterializedTableStatementITCase test can check for wrong 
status
 Key: FLINK-35342
 URL: https://issues.apache.org/jira/browse/FLINK-35342
 Project: Flink
  Issue Type: Bug
Reporter: Ryan Skraba


* 1.20 AdaptiveScheduler / Test (module: table) 
https://github.com/apache/flink/actions/runs/9056197319/job/24879135605#step:10:12490
 
It looks like 
{{MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume}} 
can be flaky, where the expected status is not yet RUNNING:

{code}
Error: 03:24:03 03:24:03.902 [ERROR] Tests run: 6, Failures: 1, Errors: 0, 
Skipped: 0, Time elapsed: 26.78 s <<< FAILURE! -- in 
org.apache.flink.table.gateway.service.MaterializedTableStatementITCase
Error: 03:24:03 03:24:03.902 [ERROR] 
org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(Path,
 RestClusterClient) -- Time elapsed: 3.850 s <<< FAILURE!
May 13 03:24:03 org.opentest4j.AssertionFailedError: 
May 13 03:24:03 
May 13 03:24:03 expected: "RUNNING"
May 13 03:24:03  but was: "CREATED"
May 13 03:24:03 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
May 13 03:24:03 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
May 13 03:24:03 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
May 13 03:24:03 at 
org.apache.flink.table.gateway.service.MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume(MaterializedTableStatementITCase.java:650)
May 13 03:24:03 at java.lang.reflect.Method.invoke(Method.java:498)
May 13 03:24:03 at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
May 13 03:24:03 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
May 13 03:24:03 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
May 13 03:24:03 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
May 13 03:24:03 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
May 13 03:24:03 
May 13 03:24:04 03:24:04.270 [INFO] 
May 13 03:24:04 03:24:04.270 [INFO] Results:
May 13 03:24:04 03:24:04.270 [INFO] 
Error: 03:24:04 03:24:04.270 [ERROR] Failures: 
Error: 03:24:04 03:24:04.271 [ERROR]   
MaterializedTableStatementITCase.testAlterMaterializedTableSuspendAndResume:650 
May 13 03:24:04 expected: "RUNNING"
May 13 03:24:04  but was: "CREATED"
May 13 03:24:04 03:24:04.271 [INFO] 
Error: 03:24:04 03:24:04.271 [ERROR] Tests run: 82, Failures: 1, Errors: 0, 
Skipped: 0
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish [flink]

2024-05-13 Thread via GitHub


elon-X commented on code in PR #24757:
URL: https://github.com/apache/flink/pull/24757#discussion_r1598735613


##
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##
@@ -201,8 +201,11 @@ void announceCombinedWatermark() {
 // to ready task to avoid period task fail (Java-ThreadPoolExecutor 
will not schedule
 // the period task if it throws an exception).
 for (Integer subtaskId : subTaskIds) {
-context.sendEventToSourceOperatorIfTaskReady(
-subtaskId, new 
WatermarkAlignmentEvent(maxAllowedWatermark));
+// when subtask have been finished, do not send event.
+if (!context.hasNoMoreSplits(subtaskId)) {

Review Comment:
   Hi @1996fanrui , 
   First of all, thank you for your detailed review and response. I would like 
to share my thoughts:
   
   1.Regarding the SourceOperator, initially, I considered the issue of code 
positioning. I noticed that before switching to the 
`DataInputStatus.END_OF_INPUT` state, the SourceOperator receives a 
`NoMoreSplitsEvent`,  and the SourceCoordinator also needs to determine whether 
the subtask has finished or not. Therefore, I chose to send the maximum 
timestamp within the `if (event instanceof NoMoreSplitsEvent) { } ` I believe 
these two locations should be equivalent.
   
   2.On the SourceCoordinator side, if a task has finished, the 
`context.sendEventToSourceOperatorIfTaskReady` code still sends events to that 
subtask. In the code provided by Gyula, if the response result from 
`gateway.sendEvent(event).get()` is obtained, the following exception occurs:
   `Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.operators.coordination.TaskNotRunningException: 
"Source: Sequence Source -> Filter -> Sink: Print to Std. Out (1/2)" is not 
running, but in state FINISHED`
   
   Therefore, I added the condition `!context.hasNoMoreSplits(subtaskId)`. I 
also tested the scenario "some subtask doesn't have split when the parallelism 
of Kafka source is greater than Kafka partition," and indeed, no 
`NoMoreSplitsEvent` is sent.
   
   Please correct me if my understanding is wrong. I will modify the code based 
on the final discussion results and submit it~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]

2024-05-13 Thread via GitHub


empathy87 commented on PR #24659:
URL: https://github.com/apache/flink/pull/24659#issuecomment-2108124573

   > @empathy87 please let me know once backports are ready, then we can merge 
all of them
   
   @snuyanzin, backport PRs are done, CI succeeded.
   https://github.com/apache/flink/pull/24778
   https://github.com/apache/flink/pull/24779


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35165][runtime/coordination] AdaptiveBatch Scheduler should not restrict the default source parall… [flink]

2024-05-13 Thread via GitHub


venkata91 commented on PR #24736:
URL: https://github.com/apache/flink/pull/24736#issuecomment-2108098806

   > Thanks, @venkata91, for your contribution! After reviewing this PR, I'm 
concerned that it entirely removes limit that source parallelism should lower 
than source jobVertex's max parallelism. And I think the goal of this pr is 
ensure source parallelism isn't limited by config option 
execution.batch.adaptive.auto-parallelism.max-parallelism, but still respects 
the max parallelism of source jobVertex.
   > 
   > WDYT?
   
   I think that makes sense. Basically what you're saying is if `source's max 
parallelism` is determined by the `source` itself which is < 
`default-source-parallelism` config, we should cap it by the `source computed 
max parallelism` correct? If so, I agree with that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35341) Retraction stop working with Clock dependent function in Filter

2024-05-13 Thread Lim Qing Wei (Jira)
Lim Qing Wei created FLINK-35341:


 Summary: Retraction stop working with Clock dependent function in 
Filter
 Key: FLINK-35341
 URL: https://issues.apache.org/jira/browse/FLINK-35341
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.17.1
Reporter: Lim Qing Wei


 

Say we have a Flink SQL view where
 # we use clock dependent function like `UNIX_TIMESTAMP()` in query filter, eg. 
WHERE clause, eg. table.timestamp < UNIX_TIMESTAMP()
 # source record is retracted at a time where the filter is evaluated as false

we expect a retraction is produced from the view, but in practice nothing 
happen.

 

We are using kafka as a source, here's a small snippet that shows the problem.

 
{code:java}
CREATE TEMPORARY VIEW my_view AS
SELECT key,
someData,
expiry
FROM upstream 
WHERE expiry > UNIX_TIMESTAMP();


select * from my_view where key = 5574332;{code}
 

 

The actual query is a bit more complicated but this simplified one should 
illustrate the issue. Below is the event happen in chronological order:

 
 # Run this query as a stream
 # Create a record in upstream where key = 5574332, and expiry to be in 3 
minutes into the future.
 # Observe insertion of the record, as expected
 # Wait for 3 minutes
 # Now the record should expired, but given there's no update, there's no 
change to the stream output just yet
 # Delete the upstream record (using tombstone in kafka)
 # Observe no change in the stream output, but we are expecting retraction(aka 
deletion)

 

Is this a known issue? I've search Jira but could find any, I observed this in 
1.15 until 1.17, havent tested with 1.18 and above though



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]

2024-05-13 Thread via GitHub


vikramsinghchandel commented on PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#issuecomment-2107813052

   any updates here? This is a feature that is already available on Flink EMR 
would be great to have it here too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35335) StateCheckpointedITCase failed fatally with 127 exit code

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845948#comment-17845948
 ] 

Ryan Skraba commented on FLINK-35335:
-

* 1.19 test_cron_adaptive_scheduler tests 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59499=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=ea7cf968-e585-52cb-e0fc-f48de023a7ca=8379

> StateCheckpointedITCase failed fatally with 127 exit code
> -
>
> Key: FLINK-35335
> URL: https://issues.apache.org/jira/browse/FLINK-35335
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.19.1
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59499=logs=8fd9202e-fd17-5b26-353c-ac1ff76c8f28=ea7cf968-e585-52cb-e0fc-f48de023a7ca=8379
> {code}
> May 13 01:50:22 01:50:22.272 [INFO] Tests run: 6, Failures: 0, Errors: 0, 
> Skipped: 0, Time elapsed: 30.03 s -- in 
> org.apache.flink.test.streaming.runtime.CacheITCase
> May 13 01:50:23 01:50:23.142 [INFO] Tests run: 1, Failures: 0, Errors: 0, 
> Skipped: 0, Time elapsed: 5.234 s -- in 
> org.apache.flink.test.streaming.experimental.CollectITCase
> May 13 01:50:23 01:50:23.611 [INFO] 
> May 13 01:50:23 01:50:23.611 [INFO] Results:
> May 13 01:50:23 01:50:23.611 [INFO] 
> May 13 01:50:23 01:50:23.611 [WARNING] Tests run: 1960, Failures: 0, Errors: 
> 0, Skipped: 25
> May 13 01:50:23 01:50:23.611 [INFO] 
> May 13 01:50:23 01:50:23.674 [INFO] 
> 
> May 13 01:50:23 01:50:23.674 [INFO] BUILD FAILURE
> May 13 01:50:23 01:50:23.674 [INFO] 
> 
> May 13 01:50:23 01:50:23.676 [INFO] Total time:  41:24 min
> May 13 01:50:23 01:50:23.677 [INFO] Finished at: 2024-05-13T01:50:23Z
> May 13 01:50:23 01:50:23.677 [INFO] 
> 
> May 13 01:50:23 01:50:23.677 [WARNING] The requested profile 
> "skip-webui-build" could not be activated because it does not exist.
> May 13 01:50:23 01:50:23.678 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:3.2.2:test (integration-tests) 
> on project flink-tests: 
> May 13 01:50:23 01:50:23.678 [ERROR] 
> May 13 01:50:23 01:50:23.678 [ERROR] Please refer to 
> /__w/2/s/flink-tests/target/surefire-reports for the individual test results.
> May 13 01:50:23 01:50:23.678 [ERROR] Please refer to dump files (if any 
> exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
> May 13 01:50:23 01:50:23.678 [ERROR] ExecutionException The forked VM 
> terminated without properly saying goodbye. VM crash or System.exit called?
> May 13 01:50:23 01:50:23.678 [ERROR] Command was /bin/sh -c cd 
> '/__w/2/s/flink-tests' && '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' 
> '-XX:+UseG1GC' '-Xms256m' '-XX:+IgnoreUnrecognizedVMOptions' 
> '--add-opens=java.base/java.util=ALL-UNNAMED' 
> '--add-opens=java.base/java.io=ALL-UNNAMED' '-Xmx1536m' '-jar' 
> '/__w/2/s/flink-tests/target/surefire/surefirebooter-20240513010926195_686.jar'
>  '/__w/2/s/flink-tests/target/surefire' '2024-05-13T01-09-20_665-jvmRun1' 
> 'surefire-20240513010926195_684tmp' 'surefire_206-20240513010926195_685tmp'
> May 13 01:50:23 01:50:23.679 [ERROR] Error occurred in starting fork, check 
> output in log
> May 13 01:50:23 01:50:23.679 [ERROR] Process Exit Code: 127
> May 13 01:50:23 01:50:23.679 [ERROR] Crashed tests:
> May 13 01:50:23 01:50:23.679 [ERROR] 
> org.apache.flink.test.checkpointing.StateCheckpointedITCase
> May 13 01:50:23 01:50:23.679 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> May 13 01:50:23 01:50:23.679 [ERROR] Command was /bin/sh -c cd 
> '/__w/2/s/flink-tests' && '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' 
> '-XX:+UseG1GC' '-Xms256m' '-XX:+IgnoreUnrecognizedVMOptions' 
> '--add-opens=java.base/java.util=ALL-UNNAMED' 
> '--add-opens=java.base/java.io=ALL-UNNAMED' '-Xmx1536m' '-jar' 
> '/__w/2/s/flink-tests/target/surefire/surefirebooter-20240513010926195_686.jar'
>  '/__w/2/s/flink-tests/target/surefire' '2024-05-13T01-09-20_665-jvmRun1' 
> 'surefire-20240513010926195_684tmp' 'surefire_206-20240513010926195_685tmp'
> May 13 01:50:23 01:50:23.679 [ERROR] Error occurred in starting fork, check 
> output in log
> May 13 01:50:23 01:50:23.679 [ERROR] Process Exit Code: 127
> May 13 01:50:23 01:50:23.679 [ERROR] Crashed tests:
> May 13 01:50:23 01:50:23.679 [ERROR] 
> org.apache.flink.test.checkpointing.StateCheckpointedITCase
> May 13 01:50:23 01:50:23.679 [ERROR]  at 
> 

[jira] [Commented] (FLINK-35254) build_wheels_on_macos failed

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845946#comment-17845946
 ] 

Ryan Skraba commented on FLINK-35254:
-

* 1.20 build_wheels_on_macos 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59476=logs=f73b5736-8355-5390-ec71-4dfdec0ce6c5=90f7230e-bf5a-531b-8566-ad48d3e03bbb=426


> build_wheels_on_macos failed
> 
>
> Key: FLINK-35254
> URL: https://issues.apache.org/jira/browse/FLINK-35254
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> {code:java}
>  ERROR: THESE PACKAGES DO NOT MATCH THE HASHES FROM THE REQUIREMENTS FILE. If 
> you have updated the package versions, please update the hashes. Otherwise, 
> examine the package contents carefully; someone may have tampered with them.
>   unknown package:
>   Expected sha256 
> f12932e5a6feb5c58192209af1d2607d488cb1d404fbc038ac12ada60327fa34
>Got
> 1c61bf307881167fe169de79c02f46d16fc5cd35781e02a40bf1f13671cdc22c
>   
>   [end of output]
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59219=logs=f73b5736-8355-5390-ec71-4dfdec0ce6c5=90f7230e-bf5a-531b-8566-ad48d3e03bbb=288



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35284) Streaming File Sink end-to-end test times out

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845947#comment-17845947
 ] 

Ryan Skraba commented on FLINK-35284:
-

* 1.20 AdaptiveScheduler / E2E (group 2) 
https://github.com/apache/flink/actions/runs/9048112585/job/24860957143#step:14:3735
* 1.20 e2e_2_cron_adaptive_scheduler 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59498=logs=fb37c667-81b7-5c22-dd91-846535e99a97=011e961e-597c-5c96-04fe-7941c8b83f23=4404

> Streaming File Sink end-to-end test times out
> -
>
> Key: FLINK-35284
> URL: https://issues.apache.org/jira/browse/FLINK-35284
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: test-stability
>
> 1.20 e2e_2_cron_adaptive_scheduler 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59303=logs=fb37c667-81b7-5c22-dd91-846535e99a97=011e961e-597c-5c96-04fe-7941c8b83f23=3076
> {code}
> May 01 01:08:42 Test (pid: 127498) did not finish after 900 seconds.
> May 01 01:08:42 Printing Flink logs and killing it:
> {code}
> This looks like a consequence of hundreds of 
> {{RecipientUnreachableException}}s like: 
> {code}
> 2024-05-01 00:55:00,496 WARN  
> org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer 
> [] - Slot allocation for allocation 2ec550d8331cd53c32fd899e1e9a0fa5 for job 
> 5654b195450b352be998673f1637fc43 failed.
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could 
> not send message [RemoteRpcInvocation(TaskExecutorGateway.requestSlot(SlotID, 
> JobID, AllocationID, ResourceProfile, String, ResourceManagerId, Time))] from 
> sender [Actor[pekko://flink/temp/taskmanager_0$De]] to recipient 
> [Actor[pekko.ssl.tcp://flink@localhost:40665/user/rpc/taskmanager_0#-299862847]],
>  because the recipient is unreachable. This can either mean that the 
> recipient has been terminated or that the remote RpcService is currently not 
> reachable.
>   at 
> org.apache.flink.runtime.rpc.pekko.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
>  ~[flink-rpc-akkafe85d469-8ced-4732-922e-62c82b554871.jar:1.20-SNAPSHOT]
>   at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
> ~[flink-rpc-akkafe85d469-8ced-4732-922e-62c82b554871.jar:1.20-SNAPSHOT]
>   at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
> ~[flink-rpc-akkafe85d469-8ced-4732-922e-62c82b554871.jar:1.20-SNAPSHOT]
>   at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
> ~[flink-rpc-akkafe85d469-8ced-4732-922e-62c82b554871.jar:1.20-SNAPSHOT]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845945#comment-17845945
 ] 

Ryan Skraba commented on FLINK-35041:
-

Thanks so much!  I've verified that I can no longer reproduce this error by 
repeatedly running the entire package of tests.

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35002) GitHub action request timeout to ArtifactService

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845943#comment-17845943
 ] 

Ryan Skraba commented on FLINK-35002:
-

* 1.18 AdaptiveScheduler / Compile 
https://github.com/apache/flink/commit/09f7b070989a906d777a000e6ec3d9b45e192a29/checks/24844337742/logs


> GitHub action request timeout  to ArtifactService
> -
>
> Key: FLINK-35002
> URL: https://issues.apache.org/jira/browse/FLINK-35002
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: github-actions, test-stability
>
> A timeout can occur when uploading a successfully built artifact:
>  * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650]
> {code:java}
> 2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file 
> uploaded
> 2024-04-02T02:20:15.6360133Z Artifact name is valid!
> 2024-04-02T02:20:15.6362872Z Root directory input is valid!
> 2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 3000 ms...
> 2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 4785 ms...
> 2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 7375 ms...
> 2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 14988 ms...
> 2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to 
> make request after 5 attempts: Request timeout: 
> /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact
> 2024-04-02T02:22:59.9893296Z Post job cleanup.
> 2024-04-02T02:22:59.9958844Z Post job cleanup. {code}
> (This is unlikely to be something we can fix, but we can track it.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35340) Flink Kubernetes Operator CRDs not applying correctly

2024-05-13 Thread Neil Prasad (Jira)
Neil Prasad created FLINK-35340:
---

 Summary: Flink Kubernetes Operator CRDs not applying correctly
 Key: FLINK-35340
 URL: https://issues.apache.org/jira/browse/FLINK-35340
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.8.0, kubernetes-operator-1.7.0
Reporter: Neil Prasad


>From version 1.7.0 onwards, CRDs include an additional parameter for 
>additionalPrinterColumns, "priority":


{code:java}
  - additionalPrinterColumns:
    - description: Last observed state of the job.
      jsonPath: .status.jobStatus.state
      name: Job Status
      priority: 0
      type: string
    - description: "Lifecycle state of the Flink resource (including being 
rolled\
        \ back, failed etc.)."
      jsonPath: .status.lifecycleState
      name: Lifecycle State
      priority: 0
      type: string {code}
When applying the CRDs initially or as an upgrade the CRD doesn't apply 
correctly and omits the newly added "priority" field. This is both tested on a 
fresh install via Minikube as well as an existing GKE deployment.

For what it's worth, the versions test on is k8s v1.29.3. When applying these 
CRDs, there is no error message that comes up and nothing appears in the api 
server logs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845942#comment-17845942
 ] 

Ryan Skraba commented on FLINK-34227:
-

* 1.19 AdaptiveScheduler / Test (module: table) 
https://github.com/apache/flink/actions/runs/9056197345/job/24878932830#step:10:14273
* 1.18 AdaptiveScheduler / Test (module: table) 
https://github.com/apache/flink/actions/runs/9056197329/job/24879136968#step:10:11976


> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, pull-request-available, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30644) ChangelogCompatibilityITCase.testRestore fails due to CheckpointCoordinator being shutdown

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845940#comment-17845940
 ] 

Ryan Skraba commented on FLINK-30644:
-

* 1.19 test_cron_jdk17 tests 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59477=logs=a596f69e-60d2-5a4b-7d39-dc69e4cdaed3=712ade8c-ca16-5b76-3acd-14df33bc1cb1=8264


> ChangelogCompatibilityITCase.testRestore fails due to CheckpointCoordinator 
> being shutdown
> --
>
> Key: FLINK-30644
> URL: https://issues.apache.org/jira/browse/FLINK-30644
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / State Backends
>Affects Versions: 1.17.0, 1.19.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: auto-deprioritized-critical, test-stability
>
> We observe a build failure in {{ChangelogCompatibilityITCase.testRestore}} 
> due to the {{CheckpointCoordinator}} being shut down:
> {code:java}
> [...]
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: 
> CheckpointCoordinator shutdown.
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:544)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2140)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2127)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoints(CheckpointCoordinator.java:2004)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoints(CheckpointCoordinator.java:1987)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingAndQueuedCheckpoints(CheckpointCoordinator.java:2183)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.shutdown(CheckpointCoordinator.java:426)
> Jan 12 02:37:37   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.onTerminalState(DefaultExecutionGraph.java:1329)
> [...]{code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44731=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=b78d9d30-509a-5cea-1fef-db7abaa325ae=9255



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845939#comment-17845939
 ] 

Ryan Skraba commented on FLINK-28440:
-

* 1.18 Default (Java 8) / Test (module: tests) 
https://github.com/apache/flink/actions/runs/9029200531/job/24811449545#step:10:8625
* 1.19 test_cron_azure tests 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59499=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=8120


> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0, 1.18.0, 1.19.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> stale-assigned, test-stability
> Fix For: 1.20.0
>
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> 

[jira] [Commented] (FLINK-26644) python StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies failed on azure

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845938#comment-17845938
 ] 

Ryan Skraba commented on FLINK-26644:
-

* 1.18 Java 11 / Test (module: python) 
https://github.com/apache/flink/actions/runs/9040330854/job/24844520768#step:10:24270

> python 
> StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies 
> failed on azure
> ---
>
> Key: FLINK-26644
> URL: https://issues.apache.org/jira/browse/FLINK-26644
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.14.4, 1.15.0, 1.16.0, 1.19.0
>Reporter: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> 2022-03-14T18:50:24.6842853Z Mar 14 18:50:24 
> === FAILURES 
> ===
> 2022-03-14T18:50:24.6844089Z Mar 14 18:50:24 _ 
> StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies _
> 2022-03-14T18:50:24.6844846Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6846063Z Mar 14 18:50:24 self = 
>   testMethod=test_generate_stream_graph_with_dependencies>
> 2022-03-14T18:50:24.6847104Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6847766Z Mar 14 18:50:24 def 
> test_generate_stream_graph_with_dependencies(self):
> 2022-03-14T18:50:24.6848677Z Mar 14 18:50:24 python_file_dir = 
> os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
> 2022-03-14T18:50:24.6849833Z Mar 14 18:50:24 os.mkdir(python_file_dir)
> 2022-03-14T18:50:24.6850729Z Mar 14 18:50:24 python_file_path = 
> os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py")
> 2022-03-14T18:50:24.6852679Z Mar 14 18:50:24 with 
> open(python_file_path, 'w') as f:
> 2022-03-14T18:50:24.6853646Z Mar 14 18:50:24 f.write("def 
> add_two(a):\nreturn a + 2")
> 2022-03-14T18:50:24.6854394Z Mar 14 18:50:24 env = self.env
> 2022-03-14T18:50:24.6855019Z Mar 14 18:50:24 
> env.add_python_file(python_file_path)
> 2022-03-14T18:50:24.6855519Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6856254Z Mar 14 18:50:24 def plus_two_map(value):
> 2022-03-14T18:50:24.6857045Z Mar 14 18:50:24 from 
> test_stream_dependency_manage_lib import add_two
> 2022-03-14T18:50:24.6857865Z Mar 14 18:50:24 return value[0], 
> add_two(value[1])
> 2022-03-14T18:50:24.6858466Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6858924Z Mar 14 18:50:24 def add_from_file(i):
> 2022-03-14T18:50:24.6859806Z Mar 14 18:50:24 with 
> open("data/data.txt", 'r') as f:
> 2022-03-14T18:50:24.6860266Z Mar 14 18:50:24 return i[0], 
> i[1] + int(f.read())
> 2022-03-14T18:50:24.6860879Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6862022Z Mar 14 18:50:24 from_collection_source = 
> env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1),
> 2022-03-14T18:50:24.6863259Z Mar 14 18:50:24  
>  ('e', 2)],
> 2022-03-14T18:50:24.6864057Z Mar 14 18:50:24  
> type_info=Types.ROW([Types.STRING(),
> 2022-03-14T18:50:24.6864651Z Mar 14 18:50:24  
>  Types.INT()]))
> 2022-03-14T18:50:24.6865150Z Mar 14 18:50:24 
> from_collection_source.name("From Collection")
> 2022-03-14T18:50:24.6866212Z Mar 14 18:50:24 keyed_stream = 
> from_collection_source.key_by(lambda x: x[1], key_type=Types.INT())
> 2022-03-14T18:50:24.6867083Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6867793Z Mar 14 18:50:24 plus_two_map_stream = 
> keyed_stream.map(plus_two_map).name("Plus Two Map").set_parallelism(3)
> 2022-03-14T18:50:24.6868620Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6869412Z Mar 14 18:50:24 add_from_file_map = 
> plus_two_map_stream.map(add_from_file).name("Add From File Map")
> 2022-03-14T18:50:24.6870239Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6870883Z Mar 14 18:50:24 test_stream_sink = 
> add_from_file_map.add_sink(self.test_sink).name("Test Sink")
> 2022-03-14T18:50:24.6871803Z Mar 14 18:50:24 
> test_stream_sink.set_parallelism(4)
> 2022-03-14T18:50:24.6872291Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6872756Z Mar 14 18:50:24 archive_dir_path = 
> os.path.join(self.tempdir, "archive_" + str(uuid.uuid4()))
> 2022-03-14T18:50:24.6873557Z Mar 14 18:50:24 
> os.mkdir(archive_dir_path)
> 2022-03-14T18:50:24.6874817Z Mar 14 18:50:24 with 
> open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
> 2022-03-14T18:50:24.6875414Z Mar 14 18:50:24 f.write("3")
> 2022-03-14T18:50:24.6875906Z Mar 14 

[jira] [Commented] (FLINK-23577) CoordinatedSourceRescaleITCase.testUpscaling fails with NoSuchFileException

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845937#comment-17845937
 ] 

Ryan Skraba commented on FLINK-23577:
-

Looks like this flaky test has made a reappearance!
* 1.19 test_cron_adaptive_scheduler connect 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59455=logs=93ebd72a-004d-5a68-6295-7ace4ad889cd=35e92294-2840-51f1-1753-ae015c24c41f=10519

> CoordinatedSourceRescaleITCase.testUpscaling fails with NoSuchFileException
> ---
>
> Key: FLINK-23577
> URL: https://issues.apache.org/jira/browse/FLINK-23577
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.12.4
>Reporter: Xintong Song
>Priority: Major
>  Labels: test-stability
> Fix For: 1.12.8
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21256=logs=298e20ef-7951-5965-0e79-ea664ddc435e=b4cd3436-dbe8-556d-3bca-42f92c3cbf2f=21306
> {code}
> [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.874 
> s <<< FAILURE! - in 
> org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase
> [ERROR] 
> testUpscaling(org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase)
>   Time elapsed: 5.32 s  <<< ERROR!
> java.io.UncheckedIOException: java.nio.file.NoSuchFileException: 
> /tmp/junit5156435599891303309/junit3268016245125781188/79604f102e69d25f3258a72a648dfdef/chk-8
>   at 
> java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
>   at 
> java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
>   at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
>   at 
> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>   at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>   at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>   at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>   at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:558)
>   at 
> java.base/java.util.stream.ReferencePipeline.max(ReferencePipeline.java:594)
>   at 
> org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase.generateCheckpoint(CoordinatedSourceRescaleITCase.java:83)
>   at 
> org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase.testUpscaling(CoordinatedSourceRescaleITCase.java:70)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at 

[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails

2024-05-13 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845936#comment-17845936
 ] 

Ryan Skraba commented on FLINK-18476:
-

* 1.19 test_cron_hadoop313 misc 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59477=logs=245e1f2e-ba5b-5570-d689-25ae21e5302f=d04c9862-880c-52f5-574b-a7a79fef8e0f=22042

> PythonEnvUtilsTest#testStartPythonProcess fails
> ---
>
> Key: FLINK-18476
> URL: https://issues.apache.org/jira/browse/FLINK-18476
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> The 
> {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} 
> failed in my local environment as it assumes the environment has 
> {{/usr/bin/python}}. 
> I don't know exactly how did I get python in Ubuntu 20.04, but I have only 
> alias for {{python = python3}}. Therefore the tests fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35253][docs-zh]Translate "State Processor API "page into Chinese [flink]

2024-05-13 Thread via GitHub


flinkbot commented on PR #24780:
URL: https://github.com/apache/flink/pull/24780#issuecomment-2107606610

   
   ## CI report:
   
   * 557dcaa7bda5fd1b76e446f643012dc4d47a563c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35339) Compilation timeout while building flink-dist

2024-05-13 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35339:
---

 Summary: Compilation timeout while building flink-dist
 Key: FLINK-35339
 URL: https://issues.apache.org/jira/browse/FLINK-35339
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.19.1
Reporter: Ryan Skraba


* 1.19 Java 17 / Test (module: python) 
https://github.com/apache/flink/actions/runs/9040330904/job/24844527283#step:10:14325

The CI pipeline fails with:

{code}
May 11 02:44:25 Process exited with EXIT CODE: 143.
May 11 02:44:25 Trying to KILL watchdog (49546).
May 11 02:44:25 
==
May 11 02:44:25 Compilation failure detected, skipping test execution.
May 11 02:44:25 
==
{code}

It looks like this is due to a failed network connection while building 
src/assemblies/bin.xml :

{code}
May 11 02:44:25java.lang.Thread.State: RUNNABLE
May 11 02:44:25 at sun.nio.ch.Net.connect0(java.base@17.0.7/Native 
Method)
May 11 02:44:25 at sun.nio.ch.Net.connect(java.base@17.0.7/Net.java:579)
May 11 02:44:25 at sun.nio.ch.Net.connect(java.base@17.0.7/Net.java:568)
May 11 02:44:25 at 
sun.nio.ch.NioSocketImpl.connect(java.base@17.0.7/NioSocketImpl.java:588)
May 11 02:44:25 at 
java.net.SocksSocketImpl.connect(java.base@17.0.7/SocksSocketImpl.java:327)
May 11 02:44:25 at 
java.net.Socket.connect(java.base@17.0.7/Socket.java:633)
May 11 02:44:25 at 
org.apache.maven.wagon.providers.http.httpclient.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
May 11 02:44:25 at 
org.apache.maven.wagon.providers.http.httpclient.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
May 11 02:44:25 at 
org.apache.maven.wagon.providers.http.httpclient.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
May 11 02:44:25 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
May 11 02:44:25 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
May 11 02:44:25 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
May 11 02:44:25 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RetryExec.execute(RetryExec.java:89)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35253][docs-zh]Translate "State Processor API "page into Chinese [flink]

2024-05-13 Thread via GitHub


drymatini commented on PR #24780:
URL: https://github.com/apache/flink/pull/24780#issuecomment-2107594814

   @wuchong @reswqa Hi, I have finished the translation of the page.Could you 
please verify my pull request?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35253) Translate "State Processor API "page into Chinese

2024-05-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35253:
---
Labels: pull-request-available  (was: )

> Translate "State Processor API "page into Chinese
> -
>
> Key: FLINK-35253
> URL: https://issues.apache.org/jira/browse/FLINK-35253
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.19.0
>Reporter: Juan Zifeng
>Assignee: Juan Zifeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> The links are 
> https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/libs/state_processor_api/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35253][docs-zh]Translate "State Processor API "page into Chinese [flink]

2024-05-13 Thread via GitHub


drymatini opened a new pull request, #24780:
URL: https://github.com/apache/flink/pull/24780

   ## What is the purpose of the change
   
   "State Processor API "page was newly released and it should be translated 
into Chineses
   
   
   ## Brief change log
   
   translate 'docs/content/docs/libs/state_processor_api.md' 
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): ( no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35193][table] Support drop materialized table syntax and execution in continuous refresh mode [flink]

2024-05-13 Thread via GitHub


lsyldliu commented on code in PR #24777:
URL: https://github.com/apache/flink/pull/24777#discussion_r1598351419


##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropMaterializedTable.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDrop;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/** DROP MATERIALIZED TABLE DDL sql call. */
+public class SqlDropMaterializedTable extends SqlDrop {
+
+private static final SqlOperator OPERATOR =
+new SqlSpecialOperator("DROP MATERIALIZED TABLE", 
SqlKind.DROP_TABLE);
+
+private final SqlIdentifier tableIdentifier;
+
+public SqlDropMaterializedTable(
+SqlParserPos pos, SqlIdentifier tableIdentifier, boolean ifExists) 
{
+super(OPERATOR, pos, ifExists);
+this.tableIdentifier = tableIdentifier;
+}
+
+public String[] fullTableName() {
+return tableIdentifier.names.toArray(new String[0]);
+}
+
+public boolean getIfExists() {
+return this.ifExists;
+}
+
+@Override
+public List getOperandList() {
+return ImmutableNullableList.of(tableIdentifier);
+}
+
+@Override
+public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+writer.keyword("DROP");

Review Comment:
   ```suggestion
   writer.keyword("DROP MATERIALIZED TABLE");
   ```



##
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##
@@ -1801,6 +1801,23 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean 
replace, boolean isTemporar
 }
 }
 
+SqlDropMaterializedTable SqlDropMaterializedTable(Span s, boolean replace) :

Review Comment:
   We can return `SqlDrop` directly.



##
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl:
##
@@ -2427,6 +2444,8 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
 (
 drop = SqlDropCatalog(s, replace)
 |
+drop = SqlDropMaterializedTable(s, replace)

Review Comment:
   We should throw an exception if the user specifies the TEMPORARY keyword, 
similar to the `CREATE MATERIALIZED TABLE` process logic.



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropMaterializedTableConverter.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+
+/** A converter for {@link SqlDropMaterializedTable}. */
+public class SqlDropMaterializedTableConverter
+implements SqlNodeConverter {
+@Override
+public Operation convertSqlNode(
+   

Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-13 Thread via GitHub


superdiaodiao commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2107498923

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.18][FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]

2024-05-13 Thread via GitHub


flinkbot commented on PR #24779:
URL: https://github.com/apache/flink/pull/24779#issuecomment-2107496417

   
   ## CI report:
   
   * 87c26627c5431157f1e85d35b3ce466ff708ff40 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.19][FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]

2024-05-13 Thread via GitHub


flinkbot commented on PR #24778:
URL: https://github.com/apache/flink/pull/24778#issuecomment-2107479208

   
   ## CI report:
   
   * d9dddcd7c6613169b95193b85fdefa29c726996b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19

2024-05-13 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845909#comment-17845909
 ] 

yazgoo commented on FLINK-35138:


Hello, do you have an update on this ticket ?
Looks like the vote was fine ?

Thanks !

> Release flink-connector-kafka v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35138
> URL: https://issues.apache.org/jira/browse/FLINK-35138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.2.0
>
>
> https://github.com/apache/flink-connector-kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35168) Basic State Iterator for async processing

2024-05-13 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845908#comment-17845908
 ] 

Zakelly Lan commented on FLINK-35168:
-

Merged into master via 01586780

> Basic State Iterator for async processing
> -
>
> Key: FLINK-35168
> URL: https://issues.apache.org/jira/browse/FLINK-35168
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35168) Basic State Iterator for async processing

2024-05-13 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan resolved FLINK-35168.
-
Resolution: Fixed

> Basic State Iterator for async processing
> -
>
> Key: FLINK-35168
> URL: https://issues.apache.org/jira/browse/FLINK-35168
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-05-13 Thread via GitHub


Zakelly merged PR #24690:
URL: https://github.com/apache/flink/pull/24690


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35336) SQL failed to restore from savepoint after change in default-parallelism

2024-05-13 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-35336:
---
Issue Type: New Feature  (was: Bug)

> SQL failed to restore from savepoint after change in default-parallelism
> 
>
> Key: FLINK-35336
> URL: https://issues.apache.org/jira/browse/FLINK-35336
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
> Environment: Flink SQL Client, Flink 1.18.1 on MacOS
>Reporter: Keith Lee
>Priority: Major
>
> After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I am 
> observing the following exception on restoring job from savepoint with an 
> unmodified statement set. 
>  
> {quote}[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
> [file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff|file:///tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff].
>  Cannot map checkpoint/savepoint state for operator 
> 46ba9b22862c3bbe9373c6abee964b2a to the new program, because the operator is 
> not available in the new program. If you want to allow to skip this, you can 
> set the --allowNonRestoredState option on the CLI.
> {quote}
> When started without savepoints, the jobgraph differs for the jobs despite 
> identical statements being ran.
> There are 2 operators when default parallelism is 1.
> {quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] -> 
> StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end, 
> Calc[71] -> LocalWindowAggregate[72])
> B: GlobalWindowAggregate[74] -> Calc[75] -> Sink: CampaignAggregationsJDBC[76]
> {quote}
> Three operators when default parallelism is 4.
> {quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] -> 
> StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] -> 
> LocalWindowAggregate[90]) 
> B: Sink: end 
> C: GlobalWindowAggregate[92] -> Calc[93] -> Sink: CampaignAggregationsJDBC[94]
> {quote}
>  
> Notice that the operator 'Sink: end' is separated out when parallelism is set 
> to 4, causing the incompatibility in job graph. EXPLAIN PLAN did not show any 
> difference between syntax tree, physical plan or execution plan.
> I have attempted various configurations in `table.optimizer.*`.
> Steps to reproduce
> {quote}SET 'table.exec.resource.default-parallelism' = '1';
> EXECUTE STATEMENT SET BEGIN 
>     INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, 
> interaction_type, interaction_target, interaction_tags, event_date, 
> event_hour, event_time)
>     SELECT
>         user_id, 
>         user_session,
>         interaction_type,
>         interaction_target,
>         interaction_tags, 
>         DATE_FORMAT(event_time , '-MM-dd'),
>         DATE_FORMAT(event_time , 'HH'),
>         event_time 
>     FROM UserBehaviourKafkaSource 
>     WHERE 
>         interaction_result Like '%ERROR%'; 
>     INSERT INTO CampaignAggregationsJDBC 
>     SELECT 
>         CONCAT_WS('/', interaction_tags, interaction_result, 
> DATE_FORMAT(window_start, '-MM-DD HH:mm:ss.SSS'), DATE_FORMAT(window_end, 
> '-MM-DD HH:mm:ss.SSS')) AS id, 
>         interaction_tags as campaign, 
>         interaction_result, 
>         COUNT(*) AS interaction_count, 
>         window_start, 
>         window_end 
>     FROM 
>         TABLE(TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time), 
> INTERVAL '10' SECONDS)) 
>     GROUP BY window_start, window_end, interaction_tags, interaction_result; 
> END;
> STOP JOB '' WITH SAVEPOINT;
> SET 'execution.savepoint.path' = '//';
> SET 'table.exec.resource.default-parallelism' = '4';
> 
> {quote}
> DDLs
> {quote}– S3 Sink
> CREATE TABLE UserErrorExperienceS3Sink (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   event_date STRING,
>   event_hour STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE)
> PARTITIONED BY (event_date, event_hour)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 's3:///userErrorExperience/',
>   'format' = 'json');
> – Kafka Source
> ADD JAR 
> 'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
> CREATE TABLE UserBehaviourKafkaSource (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   interaction_result STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
>   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
> WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behaviour',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'demoGroup',
>   

[jira] [Commented] (FLINK-35336) SQL failed to restore from savepoint after change in default-parallelism

2024-05-13 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845901#comment-17845901
 ] 

Martijn Visser commented on FLINK-35336:


That's not a bug: changing the parallelism is generates a new jobgraph and that 
will lead to state incompatibility, as outlined in 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#table-api--sql

See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489 
for more details as well

> SQL failed to restore from savepoint after change in default-parallelism
> 
>
> Key: FLINK-35336
> URL: https://issues.apache.org/jira/browse/FLINK-35336
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
> Environment: Flink SQL Client, Flink 1.18.1 on MacOS
>Reporter: Keith Lee
>Priority: Major
>
> After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I am 
> observing the following exception on restoring job from savepoint with an 
> unmodified statement set. 
>  
> {quote}[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
> [file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff|file:///tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff].
>  Cannot map checkpoint/savepoint state for operator 
> 46ba9b22862c3bbe9373c6abee964b2a to the new program, because the operator is 
> not available in the new program. If you want to allow to skip this, you can 
> set the --allowNonRestoredState option on the CLI.
> {quote}
> When started without savepoints, the jobgraph differs for the jobs despite 
> identical statements being ran.
> There are 2 operators when default parallelism is 1.
> {quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] -> 
> StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end, 
> Calc[71] -> LocalWindowAggregate[72])
> B: GlobalWindowAggregate[74] -> Calc[75] -> Sink: CampaignAggregationsJDBC[76]
> {quote}
> Three operators when default parallelism is 4.
> {quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] -> 
> StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] -> 
> LocalWindowAggregate[90]) 
> B: Sink: end 
> C: GlobalWindowAggregate[92] -> Calc[93] -> Sink: CampaignAggregationsJDBC[94]
> {quote}
>  
> Notice that the operator 'Sink: end' is separated out when parallelism is set 
> to 4, causing the incompatibility in job graph. EXPLAIN PLAN did not show any 
> difference between syntax tree, physical plan or execution plan.
> I have attempted various configurations in `table.optimizer.*`.
> Steps to reproduce
> {quote}SET 'table.exec.resource.default-parallelism' = '1';
> EXECUTE STATEMENT SET BEGIN 
>     INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, 
> interaction_type, interaction_target, interaction_tags, event_date, 
> event_hour, event_time)
>     SELECT
>         user_id, 
>         user_session,
>         interaction_type,
>         interaction_target,
>         interaction_tags, 
>         DATE_FORMAT(event_time , '-MM-dd'),
>         DATE_FORMAT(event_time , 'HH'),
>         event_time 
>     FROM UserBehaviourKafkaSource 
>     WHERE 
>         interaction_result Like '%ERROR%'; 
>     INSERT INTO CampaignAggregationsJDBC 
>     SELECT 
>         CONCAT_WS('/', interaction_tags, interaction_result, 
> DATE_FORMAT(window_start, '-MM-DD HH:mm:ss.SSS'), DATE_FORMAT(window_end, 
> '-MM-DD HH:mm:ss.SSS')) AS id, 
>         interaction_tags as campaign, 
>         interaction_result, 
>         COUNT(*) AS interaction_count, 
>         window_start, 
>         window_end 
>     FROM 
>         TABLE(TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time), 
> INTERVAL '10' SECONDS)) 
>     GROUP BY window_start, window_end, interaction_tags, interaction_result; 
> END;
> STOP JOB '' WITH SAVEPOINT;
> SET 'execution.savepoint.path' = '//';
> SET 'table.exec.resource.default-parallelism' = '4';
> 
> {quote}
> DDLs
> {quote}– S3 Sink
> CREATE TABLE UserErrorExperienceS3Sink (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   event_date STRING,
>   event_hour STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE)
> PARTITIONED BY (event_date, event_hour)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 's3:///userErrorExperience/',
>   'format' = 'json');
> – Kafka Source
> ADD JAR 
> 'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
> CREATE TABLE UserBehaviourKafkaSource (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   

Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-05-13 Thread via GitHub


superdiaodiao commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2107400121

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-16528) Support Limit push down for Kafka streaming sources

2024-05-13 Thread Lorenzo Affetti (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845894#comment-17845894
 ] 

Lorenzo Affetti commented on FLINK-16528:
-

[~jark] is this issue still valuable and open currently?

If so, I would be glad to take it

> Support Limit push down for Kafka streaming sources
> ---
>
> Key: FLINK-16528
> URL: https://issues.apache.org/jira/browse/FLINK-16528
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Ecosystem
>Reporter: Jark Wu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> usability
>
> Currently, a limit query {{SELECT * FROM kafka LIMIT 10}} will be translated 
> into TopN operator and will scan the full data in the source. However, 
> {{LIMIT}} is a very useful feature in SQL CLI to explore data in the source. 
> It doesn't make sense it never stop. 
> We can support such case in streaming mode (ignore the text format):
> {code}
> flink > SELECT * FROM kafka LIMIT 10;
>  kafka_key  |user_name| lang |   created_at
> +-+--+-
>  494227746231685121 | burncaniff  | en   | 2014-07-29 14:07:31.000
>  494227746214535169 | gu8tn   | ja   | 2014-07-29 14:07:31.000
>  494227746219126785 | pequitamedicen  | es   | 2014-07-29 14:07:31.000
>  494227746201931777 | josnyS  | ht   | 2014-07-29 14:07:31.000
>  494227746219110401 | Cafe510 | en   | 2014-07-29 14:07:31.000
>  494227746210332673 | Da_JuanAnd_Only | en   | 2014-07-29 14:07:31.000
>  494227746193956865 | Smile_Kidrauhl6 | pt   | 2014-07-29 14:07:31.000
>  494227750426017793 | CashforeverCD   | en   | 2014-07-29 14:07:32.000
>  494227750396653569 | FilmArsivimiz   | tr   | 2014-07-29 14:07:32.000
>  494227750388256769 | jmolas  | es   | 2014-07-29 14:07:32.000
> (10 rows)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35193][table] Support drop materialized table syntax and execution in continuous refresh mode [flink]

2024-05-13 Thread via GitHub


flinkbot commented on PR #24777:
URL: https://github.com/apache/flink/pull/24777#issuecomment-2107395878

   
   ## CI report:
   
   * 67ef2ca47615d5b36424af7bc12712071369e358 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-05-13 Thread via GitHub


Zakelly commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2107381572

   > @Zakelly Thanks for the PR, I have a question about the overall design. It 
seems that an iterator can actually be split into several executions. If other 
UPDATE are inserted in the middle, will it affect the final result of the 
iterator?
   
   Ideally any UPDATE happens in middle of iterating won't affect the result of 
iteration. This should be ensured by the `StateBackend`, which is beyond the 
story of this PR. The necessary information for `StateBackend`s to maintain the 
iterator is relayed back via the `nextPayloadForContinuousLoading()`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35126) Improve checkpoint progress health check config and enable by default

2024-05-13 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-35126:
--

Assignee: Gyula Fora

> Improve checkpoint progress health check config and enable by default
> -
>
> Key: FLINK-35126
> URL: https://issues.apache.org/jira/browse/FLINK-35126
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>
> Currently the checkpoint progress health check window is configurable by 
> Duration. This makes it hard to enable by default as the sensible interval 
> depends on the checkpoint interval.
> We should rework the config and add an alternative checkpoint interval 
> multiplier based config which could be set by default to 3 (default window is 
> 3x checkpoint interval )
> If checkpointing is not enabled in config the health check would be disabled 
> of course.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35338) Enable FS Plugins as non-root

2024-05-13 Thread Rasmus Thygesen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rasmus Thygesen updated FLINK-35338:

Description: 
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with *[Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]*
 which among other things mean the Flink Operator pod is configured to use 
*readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
write to our plugins directory.

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart.

We have tested that it also works if we build our own image where we add the 
plugin, but we need to deploy the operator in different clusters with different 
requirements for filesystems so we would have to create a new image for each 
filesystem as well as updating all our own images every time there is an update 
to the official Flink Operator image

  was:
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with [Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]
 which among other things mean the Flink Operator pod is configured to use 
*readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
write to our plugins directory.

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart.

We have tested that it also works if we build our own image where we add the 
plugin, but we need to deploy the operator in different clusters with different 
requirements for filesystems so we would have to create a new image for each 
filesystem as well as updating all our own images every time there is an update 
to the official Flink Operator image


> Enable FS Plugins as non-root
> -
>
> Key: FLINK-35338
> URL: https://issues.apache.org/jira/browse/FLINK-35338
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Rasmus Thygesen
>Priority: Not a Priority
>
> [This pull 
> request|https://github.com/apache/flink-kubernetes-operator/pull/609] was 
> made to allow enabling FS plugins on the Flink Kubernetes Operator which 
> allows reading a jar for a session job on various file systems. It normally 
> works well, but we are running our cluster with *[Restricted Pod 
> Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]*
>  which among other things mean the Flink Operator pod is configured to use 
> *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
> write to our plugins directory.
> 

[jira] [Updated] (FLINK-35338) Enable FS Plugins as non-root

2024-05-13 Thread Rasmus Thygesen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rasmus Thygesen updated FLINK-35338:

Description: 
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with [Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]
 which among other things mean the Flink Operator pod is configured to use 
*readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
write to our plugins directory.

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart.

We have tested that it also works if we build our own image where we add the 
plugin, but we need to deploy the operator in different clusters with different 
requirements for filesystems so we would have to create a new image for each 
filesystem as well as updating all our own images every time there is an update 
to the official Flink Operator image

  was:
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with ** [Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]
 **  ** which among other things mean the Flink Operator pod is configured to 
use *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed 
to write to our plugins directory.

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart.

We have tested that it also works if we build our own image where we add the 
plugin, but we need to deploy the operator in different clusters with different 
requirements for filesystems so we would have to create a new image for each 
filesystem as well as updating all our own images every time there is an update 
to the official Flink Operator image


> Enable FS Plugins as non-root
> -
>
> Key: FLINK-35338
> URL: https://issues.apache.org/jira/browse/FLINK-35338
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Rasmus Thygesen
>Priority: Not a Priority
>
> [This pull 
> request|https://github.com/apache/flink-kubernetes-operator/pull/609] was 
> made to allow enabling FS plugins on the Flink Kubernetes Operator which 
> allows reading a jar for a session job on various file systems. It normally 
> works well, but we are running our cluster with [Restricted Pod 
> Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]
>  which among other things mean the Flink Operator pod is configured to use 
> *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
> write to our plugins 

[jira] [Updated] (FLINK-5193) Recovering all jobs fails completely if a single recovery fails

2024-05-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-5193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-5193:
--
Labels: pull-request-available  (was: )

> Recovering all jobs fails completely if a single recovery fails
> ---
>
> Key: FLINK-5193
> URL: https://issues.apache.org/jira/browse/FLINK-5193
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.1.3, 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.1.4, 1.2.0
>
>
> In HA case where the {{JobManager}} tries to recover all submitted job 
> graphs, e.g. when regaining leadership, it can happen that none of the 
> submitted jobs are recovered if a single recovery fails. Instead of failing 
> the complete recovery procedure, the {{JobManager}} should still try to 
> recover the remaining (non-failing) jobs and print a proper error message for 
> the failed recoveries.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35338) Enable FS Plugins as non-root

2024-05-13 Thread Rasmus Thygesen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rasmus Thygesen updated FLINK-35338:

Description: 
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with ** [Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]
 **  ** which among other things mean the Flink Operator pod is configured to 
use *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed 
to write to our plugins directory.

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart.

We have tested that it also works if we build our own image where we add the 
plugin, but we need to deploy the operator in different clusters with different 
requirements for filesystems so we would have to create a new image for each 
filesystem as well as updating all our own images every time there is an update 
to the official Flink Operator image

  was:
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with  **  *[Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]*
  ** which among other things mean the Flink Operator pod is configured to use 
*readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
write to our plugins directory.

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart.

We have tested that it also works if we build our own image where we add the 
plugin, but we need to deploy the operator in different clusters with different 
requirements for filesystems so we would have to create a new image for each 
filesystem as well as updating all our own images every time there is an update 
to the official Flink Operator image


> Enable FS Plugins as non-root
> -
>
> Key: FLINK-35338
> URL: https://issues.apache.org/jira/browse/FLINK-35338
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Rasmus Thygesen
>Priority: Not a Priority
>
> [This pull 
> request|https://github.com/apache/flink-kubernetes-operator/pull/609] was 
> made to allow enabling FS plugins on the Flink Kubernetes Operator which 
> allows reading a jar for a session job on various file systems. It normally 
> works well, but we are running our cluster with ** [Restricted Pod 
> Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]
>  **  ** which among other things mean the Flink Operator pod is configured to 
> use *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not 
> allowed to write to 

[PR] [FLINK-5193][table] Support drop materialized table syntax and execution in continuous refresh mode [flink]

2024-05-13 Thread via GitHub


hackergin opened a new pull request, #24777:
URL: https://github.com/apache/flink/pull/24777

   ## What is the purpose of the change
   
   *Support drop materialized table syntax and execution in continuous refresh 
mode*
   
   
   ## Brief change log
   
 - *Support parser drop materialized table syntax in flink-sql-parser 
module*
 - *Add DropMaterializedTableConverter for convert drop materialized table 
node to operations*
 - *Support execution of drop materialized table*
   
   
   ## Verifying this change
-  *Added integration test for create and drop materialized table*
 - *Add drop table operation in MaterializedTableStatementITCase after 
every test case finished*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (will be added in a separated pr)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35338) Enable FS Plugins as non-root

2024-05-13 Thread Rasmus Thygesen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rasmus Thygesen updated FLINK-35338:

Description: 
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with  **  *[Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]*
  ** which among other things mean the Flink Operator pod is configured to use 
*readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
write to our plugins directory.

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart.

We have tested that it also works if we build our own image where we add the 
plugin, but we need to deploy the operator in different clusters with different 
requirements for filesystems so we would have to create a new image for each 
filesystem as well as updating all our own images every time there is an update 
to the official Flink Operator image

  was:
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with * ** [Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]
 **  ** which among other things mean the Flink Operator pod is configured to 
use *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed 
to write to our plugins directory.

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart.

We have tested that it also works if we build our own image where we add the 
plugin, but we need to deploy the operator in different clusters with different 
requirements for filesystems so we would have to create a new image for each 
filesystem as well as updating all our own images every time there is an update 
to the official Flink Operator image


> Enable FS Plugins as non-root
> -
>
> Key: FLINK-35338
> URL: https://issues.apache.org/jira/browse/FLINK-35338
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Rasmus Thygesen
>Priority: Not a Priority
>
> [This pull 
> request|https://github.com/apache/flink-kubernetes-operator/pull/609] was 
> made to allow enabling FS plugins on the Flink Kubernetes Operator which 
> allows reading a jar for a session job on various file systems. It normally 
> works well, but we are running our cluster with  **  *[Restricted Pod 
> Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]*
>   ** which among other things mean the Flink Operator pod is configured to 
> use *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not 
> allowed to write 

[jira] [Commented] (FLINK-35337) Keep up with the latest version of tikv client

2024-05-13 Thread ouyangwulin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845887#comment-17845887
 ] 

ouyangwulin commented on FLINK-35337:
-

[~Leonard], Our production environment has been updated  tikv version 6.5.4,Can 
we update it to new version.

 

> Keep up with the latest version of tikv client
> --
>
> Key: FLINK-35337
> URL: https://issues.apache.org/jira/browse/FLINK-35337
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: ouyangwulin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35338) Enable FS Plugins as non-root

2024-05-13 Thread Rasmus Thygesen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rasmus Thygesen updated FLINK-35338:

Description: 
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with * ** [Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]
 **  ** which among other things mean the Flink Operator pod is configured to 
use *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed 
to write to our plugins directory.

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart.

We have tested that it also works if we build our own image where we add the 
plugin, but we need to deploy the operator in different clusters with different 
requirements for filesystems so we would have to create a new image for each 
filesystem as well as updating all our own images every time there is an update 
to the official Flink Operator image

  was:
[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with ** *[Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]*
 ** which among other things mean the flink operator pod is configured to use 
*readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
write to our plugins directory.

 

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

 

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart


> Enable FS Plugins as non-root
> -
>
> Key: FLINK-35338
> URL: https://issues.apache.org/jira/browse/FLINK-35338
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Rasmus Thygesen
>Priority: Not a Priority
>
> [This pull 
> request|https://github.com/apache/flink-kubernetes-operator/pull/609] was 
> made to allow enabling FS plugins on the Flink Kubernetes Operator which 
> allows reading a jar for a session job on various file systems. It normally 
> works well, but we are running our cluster with * ** [Restricted Pod 
> Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]
>  **  ** which among other things mean the Flink Operator pod is configured to 
> use *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not 
> allowed to write to our plugins directory.
> We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
> */opt/flink/plugins* which would allow us to write to it, but that overrides 
> all the pre-installed plugins. When all the pre-installed plugins are removed 
> before startup, the operator sees the directory for the plugin we are trying 
> to 

[jira] [Updated] (FLINK-35337) Keep up with the latest version of tikv client

2024-05-13 Thread ouyangwulin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ouyangwulin updated FLINK-35337:

Component/s: Flink CDC

> Keep up with the latest version of tikv client
> --
>
> Key: FLINK-35337
> URL: https://issues.apache.org/jira/browse/FLINK-35337
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: ouyangwulin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35337) Keep up with the latest version of tikv client

2024-05-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35337:
---
Labels: pull-request-available  (was: )

> Keep up with the latest version of tikv client
> --
>
> Key: FLINK-35337
> URL: https://issues.apache.org/jira/browse/FLINK-35337
> Project: Flink
>  Issue Type: Improvement
>Reporter: ouyangwulin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35338) Enable FS Plugins as non-root

2024-05-13 Thread Rasmus Thygesen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rasmus Thygesen updated FLINK-35338:

Affects Version/s: 1.8.0

> Enable FS Plugins as non-root
> -
>
> Key: FLINK-35338
> URL: https://issues.apache.org/jira/browse/FLINK-35338
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Rasmus Thygesen
>Priority: Not a Priority
>
> [This pull 
> request|https://github.com/apache/flink-kubernetes-operator/pull/609] was 
> made to allow enabling FS plugins on the Flink Kubernetes Operator which 
> allows reading a jar for a session job on various file systems. It normally 
> works well, but we are running our cluster with ** *[Restricted Pod 
> Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]*
>  ** which among other things mean the flink operator pod is configured to use 
> *readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
> write to our plugins directory.
>  
> We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
> */opt/flink/plugins* which would allow us to write to it, but that overrides 
> all the pre-installed plugins. When all the pre-installed plugins are removed 
> before startup, the operator sees the directory for the plugin we are trying 
> to install, but does not find a jar file inside the directory and therefore 
> complains. We think that when the pre-installed plugins are there, the 
> operator takes a bit longer before it starts reading the new plugin and 
> therefore there is enough time to download the new plugin with curl.
>  
> We are open to suggestions for how we can solve this issue while keeping 
> *readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a 
> solution where we mount a volume and download all the pre-installed plugins 
> as well as any extra plugins we need through an init container and we propose 
> a new value to the Flink Operator Helm chart



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35338) Enable FS Plugins as non-root

2024-05-13 Thread Rasmus Thygesen (Jira)
Rasmus Thygesen created FLINK-35338:
---

 Summary: Enable FS Plugins as non-root
 Key: FLINK-35338
 URL: https://issues.apache.org/jira/browse/FLINK-35338
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Rasmus Thygesen


[This pull 
request|https://github.com/apache/flink-kubernetes-operator/pull/609] was made 
to allow enabling FS plugins on the Flink Kubernetes Operator which allows 
reading a jar for a session job on various file systems. It normally works 
well, but we are running our cluster with ** *[Restricted Pod 
Security|https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted]*
 ** which among other things mean the flink operator pod is configured to use 
*readOnlyRootFilesystem* and *runAsNonRoot* which means we are not allowed to 
write to our plugins directory.

 

We have tried using *operatorVolumes* and *operatorVolumeMounts* to mount 
*/opt/flink/plugins* which would allow us to write to it, but that overrides 
all the pre-installed plugins. When all the pre-installed plugins are removed 
before startup, the operator sees the directory for the plugin we are trying to 
install, but does not find a jar file inside the directory and therefore 
complains. We think that when the pre-installed plugins are there, the operator 
takes a bit longer before it starts reading the new plugin and therefore there 
is enough time to download the new plugin with curl.

 

We are open to suggestions for how we can solve this issue while keeping 
*readOnlyRootFilesystem* and {*}runAsNonRoot{*}. We are considering a solution 
where we mount a volume and download all the pre-installed plugins as well as 
any extra plugins we need through an init container and we propose a new value 
to the Flink Operator Helm chart



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   >