Re: [PR] [FLINK-35195][table] Support the execution of create materialized table in continuous refresh mode [flink]

2024-04-30 Thread via GitHub


h12567 commented on PR #24750:
URL: https://github.com/apache/flink/pull/24750#issuecomment-2087873976

   May I ask how to find the list of issues/requests I can contribute to?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-30 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-35217:
---
Fix Version/s: 1.18.2
   1.19.1

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> While running Flink on a system with an unstable power supply, checkpoints 
> were regularly corrupted in the form of "_metadata" files with a file size of 
> 0 bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence, an "fsync" call is missing before step 3.
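> For illustration, a minimal sketch of the durable ordering that is needed (plain 
> {{java.nio}}, not the actual Flink code):
> {code:java}
> import java.io.IOException;
> import java.nio.channels.FileChannel;
> import java.nio.file.*;
> 
> class DurableCheckpointCommit {
>     // Replace the old checkpoint durably: fsync the new metadata file (and its
>     // directory) before the previous checkpoint's metadata is deleted.
>     static void commit(Path inProgress, Path metadata, Path oldMetadata) throws IOException {
>         // 1. flush the in-progress metadata contents to disk
>         try (FileChannel ch = FileChannel.open(inProgress, StandardOpenOption.WRITE)) {
>             ch.force(true);
>         }
>         // 2. atomically publish it under its final name
>         Files.move(inProgress, metadata, StandardCopyOption.ATOMIC_MOVE);
>         // 3. fsync the parent directory so the rename itself survives a power loss
>         //    (directory fsync works on Linux/macOS, not on Windows)
>         try (FileChannel dir = FileChannel.open(metadata.getParent(), StandardOpenOption.READ)) {
>             dir.force(true);
>         }
>         // 4. only now is it safe to remove the previous checkpoint's metadata
>         Files.deleteIfExists(oldMetadata);
>     }
> }
> {code}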
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, 

Re: [PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]

2024-04-30 Thread via GitHub


StefanRRichter merged PR #24752:
URL: https://github.com/apache/flink/pull/24752


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]

2024-04-30 Thread via GitHub


StefanRRichter merged PR #24751:
URL: https://github.com/apache/flink/pull/24751


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]

2024-04-30 Thread via GitHub


borislitvak commented on code in PR #118:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1585425522


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link SnowflakeDialect}. */
+@Internal
+public class SnowflakeDialectFactory implements JdbcDialectFactory {
+@Override
+public boolean acceptsURL(String url) {
+return url.startsWith("jdbc:snowflake:");
+}
+
+@Override
+public JdbcDialect create() {

Review Comment:
   Will do!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]

2024-04-30 Thread via GitHub


borislitvak commented on code in PR #118:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1585423831


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeRowConverter.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.postgresql.jdbc.PgArray;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and
+ * Flink internal objects for Snowflake.
+ */
+@Internal
+public class SnowflakeRowConverter extends 
AbstractPostgresCompatibleRowConverter {
+
+private static final long serialVersionUID = 1L;
+
+@Override
+public String converterName() {
+return "Snowflake";
+}
+
+// https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
+public SnowflakeRowConverter(RowType rowType) {

Review Comment:
   
   
   
   > I am not sure exactly how Snowflake SQL compares to the other SQLs. It 
seems to be close to SQL Server, which has some TINYINT logic in its converter 
- it looks like Snowflake supports this type - so you need to add this to the 
converter.
   > 
   > I also see 
https://docs.uipath.com/process-mining/automation-suite/2022.10/user-guide/sql-differences-between-snowflake-and-sql-server.
 Do any of these SQL quirks require converter changes?
   > 
   > I notice that Snowflake SQL uses double quotes, whereas Flink SQL uses 
back ticks; it would be worth testing that quotes work for Snowflake.
   
   https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
   
   This code was tested with Flink, so whatever Flink does should work. I've 
provided an example in the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35279) Support "last-state" upgrade mode for FlinkSessionJob

2024-04-30 Thread Alan Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alan Zhang updated FLINK-35279:
---
Description: 
The "last-state" upgrade mode is only supported for Flink application mode 
today[1], we should provide a consistent / similar user experience in Flink 
session mode.

[1] 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades]
{code:java}
Last state upgrade mode is currently only supported for FlinkDeployments. {code}

  was:
The "last-state" upgrade mode is only supported for Flink application mode 
today[1], we should provide a similar user experience in session mode.

 

[[1]https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades]
{code:java}
Last state upgrade mode is currently only supported for FlinkDeployments. {code}


> Support "last-state" upgrade mode for FlinkSessionJob 
> --
>
> Key: FLINK-35279
> URL: https://issues.apache.org/jira/browse/FLINK-35279
> Project: Flink
>  Issue Type: New Feature
>Reporter: Alan Zhang
>Priority: Major
>
> The "last-state" upgrade mode is only supported for Flink application mode 
> today[1], we should provide a consistent / similar user experience in Flink 
> session mode.
> [1] 
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades]
> {code:java}
> Last state upgrade mode is currently only supported for FlinkDeployments. 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35279) Support "last-state" upgrade mode for FlinkSessionJob

2024-04-30 Thread Alan Zhang (Jira)
Alan Zhang created FLINK-35279:
--

 Summary: Support "last-state" upgrade mode for FlinkSessionJob 
 Key: FLINK-35279
 URL: https://issues.apache.org/jira/browse/FLINK-35279
 Project: Flink
  Issue Type: New Feature
Reporter: Alan Zhang


The "last-state" upgrade mode is only supported for Flink application mode 
today[1], we should provide a similar user experience in session mode.

 

[[1]https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades]
{code:java}
Last state upgrade mode is currently only supported for FlinkDeployments. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585113359


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Just checked in the operator logs as well, to potentially rule out a prom 
translation issue. I only see `NaN` for all three metrics in the logs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34645) StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount fails

2024-04-30 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842430#comment-17842430
 ] 

Ryan Skraba commented on FLINK-34645:
-

1.18 Java 11 / Test (module: misc) 
https://github.com/apache/flink/actions/runs/8872328847/job/24356773170#step:10:21780

Again a slightly different output was received, but in the same test:
{code}
Apr 29 02:31:31 Expected size: 6 but was: 4 in:
Apr 29 02:31:31 [Record @ (undef) : 
+I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05),
Apr 29 02:31:31 Record @ (undef) : 
+I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10),
Apr 29 02:31:31 Record @ (undef) : 
+I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15),
Apr 29 02:31:31 Record @ (undef) : 
+I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)]
Apr 29 02:31:31 at 
org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:110)
Apr 29 02:31:31 at 
org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:70)
Apr 29 02:31:31 at 
org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase.assertOutputEquals(ArrowPythonAggregateFunctionOperatorTestBase.java:62)
Apr 29 02:31:31 at 
org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount(BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java:209)
{code}

> StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount
>  fails
> 
>
> Key: FLINK-34645
> URL: https://issues.apache.org/jira/browse/FLINK-34645
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> {code}
> Error: 02:27:17 02:27:17.025 [ERROR] Tests run: 3, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 0.658 s <<< FAILURE! - in 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
> Error: 02:27:17 02:27:17.025 [ERROR] 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount
>   Time elapsed: 0.3 s  <<< FAILURE!
> Mar 09 02:27:17 java.lang.AssertionError: 
> Mar 09 02:27:17 
> Mar 09 02:27:17 Expected size: 8 but was: 6 in:
> Mar 09 02:27:17 [Record @ (undef) : 
> +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Mar 09 02:27:17 Record @ (undef) : 
> +I(c2,3,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Mar 09 02:27:17 Record @ (undef) : 
> +I(c2,3,1970-01-01T00:00,1970-01-01T00:00:10),
> Mar 09 02:27:17 Record @ (undef) : 
> +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10),
> Mar 09 02:27:17 Watermark @ 1,
> Mar 09 02:27:17 Watermark @ 2]
> Mar 09 02:27:17   at 
> org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:110)
> Mar 09 02:27:17   at 
> org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:70)
> Mar 09 02:27:17   at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase.assertOutputEquals(ArrowPythonAggregateFunctionOperatorTestBase.java:62)
> Mar 09 02:27:17   at 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount(StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java:326)
> Mar 09 02:27:17   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585036601


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Great, thank you!
   
   Yes pendingRecords is present in all jobs and has a value (fluctuating, of 
course). I have never seen the operator's metric 
`flink_k8soperator_namespace_resource_AutoScaler_jobVertexID_LAG_Current` be 
anything other than `NaN`, though. I'm observing it via prometheus, so maybe 
it's just a bug in the translation layer.
   
   edit: actually, same for `SOURCE_DATA_RATE` and `CURRENT_PROCESSING_RATE`...
   
   (also, if this conversation is out of scope I'd be happy to move it to 
somewhere that is less tangential!)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585036601


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Great, thank you!
   
   Yes pendingRecords is present in all jobs and has a value (fluctuating, of 
course). I have never seen the operator's metric 
`flink_k8soperator_namespace_resource_AutoScaler_jobVertexID_LAG_Current` be 
anything other than NaN, though. I'm observing it via prometheus, so maybe it's 
just a bug in the translation layer.
   
   edit: actually, same for `SOURCE_DATA_RATE` and `CURRENT_PROCESSING_RATE`...
   
   (also, if this conversation is out of scope I'd be happy to move it to 
somewhere that is less tangential!)



##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Thanks, that makes a lot of sense! Is catch up status determined by literal 
timestamps compared against the catch up duration? eg if a record was placed in 
kafka 10m ago, and our expected catch up duration is 5m, then are we 5m behind, 
or are we still 10m behind? or is catch up determined by throughput numbers? 
just trying to get a better sense of "catch up" statistics!
   
   Perhaps our problem is that lag and source_data_rate - for every single job 
tracked (operator 1.7, Flink 1.18.1, all using `KafkaSource`) - are `NaN`. At 
least according to the exposed operator metrics themselves. If the operator 
can't see the lag then maybe it can't make an informed decision? I'm wondering 
if this is a bug on our configuration or maybe I'm just way off base. I should 
expect to see values for `LAG_Current` and `SOURCE_DATA_RATE_Current`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585036601


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Great, thank you!
   
   Yes pendingRecords is present in all jobs and has a value (fluctuating, of 
course). I have never seen the operator's metric 
`flink_k8soperator_namespace_resource_AutoScaler_jobVertexID_LAG_Current` be 
anything other than N/A, though. I'm observing it via prometheus, so maybe it's 
just a bug in the translation layer.
   
   edit: actually, same for `SOURCE_DATA_RATE` and `CURRENT_PROCESSING_RATE`...
   
   (also, if this conversation is out of scope I'd be happy to move it to 
somewhere that is less tangential!)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585036601


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Great, thank you!
   
   Yes pendingRecords is present in all jobs and has a value (fluctuating, of 
course). I have never seen the operator's metric `jobVertexID_LAG_Current` be 
anything other than N/A, though. I'm observing it via prometheus, so maybe it's 
just a bug in the translation layer.
   
   (also, if this conversation is out of scope I'd be happy to move it to 
somewhere that is less tangential!)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585036601


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Great, thank you!
   
   Yes pendingRecords is present in all jobs and has a value (fluctuating, of 
course). I have never seen the operator's metric 
`flink_k8soperator_namespace_resource_AutoScaler_jobVertexID_LAG_Current` be 
anything other than N/A, though. I'm observing it via prometheus, so maybe it's 
just a bug in the translation layer.
   
   (also, if this conversation is out of scope I'd be happy to move it to 
somewhere that is less tangential!)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


gyfora commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585008279


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Lag time is defined as: number of outstanding records / current processing 
rate (so it's not based on timestamps).
   I would double-check the pendingRecords metrics from the Kafka source, 
because those should be there.
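   Roughly, as a sketch (jshell-style snippet with hypothetical numbers and 
variable names, not the operator's actual code):

```java
// lag time = number of outstanding records / current processing rate
// (record counts and rates, not event timestamps)
double pendingRecords = 1_200_000;   // e.g. the Kafka source's pendingRecords metric
double processingRate = 4_000;       // records/sec the vertex currently processes
double lagSeconds = pendingRecords / processingRate;   // 300s worth of backlog

// the backlog is then weighed against the configured catch-up duration
double catchUpSeconds = 5 * 60;
boolean needsExtraCapacity = lagSeconds > catchUpSeconds;
```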



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1584990017


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Thanks, that makes a lot of sense! Is catch up status determined by literal 
timestamps compared against the catch up duration? eg if a record was placed in 
kafka 10m ago, and our expected catch up duration is 5m, then are we 5m behind, 
or are we still 10m behind? or is catch up determined by throughput numbers? 
just trying to get a better sense of "catch up" statistics!
   
   Perhaps our problem is that lag and source_data_rate - for every single job 
tracked (operator 1.7, Flink 1.18.1, all using `KafkaSource`) - are `N/A`. At 
least according to the exposed operator metrics themselves. If the operator 
can't see the lag then maybe it can't make an informed decision? I'm wondering 
if this is a bug on our configuration or maybe I'm just way off base. I should 
expect to see values for `LAG_Current` and `SOURCE_DATA_RATE_Current`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1584990017


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Thanks, that makes a lot of sense! Is catch up status determined by literal 
timestamps compared against the catch up duration? eg if a record was placed in 
kafka 10m ago, and our expected catch up duration is 5m, then are we 5m behind, 
or are we still 10m behind? or is catch up determined by throughput numbers? 
just trying to get a better sense of "catch up" statistics!
   
   Perhaps our problem is that lag, for every single job tracked (operator 1.7, 
Flink 1.18.1, all using `KafkaSource`), is `N/A`. At least according to the 
exposed operator metrics themselves. If the operator can't see the lag then 
maybe it can't make an informed decision? I'm wondering if this is a bug on our 
configuration or maybe I'm just way off base. I should expect to see values for 
`LAG_Current`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1584990017


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Thanks, that makes a lot of sense! Is catchup data rate determined by 
literal timestamps compared against the catchup duration? eg if a record was 
placed in kafka 10m ago, and our expected catchup duration is 5m, then are we 
5m behind, or are we still 10m behind? or is catchup determined by throughput 
numbers? just trying to get a better sense of "catch up" statistics!
   
   Perhaps our problem is that lag, for every single job tracked (operator 1.7, 
Flink 1.18.1, all using `KafkaSource`), is `N/A`. At least according to the 
exposed operator metrics themselves. If the operator can't see the lag then 
maybe it can't make an informed decision? I'm wondering if this is a bug on our 
configuration or maybe I'm just way off base. I should expect to see values for 
`LAG_Current`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


trystanj commented on code in PR #586:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1584990017


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 public static final ConfigOption TARGET_UTILIZATION_BOUNDARY =
 autoScalerConfig("target.utilization.boundary")
 .doubleType()
-.defaultValue(0.1)
+.defaultValue(0.4)

Review Comment:
   Thanks, that makes a lot of sense! Is catchup data rate determined by 
literal timestamps compared against the catchup duration? eg if a record was 
placed in kafka 10m ago, and our expected catchup duration is 5m, then are we 
5m behind, or are we still 10m behind? just trying to get a better sense of 
"catch up" statistics!
   
   Perhaps our problem is that lag, for every single job tracked (operator 1.7, 
Flink 1.18.1, all using `KafkaSource`), is `N/A`. At least according to the 
exposed operator metrics themselves. If the operator can't see the lag then 
maybe it can't make an informed decision? I'm wondering if this is a bug on our 
configuration or maybe I'm just way off base. I should expect to see values for 
`LAG_Current`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. [flink-cdc]

2024-04-30 Thread via GitHub


vinlee19 commented on PR #3286:
URL: https://github.com/apache/flink-cdc/pull/3286#issuecomment-2085539643

   @ruanhang1993 @gong PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35278) Occasional NPE on k8s operator status replacement

2024-04-30 Thread Jira
Márton Balassi created FLINK-35278:
--

 Summary: Occasional NPE on k8s operator status replacement
 Key: FLINK-35278
 URL: https://issues.apache.org/jira/browse/FLINK-35278
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Márton Balassi
 Fix For: kubernetes-operator-1.9.0


Infrequently we get a null pointer exception on status replacement:
{noformat}
logger: io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher
message: Error during error status handling.   

throwable: { [-]
 class: java.lang.NullPointerException
 msg: null
 stack: [ [-]
   
org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:136)
   
org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
   
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)
   
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:213)
   
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:60)
   
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)
   
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:123)
   
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
   
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
   
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:452)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   java.lang.Thread.run(Thread.java:829)
 ]
   }{noformat}
I suspect it probably is thrown by getResourceVersion() here:

[https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java#L136]
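A minimal sketch of the suspected failure mode and the kind of guard that would 
avoid it (hypothetical helper, not a proposed patch):
{code:java}
// Sketch only: a freshly reconciled resource may not carry a resourceVersion yet,
// so any parse/compare on it in StatusRecorder.replaceStatus would throw an NPE.
static boolean canReplaceStatus(io.fabric8.kubernetes.api.model.HasMetadata resource) {
    String resourceVersion = resource.getMetadata().getResourceVersion();
    // if this is null, callers should fall back to a plain status patch
    // instead of a version-checked replace
    return resourceVersion != null;
}
{code}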



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.

2024-04-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35277:
---
Labels: pull-request-available  (was: )

> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> -
>
> Key: FLINK-35277
> URL: https://issues.apache.org/jira/browse/FLINK-35277
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: 3.1.0
> Environment: Flink 1.18.0 
> Flink CDC 3.1-pre
> DB2 11.5.x
>  
>Reporter: wenli xiao
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2024-04-30-22-19-17-350.png
>
>
> 1. background
> When attempting to use Flink CDC 3.1 in the Flink connector to load data from 
> DB2 to Apache Doris, I set up DB2 using the Docker image 
> {{{}ruanhang/db2-cdc-demo:v1{}}}. After configuring the DB2 asynchronous 
> CDC, I tried to capture a table using {{{}CALL ASNCDC.ADDTABLE('MYSCHEMA', 
> 'MYTABLE'){}}}. However, I encountered an error when attempting to add the 
> eleventh table:
> [23505][-803] One or more values in the INSERT statement, UPDATE statement, 
> or foreign key update caused by a DELETE statement are not valid because the 
> primary key, unique constraint or unique index identified by "2" constrains 
> table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index 
> key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
> !image-2024-04-30-22-19-17-350.png!
>  
> 2. 
> The error indicates that the table {{Asncdc.IBMSNAP_PRUNCNTL}} has a 
> duplicate primary key.
> Here is the schema of {{{}Asncdc.IBMSNAP_PRUNCNTL{}}}:
> create table IBMSNAP_PRUNCNTL
> (
>   TARGET_SERVER     CHARACTER(18) not null,
>   TARGET_OWNER      VARCHAR(128)  not null,
>   TARGET_TABLE      VARCHAR(128)  not null,
>   SYNCHTIME         TIMESTAMP(6),
>   SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
>   SOURCE_OWNER      VARCHAR(128)  not null,
>   SOURCE_TABLE      VARCHAR(128)  not null,
>   SOURCE_VIEW_QUAL  SMALLINT      not null,
>   APPLY_QUAL        CHARACTER(18) not null,
>   SET_NAME          CHARACTER(18) not null,
>   CNTL_SERVER       CHARACTER(18) not null,
>   TARGET_STRUCTURE  SMALLINT      not null,
>   CNTL_ALIAS        CHARACTER(8),
>   PHYS_CHANGE_OWNER VARCHAR(128),
>   PHYS_CHANGE_TABLE VARCHAR(128),
>   MAP_ID            VARCHAR(10)   not null
> );
> ​
> create unique index IBMSNAP_PRUNCNTLX
>    on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL, 
> APPLY_QUAL, SET_NAME, TARGET_SERVER,
>                         TARGET_TABLE, TARGET_OWNER);
> ​
> create unique index IBMSNAP_PRUNCNTLX1
>    on IBMSNAP_PRUNCNTL (MAP_ID);
> ​
> create index IBMSNAP_PRUNCNTLX2
>    on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);
> ​
> create index IBMSNAP_PRUNCNTLX3
>    on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);
> The issue stems from the logic in {{asncdc.addtable}} not aligning with the 
> {{asncdcaddremove.sql}} script when calling the {{addtable}} procedure. The 
> original insert statement is as follows:
> -- Original insert statement
> SET stmtSQL =   'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 
>                'TARGET_SERVER, ' || 
>                'TARGET_OWNER, ' || 
>                'TARGET_TABLE, ' || 
>                'SYNCHTIME, ' || 
>                'SYNCHPOINT, ' || 
>                'SOURCE_OWNER, ' || 
>                'SOURCE_TABLE, ' || 
>                'SOURCE_VIEW_QUAL, ' || 
>                'APPLY_QUAL, ' || 
>                'SET_NAME, ' || 
>                'CNTL_SERVER , ' || 
>                'TARGET_STRUCTURE , ' || 
>                'CNTL_ALIAS , ' || 
>                'PHYS_CHANGE_OWNER , ' || 
>                'PHYS_CHANGE_TABLE , ' || 
>                'MAP_ID ' || 
>                ') VALUES ( ' || 
>                '''KAFKA'', ' || 
>                 || tableschema || ''', ' || 
>                 || tablename || ''', ' ||
>                'NULL, ' || 
>                'NULL, ' || 
>                 || tableschema || ''', ' || 
>                 || tablename || ''', ' ||
>                '0, ' || 
>                '''KAFKAQUAL'', ' || 
>                '''SET001'', ' || 
>                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
>                '8, ' || 
>                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
>                '''ASNCDC'', ' || 
>                '''CDC_' || tableschema ||  '_' || tablename || ''', ' ||
>                ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN 
> CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) 
> END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || 
>                '   )';
> EXECUTE IMMEDIATE stmtSQL;
> The {{max(MAP_ID)}} 

[jira] [Updated] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.

2024-04-30 Thread wenli xiao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wenli xiao updated FLINK-35277:
---
Description: 
1. background

When attempting to use Flink CDC 3.1 in the Flink connector to load data from 
DB2 to Apache Doris, I set up DB2 using the Docker image 
{{{}ruanhang/db2-cdc-demo:v1{}}}. After configuring the DB2 asynchronous CDC, 
I tried to capture a table using {{{}CALL ASNCDC.ADDTABLE('MYSCHEMA', 
'MYTABLE'){}}}. However, I encountered an error when attempting to add the 
eleventh table:
[23505][-803] One or more values in the INSERT statement, UPDATE statement, or 
foreign key update caused by a DELETE statement are not valid because the 
primary key, unique constraint or unique index identified by "2" constrains 
table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index 
key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
!image-2024-04-30-22-19-17-350.png!

 

2. 

The error indicates that the table {{Asncdc.IBMSNAP_PRUNCNTL}} has a duplicate 
primary key.

Here is the schema of {{{}Asncdc.IBMSNAP_PRUNCNTL{}}}:
create table IBMSNAP_PRUNCNTL
(
  TARGET_SERVER     CHARACTER(18) not null,
  TARGET_OWNER      VARCHAR(128)  not null,
  TARGET_TABLE      VARCHAR(128)  not null,
  SYNCHTIME         TIMESTAMP(6),
  SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
  SOURCE_OWNER      VARCHAR(128)  not null,
  SOURCE_TABLE      VARCHAR(128)  not null,
  SOURCE_VIEW_QUAL  SMALLINT      not null,
  APPLY_QUAL        CHARACTER(18) not null,
  SET_NAME          CHARACTER(18) not null,
  CNTL_SERVER       CHARACTER(18) not null,
  TARGET_STRUCTURE  SMALLINT      not null,
  CNTL_ALIAS        CHARACTER(8),
  PHYS_CHANGE_OWNER VARCHAR(128),
  PHYS_CHANGE_TABLE VARCHAR(128),
  MAP_ID            VARCHAR(10)   not null
);
​
create unique index IBMSNAP_PRUNCNTLX
   on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL, 
APPLY_QUAL, SET_NAME, TARGET_SERVER,
                        TARGET_TABLE, TARGET_OWNER);
​
create unique index IBMSNAP_PRUNCNTLX1
   on IBMSNAP_PRUNCNTL (MAP_ID);
​
create index IBMSNAP_PRUNCNTLX2
   on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);
​
create index IBMSNAP_PRUNCNTLX3
   on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);
The issue stems from the logic in {{asncdc.addtable}} not aligning with the 
{{asncdcaddremove.sql}} script when calling the {{addtable}} procedure. The 
original insert statement is as follows:
-- Original insert statement
SET stmtSQL =   'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 
               'TARGET_SERVER, ' || 
               'TARGET_OWNER, ' || 
               'TARGET_TABLE, ' || 
               'SYNCHTIME, ' || 
               'SYNCHPOINT, ' || 
               'SOURCE_OWNER, ' || 
               'SOURCE_TABLE, ' || 
               'SOURCE_VIEW_QUAL, ' || 
               'APPLY_QUAL, ' || 
               'SET_NAME, ' || 
               'CNTL_SERVER , ' || 
               'TARGET_STRUCTURE , ' || 
               'CNTL_ALIAS , ' || 
               'PHYS_CHANGE_OWNER , ' || 
               'PHYS_CHANGE_TABLE , ' || 
               'MAP_ID ' || 
               ') VALUES ( ' || 
               '''KAFKA'', ' || 
                || tableschema || ''', ' || 
                || tablename || ''', ' ||
               'NULL, ' || 
               'NULL, ' || 
                || tableschema || ''', ' || 
                || tablename || ''', ' ||
               '0, ' || 
               '''KAFKAQUAL'', ' || 
               '''SET001'', ' || 
               ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
               '8, ' || 
               ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
               '''ASNCDC'', ' || 
               '''CDC_' || tableschema ||  '_' || tablename || ''', ' ||
               ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN 
CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) 
END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || 
               '   )';
EXECUTE IMMEDIATE stmtSQL;
The {{max(MAP_ID)}} logic is incorrect, as the correct result should be 
{{{}CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)){}}}. This issue prevents 
the addition of the eleventh table. For more details about 
{{{}asncdcaddremove.sql{}}}, please refer to: 
[asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189]
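A small Java illustration (hypothetical values, not part of the script) of why the 
original expression breaks exactly at the eleventh table:
{code:java}
import java.util.Comparator;
import java.util.List;

class MapIdDemo {
    public static void main(String[] args) {
        // MAP_ID is a VARCHAR(10); after ten tables it holds "1" .. "10"
        List<String> mapIds = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");

        // original logic: max over the string column, then cast ->
        // lexicographic max is "9", so the next id is 10 again -> duplicate key
        int wrongNext = Integer.parseInt(mapIds.stream().max(Comparator.naturalOrder()).get()) + 1;

        // corrected logic: cast each value first, then take the numeric max -> 11
        int rightNext = mapIds.stream().mapToInt(Integer::parseInt).max().getAsInt() + 1;

        System.out.println(wrongNext + " vs " + rightNext);  // 10 vs 11
    }
}
{code}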

  was:
1. background

When attempting to use Flink CDC 3.1 in the Flink connector to load data from 
DB2 to Apache Doris, I set up DB2 using the Docker image 
{{{}ruanhang/db2-cdc-demo:v1{}}}. After configuring the DB2 asynchronous CDC, I 
tried to capture a table using {{{}CALL ASNCDC.ADDTABLE('MYSCHEMA', 
'MYTABLE'){}}}. However, I encountered an error when attempting to add the 
eleventh table:
[23505][-803] One or more values in the INSERT statement, UPDATE 

[jira] [Updated] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.

2024-04-30 Thread wenli xiao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wenli xiao updated FLINK-35277:
---
Environment: 
Flink 1.18.0 

Flink CDC 3.1-pre

DB2 11.5.x

 

  was:
Flink 1.18.0 

DB2 11.5.x

 


> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> -
>
> Key: FLINK-35277
> URL: https://issues.apache.org/jira/browse/FLINK-35277
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: 3.1.0
> Environment: Flink 1.18.0 
> Flink CDC 3.1-pre
> DB2 11.5.x
>  
>Reporter: wenli xiao
>Priority: Minor
> Attachments: image-2024-04-30-22-19-17-350.png
>
>
> 1. background
> When attempting to use Flink CDC 3.1 in the Flink connector to load data from 
> DB2 to Apache Doris, I set up DB2 using the Docker image 
> {{{}ruanhang/db2-cdc-demo:v1{}}}. After configuring the DB2 asynchronous CDC, 
> I tried to capture a table using {{{}CALL ASNCDC.ADDTABLE('MYSCHEMA', 
> 'MYTABLE'){}}}. However, I encountered an error when attempting to add the 
> eleventh table:
> [23505][-803] One or more values in the INSERT statement, UPDATE statement, 
> or foreign key update caused by a DELETE statement are not valid because the 
> primary key, unique constraint or unique index identified by "2" constrains 
> table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index 
> key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
> !image-2024-04-30-22-19-17-350.png!
>  
> 2. 
> The error indicates that the table {{Asncdc.IBMSNAP_PRUNCNTL}} has a 
> duplicate primary key.
> Here is the schema of {{{}Asncdc.IBMSNAP_PRUNCNTL{}}}:
> create table IBMSNAP_PRUNCNTL
> (
>   TARGET_SERVER     CHARACTER(18) not null,
>   TARGET_OWNER      VARCHAR(128)  not null,
>   TARGET_TABLE      VARCHAR(128)  not null,
>   SYNCHTIME         TIMESTAMP(6),
>   SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
>   SOURCE_OWNER      VARCHAR(128)  not null,
>   SOURCE_TABLE      VARCHAR(128)  not null,
>   SOURCE_VIEW_QUAL  SMALLINT      not null,
>   APPLY_QUAL        CHARACTER(18) not null,
>   SET_NAME          CHARACTER(18) not null,
>   CNTL_SERVER       CHARACTER(18) not null,
>   TARGET_STRUCTURE  SMALLINT      not null,
>   CNTL_ALIAS        CHARACTER(8),
>   PHYS_CHANGE_OWNER VARCHAR(128),
>   PHYS_CHANGE_TABLE VARCHAR(128),
>   MAP_ID            VARCHAR(10)   not null
> );
> ​
> create unique index IBMSNAP_PRUNCNTLX
>    on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL, 
> APPLY_QUAL, SET_NAME, TARGET_SERVER,
>                         TARGET_TABLE, TARGET_OWNER);
> ​
> create unique index IBMSNAP_PRUNCNTLX1
>    on IBMSNAP_PRUNCNTL (MAP_ID);
> ​
> create index IBMSNAP_PRUNCNTLX2
>    on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);
> ​
> create index IBMSNAP_PRUNCNTLX3
>    on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);
> The issue stems from the logic in {{asncdc.addtable}} not aligning with the 
> {{asncdcaddremove.sql}} script when calling the {{addtable}} procedure. The 
> original insert statement is as follows:
> -- Original insert statement
> SET stmtSQL =   'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 
>                'TARGET_SERVER, ' || 
>                'TARGET_OWNER, ' || 
>                'TARGET_TABLE, ' || 
>                'SYNCHTIME, ' || 
>                'SYNCHPOINT, ' || 
>                'SOURCE_OWNER, ' || 
>                'SOURCE_TABLE, ' || 
>                'SOURCE_VIEW_QUAL, ' || 
>                'APPLY_QUAL, ' || 
>                'SET_NAME, ' || 
>                'CNTL_SERVER , ' || 
>                'TARGET_STRUCTURE , ' || 
>                'CNTL_ALIAS , ' || 
>                'PHYS_CHANGE_OWNER , ' || 
>                'PHYS_CHANGE_TABLE , ' || 
>                'MAP_ID ' || 
>                ') VALUES ( ' || 
>                '''KAFKA'', ' || 
>                 || tableschema || ''', ' || 
>                 || tablename || ''', ' ||
>                'NULL, ' || 
>                'NULL, ' || 
>                 || tableschema || ''', ' || 
>                 || tablename || ''', ' ||
>                '0, ' || 
>                '''KAFKAQUAL'', ' || 
>                '''SET001'', ' || 
>                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
>                '8, ' || 
>                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
>                '''ASNCDC'', ' || 
>                '''CDC_' || tableschema ||  '_' || tablename || ''', ' ||
>                ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN 
> CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) 
> END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || 
>                '   )';
> EXECUTE IMMEDIATE stmtSQL;
> The 

[jira] [Commented] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.

2024-04-30 Thread wenli xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842404#comment-17842404
 ] 

wenli xiao commented on FLINK-35277:


asncdcaddremove.sql: 
https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189

> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> -
>
> Key: FLINK-35277
> URL: https://issues.apache.org/jira/browse/FLINK-35277
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: 3.1.0
> Environment: Flink 1.18.0 
> DB2 11.5.x
>  
>Reporter: wenli xiao
>Priority: Minor
> Attachments: image-2024-04-30-22-19-17-350.png
>
>
> 1. background
> When attempting to use Flink CDC 3.1 in the Flink connector to load data from 
> DB2 to Apache Doris, I set up DB2 using the Docker image 
> {{{}ruanhang/db2-cdc-demo:v1{}}}. After configuring the DB2 asynchronous CDC, 
> I tried to capture a table using {{{}CALL ASNCDC.ADDTABLE('MYSCHEMA', 
> 'MYTABLE'){}}}. However, I encountered an error when attempting to add the 
> eleventh table:
> [23505][-803] One or more values in the INSERT statement, UPDATE statement, 
> or foreign key update caused by a DELETE statement are not valid because the 
> primary key, unique constraint or unique index identified by "2" constrains 
> table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index 
> key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
> !image-2024-04-30-22-19-17-350.png!
>  
> 2. 
> The error indicates that the table {{Asncdc.IBMSNAP_PRUNCNTL}} has a 
> duplicate primary key.
> Here is the schema of {{{}Asncdc.IBMSNAP_PRUNCNTL{}}}:
> create table IBMSNAP_PRUNCNTL
> (
>   TARGET_SERVER     CHARACTER(18) not null,
>   TARGET_OWNER      VARCHAR(128)  not null,
>   TARGET_TABLE      VARCHAR(128)  not null,
>   SYNCHTIME         TIMESTAMP(6),
>   SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
>   SOURCE_OWNER      VARCHAR(128)  not null,
>   SOURCE_TABLE      VARCHAR(128)  not null,
>   SOURCE_VIEW_QUAL  SMALLINT      not null,
>   APPLY_QUAL        CHARACTER(18) not null,
>   SET_NAME          CHARACTER(18) not null,
>   CNTL_SERVER       CHARACTER(18) not null,
>   TARGET_STRUCTURE  SMALLINT      not null,
>   CNTL_ALIAS        CHARACTER(8),
>   PHYS_CHANGE_OWNER VARCHAR(128),
>   PHYS_CHANGE_TABLE VARCHAR(128),
>   MAP_ID            VARCHAR(10)   not null
> );
> ​
> create unique index IBMSNAP_PRUNCNTLX
>    on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL, 
> APPLY_QUAL, SET_NAME, TARGET_SERVER,
>                         TARGET_TABLE, TARGET_OWNER);
> ​
> create unique index IBMSNAP_PRUNCNTLX1
>    on IBMSNAP_PRUNCNTL (MAP_ID);
> ​
> create index IBMSNAP_PRUNCNTLX2
>    on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);
> ​
> create index IBMSNAP_PRUNCNTLX3
>    on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);
> The issue stems from the logic in {{asncdc.addtable}} not aligning with the 
> {{asncdcaddremove.sql}} script when calling the {{addtable}} procedure. The 
> original insert statement is as follows:
> – Original insert statement
> SET stmtSQL =   'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 
>                'TARGET_SERVER, ' || 
>                'TARGET_OWNER, ' || 
>                'TARGET_TABLE, ' || 
>                'SYNCHTIME, ' || 
>                'SYNCHPOINT, ' || 
>                'SOURCE_OWNER, ' || 
>                'SOURCE_TABLE, ' || 
>                'SOURCE_VIEW_QUAL, ' || 
>                'APPLY_QUAL, ' || 
>                'SET_NAME, ' || 
>                'CNTL_SERVER , ' || 
>                'TARGET_STRUCTURE , ' || 
>                'CNTL_ALIAS , ' || 
>                'PHYS_CHANGE_OWNER , ' || 
>                'PHYS_CHANGE_TABLE , ' || 
>                'MAP_ID ' || 
>                ') VALUES ( ' || 
>                '''KAFKA'', ' || 
>                 || tableschema || ''', ' || 
>                 || tablename || ''', ' ||
>                'NULL, ' || 
>                'NULL, ' || 
>                 || tableschema || ''', ' || 
>                 || tablename || ''', ' ||
>                '0, ' || 
>                '''KAFKAQUAL'', ' || 
>                '''SET001'', ' || 
>                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
>                '8, ' || 
>                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
>                '''ASNCDC'', ' || 
>                '''CDC_' || tableschema ||  '_' || tablename || ''', ' ||
>                ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN 
> CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) 
> END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || 
>               

[jira] [Updated] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.

2024-04-30 Thread wenli xiao (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wenli xiao updated FLINK-35277:
---
Description: 
1. background

When attempting to use Flink CDC 3.1 in the Flink connector to load data from 
DB2 to Apache Doris, I set up DB2 using the Docker image 
{{{}ruanhang/db2-cdc-demo:v1{}}}. After configuring the DB2 asynchronous CDC, I 
tried to capture a table using {{{}CALL ASNCDC.ADDTABLE('MYSCHEMA', 
'MYTABLE'){}}}. However, I encountered an error when attempting to add the 
eleventh table:
[23505][-803] One or more values in the INSERT statement, UPDATE statement, or 
foreign key update caused by a DELETE statement are not valid because the 
primary key, unique constraint or unique index identified by "2" constrains 
table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index 
key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
!image-2024-04-30-22-19-17-350.png!

 

2. 

The error indicates that the table {{Asncdc.IBMSNAP_PRUNCNTL}} has a duplicate 
primary key.

Here is the schema of {{{}Asncdc.IBMSNAP_PRUNCNTL{}}}:
create table IBMSNAP_PRUNCNTL
(
  TARGET_SERVER     CHARACTER(18) not null,
  TARGET_OWNER      VARCHAR(128)  not null,
  TARGET_TABLE      VARCHAR(128)  not null,
  SYNCHTIME         TIMESTAMP(6),
  SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
  SOURCE_OWNER      VARCHAR(128)  not null,
  SOURCE_TABLE      VARCHAR(128)  not null,
  SOURCE_VIEW_QUAL  SMALLINT      not null,
  APPLY_QUAL        CHARACTER(18) not null,
  SET_NAME          CHARACTER(18) not null,
  CNTL_SERVER       CHARACTER(18) not null,
  TARGET_STRUCTURE  SMALLINT      not null,
  CNTL_ALIAS        CHARACTER(8),
  PHYS_CHANGE_OWNER VARCHAR(128),
  PHYS_CHANGE_TABLE VARCHAR(128),
  MAP_ID            VARCHAR(10)   not null
);
​
create unique index IBMSNAP_PRUNCNTLX
   on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL, 
APPLY_QUAL, SET_NAME, TARGET_SERVER,
                        TARGET_TABLE, TARGET_OWNER);
​
create unique index IBMSNAP_PRUNCNTLX1
   on IBMSNAP_PRUNCNTL (MAP_ID);
​
create index IBMSNAP_PRUNCNTLX2
   on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);
​
create index IBMSNAP_PRUNCNTLX3
   on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);
The issue stems from the logic in {{asncdc.addtable}} not aligning with the 
{{asncdcaddremove.sql}} script when calling the {{addtable}} procedure. The 
original insert statement is as follows:
– Original insert statement
SET stmtSQL =   'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 
               'TARGET_SERVER, ' || 
               'TARGET_OWNER, ' || 
               'TARGET_TABLE, ' || 
               'SYNCHTIME, ' || 
               'SYNCHPOINT, ' || 
               'SOURCE_OWNER, ' || 
               'SOURCE_TABLE, ' || 
               'SOURCE_VIEW_QUAL, ' || 
               'APPLY_QUAL, ' || 
               'SET_NAME, ' || 
               'CNTL_SERVER , ' || 
               'TARGET_STRUCTURE , ' || 
               'CNTL_ALIAS , ' || 
               'PHYS_CHANGE_OWNER , ' || 
               'PHYS_CHANGE_TABLE , ' || 
               'MAP_ID ' || 
               ') VALUES ( ' || 
               '''KAFKA'', ' || 
                || tableschema || ''', ' || 
                || tablename || ''', ' ||
               'NULL, ' || 
               'NULL, ' || 
                || tableschema || ''', ' || 
                || tablename || ''', ' ||
               '0, ' || 
               '''KAFKAQUAL'', ' || 
               '''SET001'', ' || 
               ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
               '8, ' || 
               ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
               '''ASNCDC'', ' || 
               '''CDC_' || tableschema ||  '_' || tablename || ''', ' ||
               ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN 
CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) 
END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || 
               '   )';
EXECUTE IMMEDIATE stmtSQL;
The {{max(MAP_ID)}} logic is incorrect: {{MAP_ID}} is a VARCHAR(10) column, so 
{{max(MAP_ID)}} takes the string maximum, which is "9" once ten tables exist, and 
the computed next id then collides with the existing "10". The ELSE branch should 
instead be {{{}CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)){}}}. This issue 
prevents the addition of the eleventh table. For more details about 
{{{}asncdcaddremove.sql{}}}, please refer to: 
[asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189]
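
For illustration only (the fix itself belongs in the SQL above), a standalone Java 
snippet showing how the string maximum goes wrong once ten MAP_ID values exist:
{code:java}
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MapIdIllustration {
    public static void main(String[] args) {
        // MAP_ID values "1".."10", as they look after ten tables were added.
        List<String> mapIds = IntStream.rangeClosed(1, 10)
                .mapToObj(String::valueOf)
                .collect(Collectors.toList());
        String stringMax = mapIds.stream().max(String::compareTo).get();   // "9", not "10"
        int nextIdFromStringMax = Integer.parseInt(stringMax) + 1;         // 10 -> duplicate MAP_ID
        int numericMax = mapIds.stream().mapToInt(Integer::parseInt).max().getAsInt(); // 10
        System.out.println(nextIdFromStringMax + " vs " + (numericMax + 1));           // 10 vs 11
    }
} {code}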

  was:
1. background

When attempting to use Flink CDC 3.1 in the Flink connector to load data from 
DB2 to Apache Doris, I set up DB2 using the Docker image 
{{ruanhang/db2-cdc-demo:v1}}. After configuring the DB2 asynchronous CDC, I 
tried to capture a table using {{CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')}}. 
However, I encountered an error when attempting to add the eleventh table:
[23505][-803] One or more values in the INSERT statement, UPDATE statement, or 

[jira] [Created] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.

2024-04-30 Thread wenli xiao (Jira)
wenli xiao created FLINK-35277:
--

 Summary: Fix the error in the `asncdcaddremove.sql` script for the 
DB2 test container.
 Key: FLINK-35277
 URL: https://issues.apache.org/jira/browse/FLINK-35277
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: 3.1.0
 Environment: Flink 1.18.0 

DB2 11.5.x

 
Reporter: wenli xiao
 Attachments: image-2024-04-30-22-19-17-350.png

1. background

When attempting to use Flink CDC 3.1 in the Flink connector to load data from 
DB2 to Apache Doris, I set up DB2 using the Docker image 
{{ruanhang/db2-cdc-demo:v1}}. After configuring the DB2 asynchronous CDC, I 
tried to capture a table using {{CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')}}. 
However, I encountered an error when attempting to add the eleventh table:
[23505][-803] One or more values in the INSERT statement, UPDATE statement, or 
foreign key update caused by a DELETE statement are not valid because the 
primary key, unique constraint or unique index identified by "2" constrains 
table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index 
key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
!image-2024-04-30-22-19-17-350.png!

 

2. 

The error indicates that the table {{Asncdc.IBMSNAP_PRUNCNTL}} has a duplicate 
primary key.

Here is the schema of {{Asncdc.IBMSNAP_PRUNCNTL}}:
create table IBMSNAP_PRUNCNTL
(
    TARGET_SERVER     CHARACTER(18) not null,
    TARGET_OWNER      VARCHAR(128)  not null,
    TARGET_TABLE      VARCHAR(128)  not null,
    SYNCHTIME         TIMESTAMP(6),
    SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
    SOURCE_OWNER      VARCHAR(128)  not null,
    SOURCE_TABLE      VARCHAR(128)  not null,
    SOURCE_VIEW_QUAL  SMALLINT      not null,
    APPLY_QUAL        CHARACTER(18) not null,
    SET_NAME          CHARACTER(18) not null,
    CNTL_SERVER       CHARACTER(18) not null,
    TARGET_STRUCTURE  SMALLINT      not null,
    CNTL_ALIAS        CHARACTER(8),
    PHYS_CHANGE_OWNER VARCHAR(128),
    PHYS_CHANGE_TABLE VARCHAR(128),
    MAP_ID            VARCHAR(10)   not null
);
​
create unique index IBMSNAP_PRUNCNTLX
    on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL, 
APPLY_QUAL, SET_NAME, TARGET_SERVER,
                         TARGET_TABLE, TARGET_OWNER);
​
create unique index IBMSNAP_PRUNCNTLX1
    on IBMSNAP_PRUNCNTL (MAP_ID);
​
create index IBMSNAP_PRUNCNTLX2
    on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);
​
create index IBMSNAP_PRUNCNTLX3
    on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);
The issue stems from the logic in {{asncdc.addtable}} not aligning with the 
{{asncdcaddremove.sql}} script when calling the {{addtable}} procedure. The 
original insert statement is as follows:
-- Original insert statement
SET stmtSQL =   'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 
                'TARGET_SERVER,  ' || 
                'TARGET_OWNER,  ' || 
                'TARGET_TABLE,  ' || 
                'SYNCHTIME,  ' || 
                'SYNCHPOINT,  ' || 
                'SOURCE_OWNER,  ' || 
                'SOURCE_TABLE,  ' || 
                'SOURCE_VIEW_QUAL,  ' || 
                'APPLY_QUAL,  ' || 
                'SET_NAME,  ' || 
                'CNTL_SERVER ,  ' || 
                'TARGET_STRUCTURE ,  ' || 
                'CNTL_ALIAS ,  ' || 
                'PHYS_CHANGE_OWNER ,  ' || 
                'PHYS_CHANGE_TABLE ,  ' || 
                'MAP_ID  ' || 
                ') VALUES ( ' || 
                '''KAFKA'', ' || 
                 || tableschema || ''', ' || 
                 || tablename || ''', ' ||
                'NULL, ' || 
                'NULL, ' || 
                 || tableschema || ''', ' || 
                 || tablename || ''', ' ||
                '0, ' || 
                '''KAFKAQUAL'', ' || 
                '''SET001'', ' || 
                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
                '8, ' || 
                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
                '''ASNCDC'', ' || 
                '''CDC_' ||  tableschema ||  '_' || tablename || ''', ' ||
                ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN 
CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10))  
END AS MYINT from  ASNCDC.IBMSNAP_PRUNCNTL ) ' || 
                '    )';
EXECUTE IMMEDIATE stmtSQL;
The {{max(MAP_ID)}} logic is incorrect, as the correct result should be 
{{CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10))}}. This issue prevents the 
addition of the eleventh table. For more details about {{asncdcaddremove.sql}}, 
please refer to: 
[asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35275) ArrayContainsFunction uses wrong DataType to create element getter

2024-04-30 Thread PengFei Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842395#comment-17842395
 ] 

PengFei Li commented on FLINK-35275:


[~twalthr] Could you help confirm if this is a bug? If so, I'd like to fix it.

> ArrayContainsFunction uses wrong DataType to create element getter
> --
>
> Key: FLINK-35275
> URL: https://issues.apache.org/jira/browse/FLINK-35275
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: PengFei Li
>Priority: Minor
>
>  
> In ArrayContainsFunction, elementGetter is used to get elements of an array, 
> but it's created from the needle data type rather than the element data type 
> which will lead to wrong results.
> {code:java}
> public ArrayContainsFunction(SpecializedContext context) {
> super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
> final DataType needleDataType = 
> context.getCallContext().getArgumentDataTypes().get(1);
> elementGetter = 
> ArrayData.createElementGetter(needleDataType.getLogicalType());
> 
> } {code}
> For example, the following sql returns true, but the expected is false. The 
> element type is nullable int, and the needle type is non-nullable int. Using 
> the needle type to create element getter will convert the NULL element to 0, 
> so the result returns true.
> {code:java}
> SELECT ARRAY_CONTAINS(ARRAY[1, NULL], 0){code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35276) SortCodeGeneratorTest.testMultiKeys fails on negative zero

2024-04-30 Thread Ryan Skraba (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Skraba updated FLINK-35276:

Attachment: job-logs.txt

> SortCodeGeneratorTest.testMultiKeys fails on negative zero
> --
>
> Key: FLINK-35276
> URL: https://issues.apache.org/jira/browse/FLINK-35276
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
> Attachments: job-logs.txt
>
>
> 1.19 AdaptiveScheduler / Test (module: table) 
> [https://github.com/apache/flink/actions/runs/8864296211/job/24339523745#step:10:10757]
> SortCodeGeneratorTest can fail if one of the generated random row values is 
> -0.0f.
> {code:java}
> Apr 28 02:38:03 expect: +I(,SqlRawValue{?},0.0,false); actual: 
> +I(,SqlRawValue{?},-0.0,false)
> Apr 28 02:38:03 expect: +I(,SqlRawValue{?},-0.0,false); actual: 
> +I(,SqlRawValue{?},0.0,false)
> ...
> 
> ...
> Apr 28 02:38:04 expect: +I(,null,4.9695407E17,false); actual: 
> +I(,null,4.9695407E17,false)
> Apr 28 02:38:04 expect: +I(,null,-3.84924672E18,false); actual: 
> +I(,null,-3.84924672E18,false)
> Apr 28 02:38:04 types: [[RAW('java.lang.Integer', ?), FLOAT, BOOLEAN]]
> Apr 28 02:38:04 keys: [0, 1]] 
> Apr 28 02:38:04 expected: 0.0f
> Apr 28 02:38:04  but was: -0.0f
> Apr 28 02:38:04   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 28 02:38:04   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 28 02:38:04   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 28 02:38:04   at 
> org.apache.flink.table.planner.codegen.SortCodeGeneratorTest.testInner(SortCodeGeneratorTest.java:632)
> Apr 28 02:38:04   at 
> org.apache.flink.table.planner.codegen.SortCodeGeneratorTest.testMultiKeys(SortCodeGeneratorTest.java:143)
> Apr 28 02:38:04   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
> In the test code, this is extremely unlikely to occur (one in 2²⁴?) but *has* 
> happened at this line (when the {{rnd.nextFloat()}} is {{0.0f}} and 
> {{rnd.nextLong()}} is negative:
> [https://github.com/apache/flink/blob/e7ce0a2969633168b9395c683921aa49362ad7a4/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java#L255]
> We can reproduce the failure by changing how likely {{0.0f}} is to be 
> generated at that line.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35276) SortCodeGeneratorTest.testMultiKeys fails on negative zero

2024-04-30 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35276:
---

 Summary: SortCodeGeneratorTest.testMultiKeys fails on negative zero
 Key: FLINK-35276
 URL: https://issues.apache.org/jira/browse/FLINK-35276
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.20.0, 1.19.1
Reporter: Ryan Skraba


1.19 AdaptiveScheduler / Test (module: table) 
[https://github.com/apache/flink/actions/runs/8864296211/job/24339523745#step:10:10757]

SortCodeGeneratorTest can fail if one of the generated random row values is 
-0.0f.
{code:java}
Apr 28 02:38:03 expect: +I(,SqlRawValue{?},0.0,false); actual: 
+I(,SqlRawValue{?},-0.0,false)
Apr 28 02:38:03 expect: +I(,SqlRawValue{?},-0.0,false); actual: 
+I(,SqlRawValue{?},0.0,false)
...

...
Apr 28 02:38:04 expect: +I(,null,4.9695407E17,false); actual: 
+I(,null,4.9695407E17,false)
Apr 28 02:38:04 expect: +I(,null,-3.84924672E18,false); actual: 
+I(,null,-3.84924672E18,false)
Apr 28 02:38:04 types: [[RAW('java.lang.Integer', ?), FLOAT, BOOLEAN]]
Apr 28 02:38:04 keys: [0, 1]] 
Apr 28 02:38:04 expected: 0.0f
Apr 28 02:38:04  but was: -0.0f
Apr 28 02:38:04 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Apr 28 02:38:04 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Apr 28 02:38:04 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Apr 28 02:38:04 at 
org.apache.flink.table.planner.codegen.SortCodeGeneratorTest.testInner(SortCodeGeneratorTest.java:632)
Apr 28 02:38:04 at 
org.apache.flink.table.planner.codegen.SortCodeGeneratorTest.testMultiKeys(SortCodeGeneratorTest.java:143)
Apr 28 02:38:04 at java.lang.reflect.Method.invoke(Method.java:498)
{code}

In the test code, this is extremely unlikely to occur (one in 2²⁴?) but *has* 
happened at this line (when {{rnd.nextFloat()}} is {{0.0f}} and 
{{rnd.nextLong()}} is negative):

[https://github.com/apache/flink/blob/e7ce0a2969633168b9395c683921aa49362ad7a4/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java#L255]

We can reproduce the failure by changing how likely {{0.0f}} is to be generated 
at that line.
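
For reference, a small standalone Java snippet (not part of the test) showing why a 
generated {{-0.0f}} trips the assertion even though it is numerically equal to {{0.0f}}:
{code:java}
public class NegativeZeroDemo {
    public static void main(String[] args) {
        float positiveZero = 0.0f * 5L;   // 0.0f
        float negativeZero = 0.0f * -5L;  // -0.0f, like rnd.nextFloat() * rnd.nextLong() with a negative long

        System.out.println(positiveZero == negativeZero);              // true: IEEE 754 treats them as equal
        System.out.println(Float.compare(positiveZero, negativeZero)); // 1: 0.0f sorts after -0.0f
        System.out.println(Float.valueOf(positiveZero)
                .equals(Float.valueOf(negativeZero)));                 // false: equals() compares raw bits
    }
} {code}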



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-27146] [Filesystem] Migrate to Junit5 [flink]

2024-04-30 Thread via GitHub


kottmann commented on PR #22789:
URL: https://github.com/apache/flink/pull/22789#issuecomment-2085392466

   @ferenc-csaky Thank you for reviewing my PR and for your helpful feedback! I 
really appreciate your support in improving it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35275) ArrayContainsFunction uses wrong DataType to create element getter

2024-04-30 Thread PengFei Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PengFei Li updated FLINK-35275:
---
Description: 
 

In ArrayContainsFunction, elementGetter is used to read the elements of an array, 
but it is created from the needle data type rather than the element data type, 
which leads to wrong results.
{code:java}
public ArrayContainsFunction(SpecializedContext context) {
super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
final DataType needleDataType = 
context.getCallContext().getArgumentDataTypes().get(1);
elementGetter = 
ArrayData.createElementGetter(needleDataType.getLogicalType());

} {code}
For example, the following SQL returns true, but the expected result is false. The 
element type is nullable INT, and the needle type is non-nullable INT. Using 
the needle type to create the element getter converts the NULL element to 0, so 
the query returns true.
{code:java}
SELECT ARRAY_CONTAINS(ARRAY[1, NULL], 0){code}
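
Below is a minimal sketch of the kind of change the report implies: creating the 
element getter from the haystack array's element type (argument 0) instead of the 
needle type (argument 1). It assumes the element type is reachable via 
{{CollectionDataType#getElementDataType}} and is an illustration, not the actual patch.
{code:java}
public ArrayContainsFunction(SpecializedContext context) {
    super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
    // Argument 0 is the haystack ARRAY; its element type carries the correct
    // nullability, unlike the needle type used in the snippet above.
    final DataType haystackDataType =
            context.getCallContext().getArgumentDataTypes().get(0);
    final DataType elementDataType =
            ((CollectionDataType) haystackDataType).getElementDataType();
    elementGetter = ArrayData.createElementGetter(elementDataType.getLogicalType());
} {code}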
 

 

  was:
 

In `ArrayContainsFunction`, `elementGetter` is used to get elements of an 
array, but it's created from the needle data type rather than the element data 
type which will lead to wrong results.
{code:java}
public ArrayContainsFunction(SpecializedContext context) {
super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
final DataType needleDataType = 
context.getCallContext().getArgumentDataTypes().get(1);
elementGetter = 
ArrayData.createElementGetter(needleDataType.getLogicalType());

} {code}
For example, the following sql returns true, but the expected is false. The 
element type is nullable int, and the needle type is non-nullable int. Using 
the needle type to create element getter will convert the NULL element to 0, so 
the result returns true.
{code:java}
SELECT ARRAY_CONTAINS(ARRAY[1, NULL], 0){code}
 

 


> ArrayContainsFunction uses wrong DataType to create element getter
> --
>
> Key: FLINK-35275
> URL: https://issues.apache.org/jira/browse/FLINK-35275
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: PengFei Li
>Priority: Minor
>
>  
> In ArrayContainsFunction, elementGetter is used to get elements of an array, 
> but it's created from the needle data type rather than the element data type 
> which will lead to wrong results.
> {code:java}
> public ArrayContainsFunction(SpecializedContext context) {
> super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
> final DataType needleDataType = 
> context.getCallContext().getArgumentDataTypes().get(1);
> elementGetter = 
> ArrayData.createElementGetter(needleDataType.getLogicalType());
> 
> } {code}
> For example, the following sql returns true, but the expected is false. The 
> element type is nullable int, and the needle type is non-nullable int. Using 
> the needle type to create element getter will convert the NULL element to 0, 
> so the result returns true.
> {code:java}
> SELECT ARRAY_CONTAINS(ARRAY[1, NULL], 0){code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35275) ArrayContainsFunction uses wrong DataType to create element getter

2024-04-30 Thread PengFei Li (Jira)
PengFei Li created FLINK-35275:
--

 Summary: ArrayContainsFunction uses wrong DataType to create 
element getter
 Key: FLINK-35275
 URL: https://issues.apache.org/jira/browse/FLINK-35275
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.16.0
Reporter: PengFei Li


 

In `ArrayContainsFunction`, `elementGetter` is used to get elements of an 
array, but it's created from the needle data type rather than the element data 
type which will lead to wrong results.
{code:java}
public ArrayContainsFunction(SpecializedContext context) {
super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
final DataType needleDataType = 
context.getCallContext().getArgumentDataTypes().get(1);
elementGetter = 
ArrayData.createElementGetter(needleDataType.getLogicalType());

} {code}
For example, the following sql returns true, but the expected is false. The 
element type is nullable int, and the needle type is non-nullable int. Using 
the needle type to create element getter will convert the NULL element to 0, so 
the result returns true.
{code:java}
SELECT ARRAY_CONTAINS(ARRAY[1, NULL], 0){code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35183] MinorVersion metric for tracking applications [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


mbalassi commented on code in PR #819:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/819#discussion_r1584812594


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java:
##
@@ -94,6 +101,22 @@ public void onUpdate(FlinkDeployment flinkApp) {
 })
 .add(deploymentName);
 
+// Minor version computed from the above
+var subVersions = flinkVersion.split("\\.");
+String minorVersion = MALFORMED_MINOR_VERSION;

Review Comment:
   Fair, I am open to simply using `UNKNOWN`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]

2024-04-30 Thread via GitHub


flinkbot commented on PR #24752:
URL: https://github.com/apache/flink/pull/24752#issuecomment-2085320637

   
   ## CI report:
   
   * 0670f3aec8b7aaa6db1192a8cdead22d9c6b925b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]

2024-04-30 Thread via GitHub


flinkbot commented on PR #24751:
URL: https://github.com/apache/flink/pull/24751#issuecomment-2085320237

   
   ## CI report:
   
   * e5d3c1296ac763d40175720c8fff58716ab1a3b1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35183] MinorVersion metric for tracking applications [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


mbalassi commented on code in PR #819:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/819#discussion_r1584810870


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java:
##
@@ -165,13 +194,21 @@ private void initNamespaceStatusCounts(String ns) {
 private void initFlinkVersions(String ns, String flinkVersion) {
 parentMetricGroup
 .createResourceNamespaceGroup(configuration, 
FlinkDeployment.class, ns)
-.addGroup(FLINK_VERSION_GROUP_NAME)
-.addGroup(flinkVersion)
+.addGroup(FLINK_VERSION_GROUP_NAME, flinkVersion)

Review Comment:
   Good question. This is actually semantically different. We can consider the 
previous behavior a bug; this approach works well with modern metrics systems 
like Prometheus/Grafana.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java:
##
@@ -165,13 +194,21 @@ private void initNamespaceStatusCounts(String ns) {
 private void initFlinkVersions(String ns, String flinkVersion) {
 parentMetricGroup
 .createResourceNamespaceGroup(configuration, 
FlinkDeployment.class, ns)
-.addGroup(FLINK_VERSION_GROUP_NAME)
-.addGroup(flinkVersion)
+.addGroup(FLINK_VERSION_GROUP_NAME, flinkVersion)

Review Comment:
   Good question. This is actually semantically different. We can consider the 
previous behavior a bug; this approach works well with modern metrics systems 
like Prometheus/Grafana.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]

2024-04-30 Thread via GitHub


StefanRRichter opened a new pull request, #24751:
URL: https://github.com/apache/flink/pull/24751

   Backport of FLINK-35217 for 1.19
   
   (cherry picked from commit 80af4d502318348ba15a8f75a2a622ce9dbdc968)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35183] MinorVersion metric for tracking applications [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


gaborgsomogyi commented on code in PR #819:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/819#discussion_r1584790463


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java:
##
@@ -165,13 +194,21 @@ private void initNamespaceStatusCounts(String ns) {
 private void initFlinkVersions(String ns, String flinkVersion) {
 parentMetricGroup
 .createResourceNamespaceGroup(configuration, 
FlinkDeployment.class, ns)
-.addGroup(FLINK_VERSION_GROUP_NAME)
-.addGroup(flinkVersion)
+.addGroup(FLINK_VERSION_GROUP_NAME, flinkVersion)

Review Comment:
   This is not related, just merging the 2 statements to one, right?



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java:
##
@@ -94,6 +101,22 @@ public void onUpdate(FlinkDeployment flinkApp) {
 })
 .add(deploymentName);
 
+// Minor version computed from the above
+var subVersions = flinkVersion.split("\\.");
+String minorVersion = MALFORMED_MINOR_VERSION;

Review Comment:
   Not sure why not use `UNKNOWN` just like the other case but I'm fine with 
that.
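
   For reference, a hedged sketch of the minor-version derivation being discussed; 
the constant name and fallback value here are assumptions, not the PR's actual code:

       final class MinorVersionSketch {
           private static final String UNKNOWN = "UNKNOWN";

           static String minorVersion(String flinkVersion) {
               if (flinkVersion == null) {
                   return UNKNOWN;
               }
               String[] subVersions = flinkVersion.split("\\.");
               // "1.18.1" -> "1.18"; malformed version strings fall back to UNKNOWN.
               return subVersions.length >= 2
                       ? subVersions[0] + "." + subVersions[1]
                       : UNKNOWN;
           }
       }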



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [mysql] Mysql-cdc adapt mariadb. [flink-cdc]

2024-04-30 Thread via GitHub


ThisisWilli commented on PR #2494:
URL: https://github.com/apache/flink-cdc/pull/2494#issuecomment-2085275760

   > Hi @ThisisWilli, thanks for your great contribution! Could you please 
rebase this PR with latest `master` branch? You may need to rename some 
packages like from `com.ververica.cdc` to `org.apache.flink.cdc`.
   > 
   > cc @leonardBang
   
   @yuxiqian I'll find time to finish this work in mid-May.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35192) operator oom

2024-04-30 Thread chenyuzhi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842366#comment-17842366
 ] 

chenyuzhi commented on FLINK-35192:
---

The images probably aren't broken; they are just too big, so I've re-scaled 
them to make them viewable.
[~bgeng777] 

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, 
> image-2024-04-30-17-11-24-974.png, image-2024-04-30-20-38-25-195.png, 
> image-2024-04-30-20-39-05-109.png, image-2024-04-30-20-39-34-396.png, 
> image-2024-04-30-20-41-51-660.png, image-2024-04-30-20-43-20-125.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png
>
>
> The kubernetest operator docker process was killed by kernel cause out of 
> memory(the time is 2024.04.03: 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) is increasing slowly in the past 7 days:
>  !screenshot-1.png! 
> However the jvm memory metrics of operator not shown obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35192) operator oom

2024-04-30 Thread chenyuzhi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312
 ] 

chenyuzhi edited comment on FLINK-35192 at 4/30/24 12:44 PM:
-

 

The operator image we use only changes the base image in the Dockerfile from 
eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (to make the JDK 
tools available); there is no modification at the code level. [~bgeng777]

Recently, we used the pmap command to observe the memory growth of the operator 
and compared the results against the process RSS monitoring on the operator 
host. The comparison window was 04.24 20:00 - 04.25 01:20.

The process RSS increased by 95 MB (34.725 G - 34.630 G):

!image-2024-04-30-20-39-34-396.png|width=611,height=343!

!image-2024-04-30-20-39-05-109.png|width=678,height=397!

Comparing the pmap results at these two points in time:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=682,height=148!

The growth comes mainly from the front memory area, which contains many 65536 KB 
memory blocks. Over the same window the front memory area grew from 74 MB 
(35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the increase 
of 91 MB (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to an issue in Hadoop (HADOOP-7154), we suspect that glibc leaks 
memory through fragmentation across multiple arenas under multi-threading. At 
present we refer to the 
[DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24]
 of flink-docker, which uses jemalloc as the memory allocator.

We replaced the glibc allocator used by the operator with jemalloc 4.5 and 
adjusted the operator's Xmx from 32g to 5.25g.

Here is the memory usage of the process after the adjustment:

!image-2024-04-30-20-43-20-125.png|width=632,height=250!

Therefore, we suspect the growth is caused by a memory leak in glibc in a 
multi-threaded environment. Would the community consider changing the memory 
allocator of the operator from glibc to jemalloc? If so, I will be happy to 
provide a PR to address this issue.

[~gyfora]

ENV

glibc-version: 2.31

jemalloc-version: 4.5

was (Author: stupid_pig):
 

The op image we use is to modify the basic image of the DockerFile file from 
eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient to 
use jdk tools). There is no modification at the code level [~bgeng777] 

Recently,  we using pmap cmd observed the memory growth of operator and made a 
comparison  and checking the process rss monitoring of the op host. The 
comparison time was 04.24 20:00-04.25 01:20.

The process rss occupation increased by 95 MB(34.725G-34.630G):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

 

Observe the pmap results at these two time points and compare:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!
 

 

It can be found that the growth mainly comes from the front memory area, and 
there are many 65536KB memory blocks. At the same time, the front memory area 
increased from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 
35708 KB); the increase of 91 MB (165 MB - 74 MB) basically matches the memory 
growth of the process.

Referring to an [issue|https://issues.apache.org/jira/browse/HADOOP-7154] in 
hadoop, it is suspected that glibc has a memory leak due to memory 
fragmentation in multiple arenas under multi-threading. At present, we refer to 
the 
[DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24]
 of flink-docker, which using jemalloc as memory assigner.

We replacing the glibc used by the operator with jemalloc4.5, and adjust the 
Xmx of the operator from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

 

Therefore, it is suspected that it is caused by a memory leak in glibc in a 
multi-threaded environment. Does the community consider changing the memory 
allocation of the op from glibc to jemalloc? If so, I will be happy to provide 
a PR to optimize this issue.

[~gyfora]

 

ENV

glibc-version: 2.31

jemalloc-version: 4.5

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: 

[jira] [Commented] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-30 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842363#comment-17842363
 ] 

Roman Khachatryan commented on FLINK-35217:
---

[~srichter] would you mind backporting the fix to previous releases?

It should at least be ported to one previous release according to support 
policy, ideally to two.

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> While running Flink on a system with unstable power supply checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> 

Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]

2024-04-30 Thread via GitHub


davidradl commented on code in PR #118:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1584710588


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeRowConverter.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.postgresql.jdbc.PgArray;
+
+/**
+ * Runtime converter that responsible to convert between JDBC object and Flink 
internal object for
+ * Snowflake.
+ */
+@Internal
+public class SnowflakeRowConverter extends 
AbstractPostgresCompatibleRowConverter {
+
+private static final long serialVersionUID = 1L;
+
+@Override
+public String converterName() {
+return "Snowflake";
+}
+
+// https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
+public SnowflakeRowConverter(RowType rowType) {

Review Comment:
   I am not sure exactly how Snowflake SQL compares to the other SQLs. It seems 
to be close to SQL Server, which has some TINYINT logic in its converter; it 
looks like Snowflake supports this type, so you need to add this to the 
converter.
   
   I also see 
[https://docs.uipath.com/process-mining/automation-suite/2022.10/user-guide/sql-differences-between-snowflake-and-sql-server](https://docs.uipath.com/process-mining/automation-suite/2022.10/user-guide/sql-differences-between-snowflake-and-sql-server).
 Do any of these SQL quirks require converter changes? 
   
   I notice that Snowflake SQL uses double quotes, whereas Flink SQL uses 
backticks; it would be worth testing that quotes work for Snowflake.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]

2024-04-30 Thread via GitHub


davidradl commented on code in PR #118:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1584685732


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link SnowflakeDialect}. */
+@Internal
+public class SnowflakeDialectFactory implements JdbcDialectFactory {
+@Override
+public boolean acceptsURL(String url) {
+return url.startsWith("jdbc:snowflake:");
+}
+
+@Override
+public JdbcDialect create() {

Review Comment:
   Please could you add some junits and docs for this pr please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]

2024-04-30 Thread via GitHub


davidradl commented on code in PR #118:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1584710588


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeRowConverter.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.postgresql.jdbc.PgArray;
+
+/**
+ * Runtime converter that responsible to convert between JDBC object and Flink 
internal object for
+ * Snowflake.
+ */
+@Internal
+public class SnowflakeRowConverter extends 
AbstractPostgresCompatibleRowConverter {
+
+private static final long serialVersionUID = 1L;
+
+@Override
+public String converterName() {
+return "Snowflake";
+}
+
+// https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
+public SnowflakeRowConverter(RowType rowType) {

Review Comment:
   I am not sure exactly how Snowflake SQL compares to the other SQLs. It seems 
to be close to SQL Server, which has some TINYINT logic in its converter; it 
looks like Snowflake supports this.
   
   I also see 
[https://docs.uipath.com/process-mining/automation-suite/2022.10/user-guide/sql-differences-between-snowflake-and-sql-server](https://docs.uipath.com/process-mining/automation-suite/2022.10/user-guide/sql-differences-between-snowflake-and-sql-server).
 Do any of these SQL quirks require converter changes? 
   
   I notice that Snowflake SQL uses double quotes, whereas Flink SQL uses 
backticks; it would be worth testing that quotes work for Snowflake.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]

2024-04-30 Thread via GitHub


davidradl commented on code in PR #118:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1584685732


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link SnowflakeDialect}. */
+@Internal
+public class SnowflakeDialectFactory implements JdbcDialectFactory {
+@Override
+public boolean acceptsURL(String url) {
+return url.startsWith("jdbc:snowflake:");
+}
+
+@Override
+public JdbcDialect create() {

Review Comment:
   Please could you add some junits and docs for this pr.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]

2024-04-30 Thread via GitHub


davidradl commented on code in PR #118:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1584685732


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link SnowflakeDialect}. */
+@Internal
+public class SnowflakeDialectFactory implements JdbcDialectFactory {
+@Override
+public boolean acceptsURL(String url) {
+return url.startsWith("jdbc:snowflake:");
+}
+
+@Override
+public JdbcDialect create() {

Review Comment:
   Please could you add some junits for this pr.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-30 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-35217.
--
Resolution: Fixed

Merged to master in 
[{{80af4d5}}|https://github.com/apache/flink/commit/80af4d502318348ba15a8f75a2a622ce9dbdc968]
  

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> While running Flink on a system with unstable power supply checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 

Re: [PR] [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. [flink]

2024-04-30 Thread via GitHub


StefanRRichter merged PR #24722:
URL: https://github.com/apache/flink/pull/24722


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
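
The durability requirement behind FLINK-35217 is that the new metadata file 
must be forced to disk before the previous checkpoint is deleted. As a rough 
illustration only (this is not the actual RecoverableFsDataOutputStream change 
merged above, and the class/method names are invented for the sketch), the 
required ordering in plain Java NIO looks like:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Sketch: persist new checkpoint metadata durably before removing the old one. */
final class DurableMetadataCommit {

    static void commit(Path inProgress, Path finalMetadata, Path oldMetadata, byte[] payload)
            throws IOException {
        // 1. Write the in-progress file and force content and size to disk (the missing fsync).
        try (FileChannel channel =
                FileChannel.open(inProgress, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(payload));
            channel.force(true);
        }
        // 2. Atomically publish the metadata under its final name.
        Files.move(inProgress, finalMetadata, StandardCopyOption.ATOMIC_MOVE);
        // 3. Only now is it safe to delete the previous checkpoint's metadata.
        Files.deleteIfExists(oldMetadata);
    }

    private DurableMetadataCommit() {}
}
```

A fully crash-safe commit would also have to make the directory entries durable 
(an fsync of the parent directory), which is beyond this simplified sketch.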



[jira] [Updated] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage

2024-04-30 Thread Stefan Richter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-35217:
---
Fix Version/s: 1.20.0

> Missing fsync in FileSystemCheckpointStorage
> 
>
> Key: FLINK-35217
> URL: https://issues.apache.org/jira/browse/FLINK-35217
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Marc Aurel Fritz
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> While running Flink on a system with unstable power supply checkpoints were 
> regularly corrupted in the form of "_metadata" files with a file size of 0 
> bytes. In all cases the previous checkpoint data had already been deleted, 
> causing progress to be lost completely.
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't 
> perform "fsync" when writing a new checkpoint to disk. This means the old 
> checkpoint gets removed without making sure that the new one is durably 
> persisted on disk. "strace" on the jobmanager's process confirms this 
> behavior:
>  # The checkpoint chk-60's in-progress metadata is written at "openat"
>  # The checkpoint chk-60's in-progress metadata is atomically renamed at 
> "rename"
>  # The old checkpoint chk-59 is deleted at "unlink"
> For durable persistence an "fsync" call is missing before step 3.
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", 
> {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 
> mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) 
> = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata",
>  0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 
> rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8",
>  "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata",
>  {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 
> unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata")
>  = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 
> stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, 
> "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 
> 

Re: [PR] [FLINK-35274][cdc-connector][db2] Fix occasional failure issue with Flink CDC Db2 UT [flink-cdc]

2024-04-30 Thread via GitHub


gong commented on PR #3284:
URL: https://github.com/apache/flink-cdc/pull/3284#issuecomment-2085022086

   @PatrickRen PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35274][cdc-connector][db2] Fix occasional failure issue with Flink CDC Db2 UT [flink-cdc]

2024-04-30 Thread via GitHub


gong commented on PR #3283:
URL: https://github.com/apache/flink-cdc/pull/3283#issuecomment-2085021893

   @PatrickRen PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35176][Connector/JDBC] Support property authentication connection for JDBC catalog & dynamic table [flink-connector-jdbc]

2024-04-30 Thread via GitHub


eskabetxe commented on code in PR #116:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/116#discussion_r1584601921


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##
@@ -88,32 +92,49 @@ public abstract class AbstractJdbcCatalog extends 
AbstractCatalog {
 private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
 
 protected final ClassLoader userClassLoader;
-protected final String username;
-protected final String pwd;
 protected final String baseUrl;
 protected final String defaultUrl;
+protected final Properties connectionProperties;
 
+@Deprecated
 public AbstractJdbcCatalog(
 ClassLoader userClassLoader,
 String catalogName,
 String defaultDatabase,
 String username,
 String pwd,
 String baseUrl) {
+this(
+userClassLoader,
+catalogName,
+defaultDatabase,
+baseUrl,
+getBriefAuthProperties(username, pwd));
+}
+
+public AbstractJdbcCatalog(
+ClassLoader userClassLoader,
+String catalogName,
+String defaultDatabase,
+String baseUrl,
+Properties connectionProperties) {
 super(catalogName, defaultDatabase);
 
 checkNotNull(userClassLoader);
-checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
-checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
 checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
 
 JdbcCatalogUtils.validateJdbcUrl(baseUrl);
 
 this.userClassLoader = userClassLoader;
-this.username = username;
-this.pwd = pwd;
 this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
 this.defaultUrl = this.baseUrl + defaultDatabase;
+this.connectionProperties = 
Preconditions.checkNotNull(connectionProperties);
+if 
(StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)))
 {

Review Comment:
   we should maintain the current checks
   ```
   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(PASSWORD_KEY)));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35272) Pipeline Transform job supports omitting / renaming calculation column

2024-04-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35272:
---
Labels: pull-request-available  (was: )

> Pipeline Transform job supports omitting / renaming calculation column
> --
>
> Key: FLINK-35272
> URL: https://issues.apache.org/jira/browse/FLINK-35272
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: yux
>Priority: Major
>  Labels: pull-request-available
>
> Currently, pipeline transform rules require all columns used by expression 
> calculation must be present in final projected schema, and shall not be 
> renamed or omitted.
>  
> The reason behind this is any column not directly present in projection rules 
> will be filtered out in the PreProjection step, and then the PostProjection 
> process could not find those non-present but indirectly depended columns.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-04-30 Thread via GitHub


yuxiqian opened a new pull request, #3285:
URL: https://github.com/apache/flink-cdc/pull/3285

   This closes [FLINK-35272](https://issues.apache.org/jira/browse/FLINK-35272).
   
   Currently, pipeline jobs with transform (including projection and filtering) 
are constructed with the following topology:
   
   ```
   SchemaTransformOp --> DataTransformOp --> SchemaOp
   ```
   
   where schema projections are applied in `SchemaTransformOp` and data 
projection & filtering are applied in `DataTransformOp`. The idea is 
`SchemaTransformOp` might be embedded in `Sources` in the future to reduce 
payload data size transferred in Flink Job.
   
   However, the current implementation has a known defect: it omits unused 
columns too early, so columns that downstream operators rely on are already 
removed by the time rows reach `DataTransformOp`. See the following example:
   
   ```
   # Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
   transform:
 - source-table: employee
   projection: id, upper(name) as newname
   filter: age > 18
   ```
   
   Such transformation rules will fail since the `name` and `age` columns are 
removed in `SchemaTransformOp`, so their data can no longer be retrieved in 
`DataTransformOp`, where the actual expression evaluation and filtering come 
into effect.
   
   This PR introduces a new design, renaming the transform topology as follows:
   
   ```
   PreTransformOp --> PostTransformOp --> SchemaOp
   ```
   
   where the `PreTransformOp` filters out columns, but only if:
   * The column is not present in projection rules
   * The column is not indirectly referenced by calculation and filtering 
expressions
   
   If a column is explicitly written down, it will be passed downstream as-is. 
But for referenced columns, a special prefix will be added to their names. In 
the example above, a schema like `[id, newname, __PREFIX__name, __PREFIX__age]` 
will be sent downstream. Notice that the expression evaluation and filtering do 
not come into effect yet at this point, so a `DataChangeEvent` would look like 
`[1, null, 'Alice', 19]`.
   
   Adding prefix is meant to deal with such cases:
   
   ```
   # Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
   transform:
 - source-table: employee
   projection: id, upper(name) as name
   filter: age > 18
   ```
   
   Here we need to distinguish the calculated column `(new) name` and the 
referenced original column `(old) name`. So after the name mangling process the 
schema would be like: `[id, name, __PREFIX__name]`.
   
   Also, the filtering process is still done in `PostTransformOp`, since a user 
could write a filter expression that references a calculated column, whose 
value won't be available until `PostTransformOp`'s evaluation. It also means 
that in the following, somewhat ambiguous, case:
   
   ```
   # Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
   transform:
 - source-table: employee
   projection: id, age * 2 as age
   filter: age > 18
   ```
   
   The filtering expression is applied to the calculated `age` column 
(doubled!) instead of the original one.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT

2024-04-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35274:
---
Labels: pull-request-available  (was: )

> Occasional failure issue with Flink CDC Db2 UT
> --
>
> Key: FLINK-35274
> URL: https://issues.apache.org/jira/browse/FLINK-35274
> Project: Flink
>  Issue Type: Bug
>Reporter: Xin Gong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 3.1.0
>
>
> Occasional failure issue with the Flink CDC Db2 UT. Because the Db2 redo log 
> data tableId doesn't contain the database name, the table schema occasionally 
> cannot be found when the task restarts after an exception. I will fix it by 
> supplementing the database name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT

2024-04-30 Thread Xin Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Gong updated FLINK-35274:
-
Fix Version/s: 3.1.0

> Occasional failure issue with Flink CDC Db2 UT
> --
>
> Key: FLINK-35274
> URL: https://issues.apache.org/jira/browse/FLINK-35274
> Project: Flink
>  Issue Type: Bug
>Reporter: Xin Gong
>Priority: Critical
> Fix For: 3.1.0
>
>
> Occasional failure issue with the Flink CDC Db2 UT. Because the Db2 redo log 
> data tableId doesn't contain the database name, the table schema occasionally 
> cannot be found when the task restarts after an exception. I will fix it by 
> supplementing the database name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT

2024-04-30 Thread Xin Gong (Jira)
Xin Gong created FLINK-35274:


 Summary: Occasional failure issue with Flink CDC Db2 UT
 Key: FLINK-35274
 URL: https://issues.apache.org/jira/browse/FLINK-35274
 Project: Flink
  Issue Type: Bug
Reporter: Xin Gong


Occasional failure issue with the Flink CDC Db2 UT. Because the Db2 redo log 
data tableId doesn't contain the database name, the table schema occasionally 
cannot be found when the task restarts after an exception. I will fix it by 
supplementing the database name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33761) Snowflake as JDBC source

2024-04-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33761:
---
Labels: pull-request-available  (was: )

> Snowflake as JDBC source
> 
>
> Key: FLINK-33761
> URL: https://issues.apache.org/jira/browse/FLINK-33761
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Reporter: Boris Litvak
>Assignee: Boris Litvak
>Priority: Major
>  Labels: pull-request-available
>
> Following [https://flink.apache.org/how-to-contribute/contribute-code/,] I 
> would like contribute JDBC source integration with Snowflake.
> The implementation adds SnowflakeDialect and the relevant row converter and 
> factory.
> Need to:
>  * Reach consensus
>  * Find a committer willing to review and merge the change



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]

2024-04-30 Thread via GitHub


boring-cyborg[bot] commented on PR #118:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/118#issuecomment-2084941773

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35273) PyFlink's LocalZonedTimestampType should respect timezone set by set_local_timezone

2024-04-30 Thread Biao Geng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Geng updated FLINK-35273:
--
Description: 
The issue is from 
https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399
When using TIMESTAMP_LTZ in PyFlink while setting a different time zone, the 
output does not show the expected result.
Here is my test code:
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, Configuration
from pyflink.table import DataTypes, StreamTableEnvironment
from datetime import datetime
import pytz


config = Configuration()
config.set_string("python.client.executable", 
"/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
config.set_string("python.executable", 
"/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
env = StreamExecutionEnvironment.get_execution_environment(config)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set_local_timezone("UTC")
# t_env.get_config().set_local_timezone("GMT-08:00")

input_table = t_env.from_elements(
[
(
"elementA",
datetime(year=2024, month=4, day=12, hour=8, minute=35),
),
(
"elementB",
datetime(year=2024, month=4, day=12, hour=8, minute=35, 
tzinfo=pytz.utc),
# datetime(year=2024, month=4, day=12, hour=8, minute=35, 
tzinfo=pytz.timezone('America/New_York')),
),
],
DataTypes.ROW(
[
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
]
),
)
input_table.execute().print()

# SQL
sql_result = t_env.execute_sql("CREATE VIEW MyView1 AS SELECT 
TO_TIMESTAMP_LTZ(171291090, 3);")
t_env.execute_sql("CREATE TABLE Sink (`t` TIMESTAMP_LTZ) WITH 
('connector'='print');")
t_env.execute_sql("INSERT INTO Sink SELECT * FROM MyView1;")
{code}
The output is: 
{code:java}
+----+------------+-------------------------+
| op |       name |               timestamp |
+----+------------+-------------------------+
| +I |   elementA | 2024-04-12 08:35:00.000 |
| +I |   elementB | 2024-04-12 16:35:00.000 |
+----+------------+-------------------------+
2 rows in set
+I[2024-04-12T08:35:00Z]
{code}

In pyflink/tables/types.py, the `LocalZonedTimestampType` class uses the 
following logic to convert a Python object to the SQL type:

{code:python}
EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
...
def to_sql_type(self, dt):
if dt is not None:
seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
   else time.mktime(dt.timetuple()))
return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL
{code}
It shows that EPOCH_ORDINAL is calculated when the Python VM starts but is not 
decided by the timezone set by `set_local_timezone`.


> PyFlink's LocalZonedTimestampType should respect timezone set by 
> set_local_timezone
> ---
>
> Key: FLINK-35273
> URL: https://issues.apache.org/jira/browse/FLINK-35273
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Biao Geng
>Priority: Major
>
> The issue is from 
> https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399
> When using TIMESTAMP_LTZ in PyFlink while setting a different time zone, the 
> output does not show the expected result.
> Here is my test code:
> {code:python}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import Types, Configuration
> from pyflink.table import DataTypes, StreamTableEnvironment
> from datetime import datetime
> import pytz
> config = Configuration()
> config.set_string("python.client.executable", 
> "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
> config.set_string("python.executable", 
> "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().set_local_timezone("UTC")
> # t_env.get_config().set_local_timezone("GMT-08:00")
> input_table = t_env.from_elements(
> [
> (
> "elementA",
> datetime(year=2024, month=4, day=12, hour=8, minute=35),
> ),
> (
> "elementB",
> datetime(year=2024, month=4, day=12, hour=8, minute=35, 
> tzinfo=pytz.utc),
> # datetime(year=2024, month=4, day=12, hour=8, minute=35, 
> tzinfo=pytz.timezone('America/New_York')),
> ),
> ],
> DataTypes.ROW(
> [
> DataTypes.FIELD("name", 

Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-30 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1584530942


##
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java:
##
@@ -49,7 +49,7 @@ public interface StateFuture {
  * @param action the action to perform before completing the returned 
StateFuture.
  * @return the new StateFuture.
  */
-StateFuture thenAccept(Consumer action);
+StateFuture thenAccept(ConsumerWithException action);

Review Comment:
   IIUC, when constructing a `CompletableFuture`, the parameters must be of 
type `Runnable`, so the **checked exceptions** must be handled in 
`CompletableFuture`.
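   
   As a concrete illustration of that constraint, adapting a consumer that 
throws checked exceptions before handing it to a `CompletableFuture` could 
look like the sketch below. `ThrowingConsumer` is a stand-in for Flink's 
`ConsumerWithException`, and the helper is illustrative only, not code from 
this PR.
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.function.Consumer;

   final class CheckedCallbacks {

       /** Stand-in for a consumer that may throw checked exceptions. */
       @FunctionalInterface
       interface ThrowingConsumer<T> {
           void accept(T value) throws Exception;
       }

       /** Adapts a throwing consumer so it can be passed to CompletableFuture#thenAccept. */
       static <T> CompletableFuture<Void> thenAcceptChecked(
               CompletableFuture<T> future, ThrowingConsumer<? super T> action) {
           // java.util.function.Consumer cannot declare checked exceptions,
           // so catch them and rethrow wrapped in an unchecked CompletionException.
           Consumer<T> wrapped =
                   value -> {
                       try {
                           action.accept(value);
                       } catch (Exception e) {
                           throw new CompletionException(e);
                       }
                   };
           return future.thenAccept(wrapped);
       }

       private CheckedCallbacks() {}
   }
   ```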
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35273) PyFlink's LocalZonedTimestampType should respect timezone set by set_local_timezone

2024-04-30 Thread Biao Geng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Geng updated FLINK-35273:
--
Component/s: API / Python

> PyFlink's LocalZonedTimestampType should respect timezone set by 
> set_local_timezone
> ---
>
> Key: FLINK-35273
> URL: https://issues.apache.org/jira/browse/FLINK-35273
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Biao Geng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35273) PyFlink's LocalZonedTimestampType should respect timezone set by set_local_timezone

2024-04-30 Thread Biao Geng (Jira)
Biao Geng created FLINK-35273:
-

 Summary: PyFlink's LocalZonedTimestampType should respect timezone 
set by set_local_timezone
 Key: FLINK-35273
 URL: https://issues.apache.org/jira/browse/FLINK-35273
 Project: Flink
  Issue Type: Bug
Reporter: Biao Geng






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35192) operator oom

2024-04-30 Thread Biao Geng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842322#comment-17842322
 ] 

Biao Geng commented on FLINK-35192:
---

[~stupid_pig], it looks like the pictures of the pmap output are broken. But 
the glibc issue is well known; if that's the case, it makes sense to me to 
introduce jemalloc.

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, 
> image-2024-04-30-17-11-24-974.png, screenshot-1.png, screenshot-2.png, 
> screenshot-3.png
>
>
> The Kubernetes operator docker process was killed by the kernel due to out of 
> memory (at 2024.04.03 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> The pod memory (RSS) has been increasing slowly over the past 7 days:
>  !screenshot-1.png! 
> However, the JVM memory metrics of the operator show no obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]

2024-04-30 Thread via GitHub


fredia commented on code in PR #24739:
URL: https://github.com/apache/flink/pull/24739#discussion_r1584477969


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java:
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.asyncprocessing.StateRequest;
+import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The ForSt {@link StateRequestContainer} which can classify the state 
requests by ForStDB
+ * requestType (Get、Put or Iterator).
+ */
+public class ForStStateRequestClassifier implements StateRequestContainer {
+
+private final List> dbGetRequests;
+
+private final List> dbPutRequests;
+
+public ForStStateRequestClassifier() {
+this.dbGetRequests = new ArrayList<>();
+this.dbPutRequests = new ArrayList<>();
+}
+
+@Override
+public void offer(StateRequest stateRequest) {
+if (stateRequest.getState() == null) {

Review Comment:
   `StateRequestType#SYNC_POINT` would be added to the active buffer, it might 
be better to add a check here.
   
   
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java#L125-L127



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35262] Bridge between AsyncKeyedStateBackend and AsyncExecutionController [flink]

2024-04-30 Thread via GitHub


jectpro7 commented on code in PR #24740:
URL: https://github.com/apache/flink/pull/24740#discussion_r1584465123


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -74,15 +76,20 @@ public void setup(
 final int asyncBufferSize = 
environment.getExecutionConfig().getAsyncStateBufferSize();
 final long asyncBufferTimeout =
 environment.getExecutionConfig().getAsyncStateBufferTimeout();
-// TODO: initial state executor and set state executor for aec
+
+AsyncKeyedStateBackend asyncKeyedStateBackend =
+Preconditions.checkNotNull(
+stateHandler.getAsyncKeyedStateBackend(),
+"Current State Backend doesn't support async access");
 this.asyncExecutionController =
 new AsyncExecutionController(
 mailboxExecutor,
-null,
+asyncKeyedStateBackend.createStateExecutor(),
 maxParallelism,
 asyncBufferSize,
 asyncBufferTimeout,
 inFlightRecordsLimit);
+asyncKeyedStateBackend.setup(asyncExecutionController);

Review Comment:
   The dependency doesn't look very clear; can we make it like this?
   ```
   new AsyncExecutionController(
   mailboxExecutor,
   asyncKeyedStateBackend,
   maxParallelism,
   asyncBufferSize,
   asyncBufferTimeout,
   inFlightRecordsLimit);
   ```
   The AEC is responsible for creating the `stateExecutor` and setting it up 
inside the constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27146] [Filesystem] Migrate to Junit5 [flink]

2024-04-30 Thread via GitHub


kottmann commented on PR #22789:
URL: https://github.com/apache/flink/pull/22789#issuecomment-2084837876

   This is fixed now as well, see new commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584453635


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,78 +24,96 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
-
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
-
-## Features
-### Core
-- Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
-- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
+
+
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 [operator 
模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 
Kubernetes 本地部署体验的核心。
+
+Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human 
Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 
的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。
+
+
+
+
+## 特征
+
+
+
+### 核心
+- 全自动 [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" 
>}})
+  - 运行、暂停和删除应用程序
+  - 有状态和无状态应用程序升级
+  - 保存点的触发和管理
+  - 处理错误,回滚失败的升级
+  - 多 Flink 版本支持:v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
-  - Application cluster
-  - Session cluster
-  - Session job
+  - 应用程序集群
+  - 会话集群
+  - 会话作业
 - Built-in [High 
Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/)
   
 - Extensible framework
   - [Custom validators]({{< ref 
"docs/operations/plugins#custom-flink-resource-validators" >}})
   - [Custom resource listeners]({{< ref 
"docs/operations/plugins#custom-flink-resource-listeners" >}})  
 - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) 
management
-  - Default configurations with dynamic updates
-  - Per job configuration
-  - Environment variables
+  - 默认配置与动态更新
+  - 作业配置
+  - 任务管理器配置
 - POD augmentation via [Pod Templates]({{< ref 
"docs/custom-resource/pod-template" >}})
-  - Native Kubernetes POD definitions
-  - Layering (Base/JobManager/TaskManager overrides)
+  - 原生Kubernetes POD定义
+  - 用于自定义容器和资源
 - [Job Autoscaler]({{< ref "docs/custom-resource/autoscaler" >}})
-  - Collect lag and utilization metrics
-  - Scale job vertices to the ideal parallelism
-  - Scale up and down as the load changes
-### Operations
+  - 收集延迟和利用率指标
+  - 将作业顶点调整到合适的并行度
+  - 根据负载的变化进行扩展和缩减
+
+
+
+### 运营
 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}})
-  - Utilizes the well-established [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
-  - Pluggable metrics reporters
-  - Detailed resources and kubernetes api access metrics
+  - 使用成熟的 [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
+  - 可插拔的指标报告器
+  - 详细的资源和 kubernetes api 访问指标
 - Fully-customizable [Logging]({{< ref 
"docs/operations/metrics-logging#logging" >}})
-  - Default log configuration
-  - Per job log configuration
-  - Sidecar based log forwarders
-- Flink Web UI and REST Endpoint Access
-  - Fully supported Flink Native Kubernetes [service expose 
types](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
-  - Dynamic [Ingress templates]({{< ref "docs/operations/ingress" >}})
+  - 默认日志配置
+  - 每个作业日志配置
+  - 基于 sidecar 的日志转发器
+- Flink Web UI 和 REST 端点访问
+  - 完整支持 Flink 原生 Kubernetes 
[服务暴露类型](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
+  - 通过 [Ingress 模板]({{< ref "docs/operations/ingress" >}}) 

Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584453151


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,78 +24,96 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
-
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
-
-## Features
-### Core
-- Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
-- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
+
+
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 [operator 
模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 
Kubernetes 本地部署体验的核心。
+
+Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human 
Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 
的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。
+
+
+
+
+## 特征
+
+
+
+### 核心
+- 全自动 [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" 
>}})
+  - 运行、暂停和删除应用程序
+  - 有状态和无状态应用程序升级
+  - 保存点的触发和管理
+  - 处理错误,回滚失败的升级
+  - 多 Flink 版本支持:v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
-  - Application cluster
-  - Session cluster
-  - Session job
+  - 应用程序集群
+  - 会话集群
+  - 会话作业
 - Built-in [High 
Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/)
   
 - Extensible framework
   - [Custom validators]({{< ref 
"docs/operations/plugins#custom-flink-resource-validators" >}})
   - [Custom resource listeners]({{< ref 
"docs/operations/plugins#custom-flink-resource-listeners" >}})  
 - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) 
management
-  - Default configurations with dynamic updates
-  - Per job configuration
-  - Environment variables
+  - 默认配置与动态更新
+  - 作业配置
+  - 任务管理器配置
 - POD augmentation via [Pod Templates]({{< ref 
"docs/custom-resource/pod-template" >}})
-  - Native Kubernetes POD definitions
-  - Layering (Base/JobManager/TaskManager overrides)
+  - 原生Kubernetes POD定义
+  - 用于自定义容器和资源
 - [Job Autoscaler]({{< ref "docs/custom-resource/autoscaler" >}})
-  - Collect lag and utilization metrics
-  - Scale job vertices to the ideal parallelism
-  - Scale up and down as the load changes
-### Operations
+  - 收集延迟和利用率指标
+  - 将作业顶点调整到合适的并行度
+  - 根据负载的变化进行扩展和缩减
+
+
+
+### 运营
 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}})
-  - Utilizes the well-established [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
-  - Pluggable metrics reporters
-  - Detailed resources and kubernetes api access metrics
+  - 使用成熟的 [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
+  - 可插拔的指标报告器
+  - 详细的资源和 kubernetes api 访问指标
 - Fully-customizable [Logging]({{< ref 
"docs/operations/metrics-logging#logging" >}})
-  - Default log configuration
-  - Per job log configuration
-  - Sidecar based log forwarders
-- Flink Web UI and REST Endpoint Access
-  - Fully supported Flink Native Kubernetes [service expose 
types](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
-  - Dynamic [Ingress templates]({{< ref "docs/operations/ingress" >}})
+  - 默认日志配置
+  - 每个作业日志配置
+  - 基于 sidecar 的日志转发器
+- Flink Web UI 和 REST 端点访问
+  - 完整支持 Flink 原生 Kubernetes 
[服务暴露类型](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
+  - 通过 [Ingress 模板]({{< ref "docs/operations/ingress" >}}) 

Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584451630


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,78 +24,96 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
-
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
-
-## Features
-### Core
-- Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
-- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
+
+
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 [operator 
模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 
Kubernetes 本地部署体验的核心。
+
+Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human 
Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 
的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。
+
+
+
+
+## 特征
+
+
+
+### 核心
+- 全自动 [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" 
>}})
+  - 运行、暂停和删除应用程序
+  - 有状态和无状态应用程序升级
+  - 保存点的触发和管理
+  - 处理错误,回滚失败的升级
+  - 多 Flink 版本支持:v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
-  - Application cluster
-  - Session cluster
-  - Session job
+  - 应用程序集群
+  - 会话集群
+  - 会话作业
 - Built-in [High 
Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/)
   
 - Extensible framework
   - [Custom validators]({{< ref 
"docs/operations/plugins#custom-flink-resource-validators" >}})
   - [Custom resource listeners]({{< ref 
"docs/operations/plugins#custom-flink-resource-listeners" >}})  
 - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) 
management
-  - Default configurations with dynamic updates
-  - Per job configuration
-  - Environment variables
+  - 默认配置与动态更新
+  - 作业配置
+  - 任务管理器配置

Review Comment:
   done



##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,78 +24,96 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
-
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
-
-## Features
-### Core
-- Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
-- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
+
+
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 

Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584445284


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,78 +24,96 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
-
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
-
-## Features
-### Core
-- Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
-- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
+
+
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 [operator 
模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 
Kubernetes 本地部署体验的核心。
+
+Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human 
Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 
的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]

2024-04-30 Thread via GitHub


ljz2051 commented on code in PR #24739:
URL: https://github.com/apache/flink/pull/24739#discussion_r158208


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java:
##
@@ -95,4 +100,32 @@ public V deserializeValue(byte[] valueBytes) throws 
IOException {
 inputView.setBuffer(valueBytes);
 return getValueSerializer().deserialize(inputView);
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public ForStDBGetRequest, V> buildDBGetRequest(
+StateRequest stateRequest) {
+Preconditions.checkArgument(stateRequest.getRequestType() == 
StateRequestType.VALUE_GET);
+ContextKey contextKey =
+new ContextKey<>((RecordContext) 
stateRequest.getRecordContext());
+return ForStDBGetRequest.of(
+contextKey, this, (InternalStateFuture) 
stateRequest.getFuture());
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public ForStDBPutRequest, V> buildDBPutRequest(
+StateRequest stateRequest) {
+Preconditions.checkArgument(
+stateRequest.getRequestType() == StateRequestType.VALUE_UPDATE
+|| stateRequest.getRequestType() == 
StateRequestType.CLEAR);
+ContextKey contextKey =
+new ContextKey<>((RecordContext) 
stateRequest.getRecordContext());
+V value =
+(stateRequest.getRequestType() == StateRequestType.CLEAR)
+? null  // "Delete(key)" is equivalent to "Put(key, 
null)"
+: (V) stateRequest.getPayload();

Review Comment:
   The `payload` in `StateRequest` represents the input of this request.  The 
payload content for VALUE_PUT stateRequest is the `value` (see 
`InternalValueState#asyncUpdate`).
   
   Sorry for the confusion. In PR #24681, the cached serialized key in 
`RecordContext` has been renamed to `extra`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584443822


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,78 +24,96 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
-
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
-
-## Features
-### Core
-- Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
-- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
+
+
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 [operator 
模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 
Kubernetes 本地部署体验的核心。
+
+Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human 
Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 
的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。
+
+
+
+
+## 特征
+
+
+
+### 核心
+- 全自动 [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" 
>}})
+  - 运行、暂停和删除应用程序
+  - 有状态和无状态应用程序升级
+  - 保存点的触发和管理
+  - 处理错误,回滚失败的升级
+  - 多 Flink 版本支持:v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
-  - Application cluster
-  - Session cluster
-  - Session job
+  - 应用程序集群
+  - 会话集群
+  - 会话作业

Review Comment:
 - Application 集群
 - Session 集群
 - Session job
 ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35192) operator oom

2024-04-30 Thread chenyuzhi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312
 ] 

chenyuzhi edited comment on FLINK-35192 at 4/30/24 9:26 AM:


 

The operator image we use only changes the base image in the Dockerfile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (to make the JDK tools available); there is no modification at the code level. [~bgeng777] 

Recently, we used the pmap command to observe the memory growth of the operator and compared it against the process RSS monitoring of the operator host. The comparison window was 04.24 20:00 - 04.25 01:20.

The process RSS increased by 95 MB (34.725 GB - 34.630 GB):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

 

Observe the pmap results at these two time points and compare:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!
 

 

The growth mainly comes from the front memory region, which contains many 65536 KB blocks. That region grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the 91 MB increase (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to a similar [issue|https://issues.apache.org/jira/browse/HADOOP-7154] in Hadoop, we suspect that glibc leaks memory through fragmentation across its multiple arenas under multi-threading. At present we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator.

We replaced the glibc allocator used by the operator with jemalloc 4.5 and reduced the operator's Xmx from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

 

Therefore, we suspect the problem is caused by a glibc memory leak in a multi-threaded environment. Would the community consider switching the operator's memory allocator from glibc to jemalloc? If so, I will be happy to provide a PR for this.

[~gyfora]

 

ENV

glibc-version: 2.31

jemalloc-version: 4.5


was (Author: stupid_pig):
 

The op image we use is to modify the basic image of the DockerFile file from 
eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient to 
use jdk tools). There is no modification at the code level [~bgeng777] 

Recently,  we using pmap cmd observed the memory growth of operator and made a 
comparison  and checking the process rss monitoring of the op host. The 
comparison time was 04.24 20:00-04.25 01:20.

The process rss occupation increased by 95 MB(34.725G-34.630G):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

 

Observe the pmap results at these two time points and compare:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!
 

 

It can be found that the growth mainly comes from the front memory area, and 
there are many 65536KB memory blocks. At the same time, the front memory area 
increased from 74 MB (35444 KB). + 39668 KB) grows to

165 MB (65536 KB + 65536 KB + 35708 KB),  the increaing result  91MB(165MB - 
74MB)basically matching the memory growth of the process.

Referring to an issue in hadoop, it is suspected that glibc has a memory leak 
due to memory fragmentation in multiple arenas under multi-threading. At 
present, we refer to the 
[DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24]
 of flink-docker, which using jemalloc as memory assigner.

We replacing the glibc used by the operator with jemalloc4.5, and adjust the 
Xmx of the operator from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

 

Therefore, it is suspected that it is caused by a memory leak in glibc in a 
multi-threaded environment. Does the community consider changing the memory 
allocation of the op from glibc to jemalloc? If so, I will be happy to provide 
a PR to optimize this issue.

[~gyfora]

 

ENV

glibc-version: 2.31

jemalloc-version: 4.5

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> 

[jira] [Comment Edited] (FLINK-35192) operator oom

2024-04-30 Thread chenyuzhi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312
 ] 

chenyuzhi edited comment on FLINK-35192 at 4/30/24 9:24 AM:


 

The operator image we use only changes the base image in the Dockerfile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (to make the JDK tools available); there is no modification at the code level. [~bgeng777] 

Recently, we used the pmap command to observe the memory growth of the operator and compared it against the process RSS monitoring of the operator host. The comparison window was 04.24 20:00 - 04.25 01:20.

The process RSS increased by 95 MB (34.725 GB - 34.630 GB):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

 

Observe the pmap results at these two time points and compare:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!
 

 

The growth mainly comes from the front memory region, which contains many 65536 KB blocks. That region grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the 91 MB increase (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to a similar issue in Hadoop, we suspect that glibc leaks memory through fragmentation across its multiple arenas under multi-threading. At present we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator.

We replaced the glibc allocator used by the operator with jemalloc 4.5 and reduced the operator's Xmx from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

 

Therefore, we suspect the problem is caused by a glibc memory leak in a multi-threaded environment. Would the community consider switching the operator's memory allocator from glibc to jemalloc? If so, I will be happy to provide a PR for this.

[~gyfora]

 

ENV

glibc-version: 2.31

jemalloc-version: 4.5


was (Author: stupid_pig):
 

The op image we use is to modify the basic image of the DockerFile file from 
eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient to 
use jdk tools). There is no modification at the code level [~bgeng777] 

Recently,  we using pmap cmd observed the memory growth of operator and made a 
comparison  and checking the process rss monitoring of the op host. The 
comparison time was 04.24 20:00-04.25 01:20.

The process rss occupation increased by 95 MB(34.725G-34.630G):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

 

Observe the pmap results at these two time points and compare:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!
 

 

It can be found that the growth mainly comes from the front memory area, and 
there are many 65536KB memory blocks. At the same time, the front memory area 
increased from 74 MB (35444 KB). + 39668 KB) grows to

165 MB (65536 KB + 65536 KB + 35708 KB),  the increaing result  91MB(165MB - 
74MB)basically matching the memory growth of the process.

Referring to an issue in hadoop, it is suspected that glibc has a memory leak 
due to memory fragmentation in multiple arenas under multi-threading. At 
present, we refer to the 
[DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24]
 of flink-docker, which using jemalloc as memory assigner.

We replacing the glibc used by the operator with jemalloc4.5, and adjust the 
Xmx of the operator from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

 

Therefore, it is suspected that it is caused by a memory leak in glibc in a 
multi-threaded environment. Does the community consider changing the memory 
allocation of the op from glibc to jemalloc? If so, I will be happy to provide 
a PR to optimize this issue.

[~gyfora]

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: 

[jira] [Comment Edited] (FLINK-35192) operator oom

2024-04-30 Thread chenyuzhi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312
 ] 

chenyuzhi edited comment on FLINK-35192 at 4/30/24 9:19 AM:


 

The operator image we use only changes the base image in the Dockerfile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (to make the JDK tools available); there is no modification at the code level. [~bgeng777] 

Recently, we used the pmap command to observe the memory growth of the operator and compared it against the process RSS monitoring of the operator host. The comparison window was 04.24 20:00 - 04.25 01:20.

The process RSS increased by 95 MB (34.725 GB - 34.630 GB):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

 

Observe the pmap results at these two time points and compare:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!
 

 

The growth mainly comes from the front memory region, which contains many 65536 KB blocks. That region grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the 91 MB increase (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to a similar issue in Hadoop, we suspect that glibc leaks memory through fragmentation across its multiple arenas under multi-threading. At present we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator.

We replaced the glibc allocator used by the operator with jemalloc 4.5 and reduced the operator's Xmx from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

 

Therefore, we suspect the problem is caused by a glibc memory leak in a multi-threaded environment. Would the community consider switching the operator's memory allocator from glibc to jemalloc? If so, I will be happy to provide a PR for this.

[~gyfora]


was (Author: stupid_pig):
 

The op image we use is to modify the basic image of the DockerFile file from 
eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient to 
use jdk tools). There is no modification at the code level [~bgeng777] 

Recently,  we using pmap cmd observed the memory growth of operator and made a 
comparison  and checking the process rss monitoring of the op host. The 
comparison time was 04.24 20:00-04.25 01:20.

The process rss occupation increased by 95 MB(34.725G-34.630G):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1607,height=924!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1743,height=621!

 

Observe the pmap results at these two time points and compare:


!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png!
 

 

It can be found that the growth mainly comes from the front memory area, and 
there are many 65536KB memory blocks. At the same time, the front memory area 
increased from 74 MB (35444 KB). + 39668 KB) grows to

165 MB (65536 KB + 65536 KB + 35708 KB),  the increaing result  91MB(165MB - 
74MB)basically matching the memory growth of the process.

Referring to an issue in hadoop, it is suspected that glibc has a memory leak 
due to memory fragmentation in multiple arenas under multi-threading. At 
present, we refer to the 
[DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24]
 of flink-docker, which using jemalloc as memory assigner.

We replacing the glibc used by the operator with jemalloc4.5, and adjust the 
Xmx of the operator from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1583,height=623!

 

Therefore, it is suspected that it is caused by a memory leak in glibc in a 
multi-threaded environment. Does the community consider changing the memory 
allocation of the op from glibc to jemalloc? If so, I will be happy to provide 
a PR to optimize this issue.

[~gyfora]

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator 

Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]

2024-04-30 Thread via GitHub


ljz2051 commented on code in PR #24739:
URL: https://github.com/apache/flink/pull/24739#discussion_r1584431128


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java:
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.asyncprocessing.StateRequest;
+import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The ForSt {@link StateRequestContainer} which can classify the state requests by ForStDB
+ * requestType (Get, Put or Iterator).
+ */
+public class ForStStateRequestClassifier implements StateRequestContainer {
+
+private final List<ForStDBGetRequest<?, ?>> dbGetRequests;
+
+private final List<ForStDBPutRequest<?, ?>> dbPutRequests;
+
+public ForStStateRequestClassifier() {
+this.dbGetRequests = new ArrayList<>();
+this.dbPutRequests = new ArrayList<>();
+}
+
+@Override
+public void offer(StateRequest<?, ?, ?> stateRequest) {
+if (stateRequest.getState() == null) {

Review Comment:
   The state could be null only for {@link StateRequestType#SYNC_POINT}. The ForStStateBackend does nothing for {@link StateRequestType#SYNC_POINT}.
   
   I have added some comments about this if-branch.
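   For readers following the thread, a rough sketch of the if-branch being discussed, based only on the description above (the exact calls in the PR may differ):

```java
@Override
public void offer(StateRequest<?, ?, ?> stateRequest) {
    // The state reference is null only for SYNC_POINT requests. ForStDB has
    // nothing to execute for them, so complete the request immediately instead
    // of classifying it into a Get/Put batch.
    if (stateRequest.getState() == null) {
        Preconditions.checkState(
                stateRequest.getRequestType() == StateRequestType.SYNC_POINT);
        stateRequest.getFuture().complete(null);
        return;
    }
    // ... otherwise classify into dbGetRequests / dbPutRequests ...
}
```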



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35192) operator oom

2024-04-30 Thread chenyuzhi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312
 ] 

chenyuzhi edited comment on FLINK-35192 at 4/30/24 9:17 AM:


 

The operator image we use only changes the base image in the Dockerfile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (to make the JDK tools available); there is no modification at the code level. [~bgeng777] 

Recently, we used the pmap command to observe the memory growth of the operator and compared it against the process RSS monitoring of the operator host. The comparison window was 04.24 20:00 - 04.25 01:20.

The process RSS increased by 95 MB (34.725 GB - 34.630 GB):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1607,height=924!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1743,height=621!

 

Observe the pmap results at these two time points and compare:


!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png!
 

 

The growth mainly comes from the front memory region, which contains many 65536 KB blocks. That region grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the 91 MB increase (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to a similar issue in Hadoop, we suspect that glibc leaks memory through fragmentation across its multiple arenas under multi-threading. At present we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator.

We replaced the glibc allocator used by the operator with jemalloc 4.5 and reduced the operator's Xmx from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1583,height=623!

 

Therefore, we suspect the problem is caused by a glibc memory leak in a multi-threaded environment. Would the community consider switching the operator's memory allocator from glibc to jemalloc? If so, I will be happy to provide a PR for this.

[~gyfora]


was (Author: stupid_pig):
 

The op image we use is to modify the basic image of the DockerFile file from 
eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient to 
use jdk tools). There is no modification at the code level [~bgeng777] 

Recently,  we using pmap cmd observed the memory growth of operator and made a 
comparison  and checking the process rss monitoring of the op host. The 
comparison time was 04.24 20:00-04.25 01:20.

The process rss occupation increased by 95 MB(34.725G-34.630G):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1607,height=924!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1743,height=621!
Observe the pmap results at these two time points and compare:
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png!
 

 

It can be found that the growth mainly comes from the front memory area, and 
there are many 65536KB memory blocks. At the same time, the front memory area 
increased from 74 MB (35444 KB). + 39668 KB) grows to

165 MB (65536 KB + 65536 KB + 35708 KB),  the increaing result  91MB(165MB - 
74MB)basically matching the memory growth of the process.



Referring to an  [issue|https://issues.apache.org/jira/browse/HADOOP-7154] in 
hadoop, it is suspected that glibc has a memory leak due to memory 
fragmentation in multiple arenas under multi-threading. At present, we refer to 
the 
[DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24]
 of flink-docker, which using jemalloc as memory assigner.

We replacing the glibc used by the operator with jemalloc4.5, and adjust the 
Xmx of the operator from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1583,height=623!

 

Therefore, it is suspected that it is caused by a memory leak in glibc in a 
multi-threaded environment. Does the community consider changing the memory 
allocation of the op from glibc to jemalloc? If so, I will be happy to provide 
a PR to optimize this issue.

[~gyfora]

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> 

Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]

2024-04-30 Thread via GitHub


ljz2051 commented on code in PR #24739:
URL: https://github.com/apache/flink/pull/24739#discussion_r1584427802


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java:
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.asyncprocessing.StateExecutor;
+import org.apache.flink.runtime.asyncprocessing.StateRequest;
+import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * The {@link StateExecutor} implementation which executes batches of {@link StateRequest}s for
+ * ForStStateBackend.
+ */
+public class ForStStateExecutor implements StateExecutor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ForStStateExecutor.class);
+
+/**
+ * The coordinator thread which schedules the execution of multiple 
batches of stateRequests.
+ * The number of coordinator threads is 1 to ensure that multiple batches 
of stateRequests can
+ * be executed sequentially.
+ */
+private final Executor coordinatorThread;

Review Comment:
   I have added a method `StateExecutor#shutdown` in which the workerThreads and the coordinatorThread will be closed. Whoever creates the `StateExecutor` is responsible for shutting it down when the task is disposed; I think that role should be `AsyncKeyedStateBackend` or `AsyncExecutionController`.
   
   The `AsyncKeyedStateBackend` has not been implemented yet, so I think the `ForStStateExecutor` should probably be shut down in `AsyncKeyedStateBackend#dispose`.
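   As a rough sketch of the shutdown described above (assuming the coordinator and worker pools are `ExecutorService` instances; the actual field types and cleanup order in the PR may differ):

```java
@Override
public void shutdown() {
    // Stop the coordinator first so no new batches are scheduled,
    // then release the worker threads that execute the ForStDB requests.
    coordinatorThread.shutdown();
    workerThreads.shutdown();
    LOG.info("Shutting down the ForStStateExecutor.");
}
```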



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35192) operator oom

2024-04-30 Thread chenyuzhi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312
 ] 

chenyuzhi commented on FLINK-35192:
---

 

The operator image we use only changes the base image in the Dockerfile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (to make the JDK tools available); there is no modification at the code level. [~bgeng777] 

Recently, we used the pmap command to observe the memory growth of the operator and compared it against the process RSS monitoring of the operator host. The comparison window was 04.24 20:00 - 04.25 01:20.

The process RSS increased by 95 MB (34.725 GB - 34.630 GB):
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1607,height=924!
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1743,height=621!
Observe the pmap results at these two time points and compare:
!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png!
 

 

The growth mainly comes from the front memory region, which contains many 65536 KB blocks. That region grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the 91 MB increase (165 MB - 74 MB) basically matches the memory growth of the process.



Referring to a similar [issue|https://issues.apache.org/jira/browse/HADOOP-7154] in Hadoop, we suspect that glibc leaks memory through fragmentation across its multiple arenas under multi-threading. At present we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator.

We replaced the glibc allocator used by the operator with jemalloc 4.5 and reduced the operator's Xmx from 32g to 5.25g.

Here is the memory usage of the process after adjustment:

!image-2024-04-30-17-11-24-974.png|width=1583,height=623!

 

Therefore, we suspect the problem is caused by a glibc memory leak in a multi-threaded environment. Would the community consider switching the operator's memory allocator from glibc to jemalloc? If so, I will be happy to provide a PR for this.

[~gyfora]

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, 
> image-2024-04-30-17-11-24-974.png, screenshot-1.png, screenshot-2.png, 
> screenshot-3.png
>
>
> The kubernetest operator docker process was killed by kernel cause out of 
> memory(the time is 2024.04.03: 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) is increasing slowly in the past 7 days:
>  !screenshot-1.png! 
> However the jvm memory metrics of operator not shown obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35192) operator oom

2024-04-30 Thread chenyuzhi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenyuzhi updated FLINK-35192:
--
Attachment: image-2024-04-30-17-11-24-974.png

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, 
> image-2024-04-30-17-11-24-974.png, screenshot-1.png, screenshot-2.png, 
> screenshot-3.png
>
>
> The kubernetest operator docker process was killed by kernel cause out of 
> memory(the time is 2024.04.03: 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) is increasing slowly in the past 7 days:
>  !screenshot-1.png! 
> However the jvm memory metrics of operator not shown obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34902) INSERT INTO column mismatch leads to IndexOutOfBoundsException

2024-04-30 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-34902.

Resolution: Fixed

Fixed in master: 8a75f8f22b8e08e45f1a6453b87718eab6db115a

> INSERT INTO column mismatch leads to IndexOutOfBoundsException
> --
>
> Key: FLINK-34902
> URL: https://issues.apache.org/jira/browse/FLINK-34902
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Jeyhun Karimov
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> SQL:
> {code}
> INSERT INTO t (a, b) SELECT 1;
> {code}
>  
> Stack trace:
> {code}
> org.apache.flink.table.api.ValidationException: SQL validation failed. Index 
> 1 out of bounds for length 1
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>     at
> Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for 
> length 1
>     at 
> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>     at 
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>     at 
> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>     at java.base/java.util.Objects.checkIndex(Objects.java:374)
>     at java.base/java.util.ArrayList.get(ArrayList.java:459)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$reorder$1(PreValidateReWriter.scala:355)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$reorder$1$adapted(PreValidateReWriter.scala:355)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34902][table] Fix IndexOutOfBoundsException for VALUES [flink]

2024-04-30 Thread via GitHub


twalthr merged PR #24724:
URL: https://github.com/apache/flink/pull/24724


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager [flink]

2024-04-30 Thread via GitHub


ljz2051 commented on code in PR #24640:
URL: https://github.com/apache/flink/pull/24640#discussion_r1584382431


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -575,8 +576,51 @@ private void createManagedDirectory(Path managedPath) {
 @Override
 public void close() throws IOException {}
 
+// 
+//  restore
+// 
+
+@Override
+public void restoreStateHandles(
+long checkpointId, SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stateHandles) {
+
+Set<LogicalFile> uploadedLogicalFiles;
+synchronized (lock) {
+uploadedLogicalFiles =
+uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>());
+}
+
+stateHandles.forEach(
+fileHandle -> {
+PhysicalFile physicalFile =
+new PhysicalFile(
+null,
+fileHandle.getFilePath(),
+physicalFileDeleter,

Review Comment:
   As discussed offline with @Zakelly, we can check whether the restored filePath is under the managed directory scope (`managedSharedStateDir` or `managedExclusiveStateDir`) to determine whether the file should be deleted by the `FileMergingSnapshotManager`.
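   For illustration, the check described above could look roughly like this (method and parameter names are assumptions, not the actual PR code):

```java
/** Returns true if a restored file lives under a directory owned by this manager. */
private boolean isManagedByFileMergingManager(
        Path filePath, Path managedSharedStateDir, Path managedExclusiveStateDir) {
    String path = filePath.toUri().toString();
    // Files outside the managed directories were not written by this manager,
    // so their deletion is left to the JM / shared state registry instead.
    return path.startsWith(managedSharedStateDir.toUri().toString())
            || path.startsWith(managedExclusiveStateDir.toUri().toString());
}
```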



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35192) operator oom

2024-04-30 Thread chenyuzhi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenyuzhi updated FLINK-35192:
--
Attachment: image-2024-04-30-16-47-07-289.png

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png
>
>
> The kubernetest operator docker process was killed by kernel cause out of 
> memory(the time is 2024.04.03: 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) is increasing slowly in the past 7 days:
>  !screenshot-1.png! 
> However the jvm memory metrics of operator not shown obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager [flink]

2024-04-30 Thread via GitHub


ljz2051 commented on code in PR #24640:
URL: https://github.com/apache/flink/pull/24640#discussion_r1584375937


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -575,8 +576,51 @@ private void createManagedDirectory(Path managedPath) {
 @Override
 public void close() throws IOException {}
 
+// 
+//  restore
+// 
+
+@Override
+public void restoreStateHandles(
+long checkpointId, SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stateHandles) {
+
+Set<LogicalFile> uploadedLogicalFiles;
+synchronized (lock) {
+uploadedLogicalFiles =
+uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>());
+}
+
+stateHandles.forEach(
+fileHandle -> {
+PhysicalFile physicalFile =
+new PhysicalFile(
+null,
+fileHandle.getFilePath(),
+physicalFileDeleter,

Review Comment:
   @fredia In the case of a job restore (not failover and not rescaling), the jobID may change, and I think the JM should claim ownership of these files so that it can delete them. In this case, however, the parallelism of the job has not changed, so the `keyGroupRange` of the `KeyedStateHandle` cannot be used as the judgment condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35192) operator oom

2024-04-30 Thread Biao Geng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842294#comment-17842294
 ] 

Biao Geng commented on FLINK-35192:
---

Sure, created a 
[pr|https://github.com/apache/flink-kubernetes-operator/pull/822] for this.

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, screenshot-1.png, screenshot-2.png, 
> screenshot-3.png
>
>
> The kubernetest operator docker process was killed by kernel cause out of 
> memory(the time is 2024.04.03: 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) is increasing slowly in the past 7 days:
>  !screenshot-1.png! 
> However the jvm memory metrics of operator not shown obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager [flink]

2024-04-30 Thread via GitHub


ljz2051 commented on code in PR #24640:
URL: https://github.com/apache/flink/pull/24640#discussion_r1584364578


##
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##
@@ -830,6 +832,8 @@ public CompletableFuture submitTask(
 
channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId));
 
 taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, 
task::isBackPressured);
+registerTaskRestoreInfoToFileMergingManager(
+fileMergingSnapshotManager, task.getTaskInfo(), 
taskRestore);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35212) PyFlink thread mode process just can run once in standalonesession mode

2024-04-30 Thread Wei Yuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842295#comment-17842295
 ] 

Wei Yuan commented on FLINK-35212:
--

Hi Biao Geng, as you say, the exception I submitted may have been caused by my modifying some files under PyFlink's python site-packages while trying to solve FLINK-35180. I also hit the same exception you pasted in commit #0 with Java 8 and Python 3.8.6.

Looking forward to your improvement on this.

> PyFlink thread mode process just can run once in standalonesession mode
> ---
>
> Key: FLINK-35212
> URL: https://issues.apache.org/jira/browse/FLINK-35212
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
> Environment: Python 3.10.14
> PyFlink==1.18.1
> openjdk version "11.0.21" 2023-10-17 LTS
> OpenJDK Runtime Environment (Red_Hat-11.0.21.0.9-1.el7_9) (build 
> 11.0.21+9-LTS)
> OpenJDK 64-Bit Server VM (Red_Hat-11.0.21.0.9-1.el7_9) (build 11.0.21+9-LTS, 
> mixed mode, sharing)
>Reporter: Wei Yuan
>Priority: Critical
>
> {code:java}
> from pyflink.common.types import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import Types, WatermarkStrategy, Configuration
> from pyflink.table import EnvironmentSettings, TableEnvironment
> from pyflink.table import StreamTableEnvironment, Schema
> from pyflink.datastream.functions import ProcessFunction, MapFunction
> from pyflink.common.time import Instant
> # init task env
> config = Configuration()
> config.set_string("python.execution-mode", "thread")
> # config.set_string("python.execution-mode", "process")
> config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
> config.set_string("python.executable", "/root/miniconda3/bin/python3")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> table_env = StreamTableEnvironment.create(env)
> # create a batch TableEnvironment
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
> "content")
> table_env.create_temporary_view("test_table", table)
> result_table = table_env.sql_query("select *, NOW() as dt from test_table")
> result_ds = table_env.to_data_stream(result_table)
> # def test_func(row):
> # return row
> # result_ds.map(test_func).print()
> result_ds.print()
> env.execute()
> {code}
> Start a standalone session mode cluster by command: 
> {code:java}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/bin/start-cluster.sh{code}
> Submit thread mode job for the first time, this job will success fnished.
> {code:java}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/flink run -py 
> bug.py {code}
> Use above command to submit job for the second time, an error occured:
> {code:java}
> Job has been submitted with JobID a4f2728199277bba0500796f7925fa26
> Traceback (most recent call last):
>   File "/home/disk1/bug.py", line 34, in <module>
>     env.execute()
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
>  line 773, in execute
>     return 
> JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
> line 1322, in __call__
>     return_value = get_return_value(
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
> line 146, in deco
>     return f(*a, **kw)
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
> 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
> : java.util.concurrent.ExecutionException: 
> org.apache.flink.client.program.ProgramInvocationException: Job failed 
> (JobID: a4f2728199277bba0500796f7925fa26)
>         at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>         at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
>         at 
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:171)
>         at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:122)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>         at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at 
> 

Re: [PR] [FLINK-35195][table] Support the execution of create materialized table in continuous refresh mode [flink]

2024-04-30 Thread via GitHub


flinkbot commented on PR #24750:
URL: https://github.com/apache/flink/pull/24750#issuecomment-2084709081

   
   ## CI report:
   
   * 9970141f26ce36804d528727728eb23648a1201f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager [flink]

2024-04-30 Thread via GitHub


ljz2051 commented on code in PR #24640:
URL: https://github.com/apache/flink/pull/24640#discussion_r1584361582


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -575,8 +576,51 @@ private void createManagedDirectory(Path managedPath) {
 @Override
 public void close() throws IOException {}
 
+// 
+//  restore
+// 
+
+@Override
+public void restoreStateHandles(
+long checkpointId, SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stateHandles) {
+
+Set<LogicalFile> uploadedLogicalFiles;
+synchronized (lock) {
+uploadedLogicalFiles =
+uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>());
+}
+
+stateHandles.forEach(
+fileHandle -> {
+PhysicalFile physicalFile =
+new PhysicalFile(
+null,
+fileHandle.getFilePath(),
+physicalFileDeleter,
+fileHandle.getScope());

Review Comment:
   Yes, I have refactored this method. Please review `FileMergingSnapshotManagerBase#restoreStateHandles`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35195) Support the execution of create materialized table in continuous refresh mode

2024-04-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35195:
---
Labels: pull-request-available  (was: )

> Support the execution of create materialized table in continuous refresh mode
> -
>
> Key: FLINK-35195
> URL: https://issues.apache.org/jira/browse/FLINK-35195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In continuous refresh mode, support creates materialized table and its 
> background refresh job:
> {code:SQL}
> CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
>  
> [ ([ <table_constraint> ]) ]
>  
> [COMMENT table_comment]
>  
> [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
>  
> [WITH (key1=val1, key2=val2, ...)]
>  
> FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
>  
> [REFRESH_MODE = { CONTINUOUS | FULL }]
>  
> AS <select_statement>
>  
> <table_constraint>:
>   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35195][table] Support the execution of create materialized table in continuous refresh mode [flink]

2024-04-30 Thread via GitHub


lsyldliu opened a new pull request, #24750:
URL: https://github.com/apache/flink/pull/24750

   ## What is the purpose of the change
   
   *Support the execution of create materialized table in continuous refresh 
mode*
   
   
   ## Brief change log
   
 - *Support the execution of create materialized table in continuous 
refresh mode*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added integration tests in MaterializedTableStatementITCase*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-04-30 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842292#comment-17842292
 ] 

Ryan Skraba commented on FLINK-35041:
-

1.20 test_ci_core 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59281=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8885

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Feifan Wang
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35192) operator oom

2024-04-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35192:
---
Labels: pull-request-available  (was: )

> operator oom
> 
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
>Reporter: chenyuzhi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-22-15-47-49-455.png, 
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, 
> image-2024-04-22-15-58-42-850.png, screenshot-1.png, screenshot-2.png, 
> screenshot-3.png
>
>
> The kubernetest operator docker process was killed by kernel cause out of 
> memory(the time is 2024.04.03: 18:16)
>  !image-2024-04-22-15-47-49-455.png! 
> Metrics:
> the pod memory (RSS) is increasing slowly in the past 7 days:
>  !screenshot-1.png! 
> However the jvm memory metrics of operator not shown obvious anomaly:
>  !image-2024-04-22-15-58-23-269.png! 
>  !image-2024-04-22-15-58-42-850.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35192] Remove usage of deleteOnExit() to reduce the memory usage of the operator [flink-kubernetes-operator]

2024-04-30 Thread via GitHub


bgeng777 opened a new pull request, #822:
URL: https://github.com/apache/flink-kubernetes-operator/pull/822

   
   
   ## What is the purpose of the change
   According to 
[FLINK-35192](https://issues.apache.org/jira/browse/FLINK-35192), each call to 
file.deleteOnExit() registers an entry in a JVM-internal collection that is cleaned 
up only when the JVM shuts down. As a result, a long-running operator handling many 
submissions would accumulate a lot of memory. Since we already guarantee that those 
temporary config files are cleaned up by FlinkConfigManager's cache mechanism, we 
can safely remove the usage of deleteOnExit().
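   As a minimal, illustrative sketch of the underlying behaviour (not the operator's 
actual code; the class and file names below are made up for the example):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Sketch: why File#deleteOnExit() retains memory in a long-running JVM.
public class DeleteOnExitSketch {
    public static void main(String[] args) throws IOException {
        for (int i = 0; i < 3; i++) {
            File tempConf = Files.createTempFile("flink-conf-", ".yaml").toFile();

            // Each deleteOnExit() call adds the path to a JVM-internal
            // collection (java.io.DeleteOnExitHook) that is drained only at
            // JVM shutdown, so entries accumulate for the process lifetime.
            // tempConf.deleteOnExit();

            // In a long-lived process it is safer to delete explicitly once
            // the file is no longer needed (done immediately here for brevity).
            Files.deleteIfExists(tempConf.toPath());
        }
    }
}
```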
   
   ## Brief change log
   
 - Remove the usage of deleteOnExit() in FlinkConfigBuilder
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   This change is already covered by existing tests, such as 
org.apache.flink.kubernetes.operator.config.FlinkConfigManagerTest#testConfUpdateAndCleanup.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / **no**)
 - Core observer or reconciler logic that is regularly executed: (yes / 
**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34071) Deadlock in AWS Kinesis Data Streams AsyncSink connector

2024-04-30 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842275#comment-17842275
 ] 

Aleksandr Pilipenko commented on FLINK-34071:
-

[~chalixar] For exception classification, I am following up by increasing its 
visibility (FLINK-35269). For malicious records, I will check further to see 
which exceptions should be added as fatal.

In terms of a timeout - while nice to have, I am not sure it is related to this 
issue at all.
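For context, a minimal sketch of what fatal-vs-retryable classification could look 
like (illustrative only; this is not the connector's actual classifier API, and the 
class and method names are made up):

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Sketch: walk the cause chain and report a reason if any rule marks the error fatal.
final class SimpleFatalClassifier {

    // Ordered rules: predicate -> human-readable reason why the error is fatal.
    private final Map<Predicate<Throwable>, String> rules = new LinkedHashMap<>();

    SimpleFatalClassifier addRule(Predicate<Throwable> matches, String reason) {
        rules.put(matches, reason);
        return this;
    }

    Optional<String> classifyAsFatal(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            for (Map.Entry<Predicate<Throwable>, String> rule : rules.entrySet()) {
                if (rule.getKey().test(t)) {
                    return Optional.of(rule.getValue());
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        SimpleFatalClassifier classifier = new SimpleFatalClassifier()
                .addRule(t -> t instanceof SecurityException, "not authorized; retrying will not help");

        Throwable wrapped = new RuntimeException(new SecurityException("access denied"));
        // Prints: Optional[not authorized; retrying will not help]
        System.out.println(classifier.classifyAsFatal(wrapped));
    }
}
{code}

Errors that no rule matches would stay on the retry path, while a fatal classification 
should fail the sink promptly instead of leaving in-flight requests hanging.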

> Deadlock in AWS Kinesis Data Streams AsyncSink connector
> 
>
> Key: FLINK-34071
> URL: https://issues.apache.org/jira/browse/FLINK-34071
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Kinesis
>Affects Versions: aws-connector-3.0.0, 1.15.4, aws-connector-4.2.0
>Reporter: Aleksandr Pilipenko
>Priority: Major
>
> Sink operator hangs while flushing records, similarly to FLINK-32230. The error 
> is observed even when using an AWS SDK version that contains the fix for async 
> client error handling: [https://github.com/aws/aws-sdk-java-v2/pull/4402]
> Thread dump of stuck thread:
> {code:java}
> "sdk-async-response-1-6236" Id=11213 RUNNABLE
> at 
> app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$flush$5(AsyncSinkWriter.java:385)
> at 
> app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter$$Lambda$1778/0x000801141040.accept(Unknown
>  Source)
> at 
> org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.handleFullyFailedRequest(KinesisStreamsSinkWriter.java:210)
> at 
> org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.lambda$submitRequestEntries$1(KinesisStreamsSinkWriter.java:184)
> at 
> org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter$$Lambda$1965/0x0008011a0c40.accept(Unknown
>  Source)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils$$Lambda$1925/0x000801181840.accept(Unknown
>  Source)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage$$Lambda$1961/0x000801191c40.accept(Unknown
>  Source)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:67)
> at 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage$$Lambda$1960/0x000801191840.accept(Unknown
>  Source)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at 
> java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at 
> 
