Re: [PR] [FLINK-35195][table] Support the execution of create materialized table in continuous refresh mode [flink]
h12567 commented on PR #24750:
URL: https://github.com/apache/flink/pull/24750#issuecomment-2087873976

May I ask how to find the list of issues/requests I can contribute to?
[jira] [Updated] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage
[ https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefan Richter updated FLINK-35217:
-----------------------------------
    Fix Version/s: 1.18.2
                   1.19.1

> Missing fsync in FileSystemCheckpointStorage
> --------------------------------------------
>
>                 Key: FLINK-35217
>                 URL: https://issues.apache.org/jira/browse/FLINK-35217
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems, Runtime / Checkpointing
>    Affects Versions: 1.17.0, 1.18.0, 1.19.0
>            Reporter: Marc Aurel Fritz
>            Assignee: Stefan Richter
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.18.2, 1.20.0, 1.19.1
>
> While running Flink on a system with an unstable power supply, checkpoints were regularly corrupted in the form of "_metadata" files with a file size of 0 bytes. In all cases the previous checkpoint data had already been deleted, causing progress to be lost completely.
>
> Further investigation revealed that the "FileSystemCheckpointStorage" doesn't perform "fsync" when writing a new checkpoint to disk. This means the old checkpoint gets removed without making sure that the new one is durably persisted on disk. "strace" on the jobmanager's process confirms this behavior:
>
> # The checkpoint chk-60's in-progress metadata is written at "openat"
> # The checkpoint chk-60's in-progress metadata is atomically renamed at "rename"
> # The old checkpoint chk-59 is deleted at "unlink"
>
> For durable persistence an "fsync" call is missing before step 3.
>
> Full "strace" log:
> {code:java}
> [pid 51618] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0
> [pid 51618] 11:44:30 mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) = 0
> [pid 51618] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata", 0x7fd2ad5fc860) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata", 0x7fd2ad5fc740) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", 0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory)
> [pid 51618] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51618] 11:44:30 openat(AT_FDCWD, "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 51618] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51618] 11:44:30 rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = 0
> [pid 51644] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata", {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0
> [pid 51644] 11:44:30 unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata") = 0
> [pid 51644] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, ...}, AT_EMPTY_PATH) = 0
> [pid 51644] 11:44:30 stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 51644] 11:44:30 openat(AT_FDCWD, "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, ...},
> {code}
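For readers unfamiliar with the pattern, the fix amounts to "write, fsync, rename, fsync the directory" before the old checkpoint may be deleted. Below is a minimal Java sketch of that durable-commit sequence; the class and method names are invented for illustration, and this is not Flink's actual RecoverableFsDataOutputStream code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class DurableCommit {

    // Write data to an in-progress file, force it to disk, atomically rename it
    // into place, then force the parent directory so the rename itself is durable.
    // Only after this returns is it safe to delete the previous checkpoint.
    static void commitDurably(Path inProgress, Path target, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(
                inProgress, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap(data));
            ch.force(true); // fsync: flush file contents and metadata
        }
        // Atomic rename; on POSIX this replaces the target in one step.
        Files.move(inProgress, target, StandardCopyOption.ATOMIC_MOVE);
        // fsync the parent directory so the new directory entry survives power loss.
        // Opening a directory like this works on POSIX systems but not on Windows,
        // which is why real implementations guard this call.
        try (FileChannel dir = FileChannel.open(target.getParent(), StandardOpenOption.READ)) {
            dir.force(true);
        }
    }
}
```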
Re: [PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]
StefanRRichter merged PR #24752:
URL: https://github.com/apache/flink/pull/24752
Re: [PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]
StefanRRichter merged PR #24751:
URL: https://github.com/apache/flink/pull/24751
Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]
borislitvak commented on code in PR #118:
URL: https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1585425522

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link SnowflakeDialect}. */
+@Internal
+public class SnowflakeDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:snowflake:");
+    }
+
+    @Override
+    public JdbcDialect create() {

Review Comment:
   Will do!
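As a quick sanity check of the prefix-based dispatch shown in the diff above, the factory only claims Snowflake JDBC URLs; the account URL below is made up for the example.

```java
import org.apache.flink.connector.jdbc.databases.snowflake.dialect.SnowflakeDialectFactory;

public class AcceptsUrlCheck {
    public static void main(String[] args) {
        SnowflakeDialectFactory factory = new SnowflakeDialectFactory();
        // true: matches the "jdbc:snowflake:" prefix checked in acceptsURL
        System.out.println(factory.acceptsURL("jdbc:snowflake://myaccount.snowflakecomputing.com/"));
        // false: a Postgres URL is left for the Postgres dialect factory
        System.out.println(factory.acceptsURL("jdbc:postgresql://localhost:5432/mydb"));
    }
}
```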
Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]
borislitvak commented on code in PR #118:
URL: https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1585423831

##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeRowConverter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.postgresql.jdbc.PgArray;
+
+/**
+ * Runtime converter that responsible to convert between JDBC object and Flink internal object for
+ * Snowflake.
+ */
+@Internal
+public class SnowflakeRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String converterName() {
+        return "Snowflake";
+    }
+
+    // https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
+    public SnowflakeRowConverter(RowType rowType) {

Review Comment:
   > I am not sure exactly how Snowflake SQL compares to the other SQLs. It seems to be close to SQL Server, which has some TINYINT logic in its converter - it looks like Snowflake supports this type - so you need to add this to the converter.
   >
   > I also see https://docs.uipath.com/process-mining/automation-suite/2022.10/user-guide/sql-differences-between-snowflake-and-sql-server. Do any of these SQL quirks require converter changes?
   >
   > I notice that Snowflake SQL uses double quotes, whereas Flink SQL uses back ticks, so it would be worth testing that quotes work for Snowflake.
   
   https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
   
   This code was tested with Flink, so whatever Flink does should work. I've provided an example in the PR.
[jira] [Updated] (FLINK-35279) Support "last-state" upgrade mode for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-35279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alan Zhang updated FLINK-35279:
-------------------------------
    Description: 
The "last-state" upgrade mode is only supported for Flink application mode today[1], we should provide a consistent / similar user experience in Flink session mode.

[1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades

{code:java}
Last state upgrade mode is currently only supported for FlinkDeployments.
{code}

  was:
The "last-state" upgrade mode is only supported for Flink application mode today[1], we should provide a similar user experience in session mode.

[1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades

{code:java}
Last state upgrade mode is currently only supported for FlinkDeployments.
{code}

> Support "last-state" upgrade mode for FlinkSessionJob
> -----------------------------------------------------
>
>                 Key: FLINK-35279
>                 URL: https://issues.apache.org/jira/browse/FLINK-35279
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Alan Zhang
>            Priority: Major
>
> The "last-state" upgrade mode is only supported for Flink application mode today[1], we should provide a consistent / similar user experience in Flink session mode.
>
> [1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>
> {code:java}
> Last state upgrade mode is currently only supported for FlinkDeployments.
> {code}
[jira] [Created] (FLINK-35279) Support "last-state" upgrade mode for FlinkSessionJob
Alan Zhang created FLINK-35279:
----------------------------------

             Summary: Support "last-state" upgrade mode for FlinkSessionJob
                 Key: FLINK-35279
                 URL: https://issues.apache.org/jira/browse/FLINK-35279
             Project: Flink
          Issue Type: New Feature
            Reporter: Alan Zhang


The "last-state" upgrade mode is only supported for Flink application mode today[1], we should provide a similar user experience in session mode.

[1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades

{code:java}
Last state upgrade mode is currently only supported for FlinkDeployments.
{code}
Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]
trystanj commented on code in PR #586:
URL: https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585113359

##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##########
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
     public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
             autoScalerConfig("target.utilization.boundary")
                     .doubleType()
-                    .defaultValue(0.1)
+                    .defaultValue(0.4)

Review Comment:
   Just checked in the operator logs as well, to potentially rule out a prom translation issue. I only see `NaN` for all three metrics in the logs.
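For context on what the option in this diff controls: my reading (an assumption, not taken from the operator's code) is that the boundary widens the tolerated band around the target utilization inside which no rescaling happens, so raising the default from 0.1 to 0.4 makes the autoscaler considerably less twitchy. A toy sketch of that arithmetic:

```java
// Toy illustration of a utilization band; the operator's exact formula and
// clamping may differ. It only shows why a larger boundary means fewer rescalings.
public class UtilizationBand {
    public static void main(String[] args) {
        double target = 0.7;   // assumed target utilization
        double boundary = 0.4; // the new default proposed in this PR
        double lower = Math.max(0.0, target - boundary);
        double upper = Math.min(1.0, target + boundary);
        System.out.printf("no scaling while utilization stays in [%.1f, %.1f]%n", lower, upper);
    }
}
```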
[jira] [Commented] (FLINK-34645) StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount fails
[ https://issues.apache.org/jira/browse/FLINK-34645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842430#comment-17842430 ]

Ryan Skraba commented on FLINK-34645:
-------------------------------------

1.18 Java 11 / Test (module: misc)
https://github.com/apache/flink/actions/runs/8872328847/job/24356773170#step:10:21780

Again a slightly different output was received, but in the same test:

{code}
Apr 29 02:31:31 Expected size: 6 but was: 4 in:
Apr 29 02:31:31 [Record @ (undef) : +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05),
Apr 29 02:31:31  Record @ (undef) : +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10),
Apr 29 02:31:31  Record @ (undef) : +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15),
Apr 29 02:31:31  Record @ (undef) : +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)]
Apr 29 02:31:31 	at org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:110)
Apr 29 02:31:31 	at org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:70)
Apr 29 02:31:31 	at org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase.assertOutputEquals(ArrowPythonAggregateFunctionOperatorTestBase.java:62)
Apr 29 02:31:31 	at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount(BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java:209)
{code}

> StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount fails
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34645
>                 URL: https://issues.apache.org/jira/browse/FLINK-34645
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.1
>            Reporter: Matthias Pohl
>            Priority: Major
>              Labels: github-actions, test-stability
>
> {code}
> Error: 02:27:17 02:27:17.025 [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.658 s <<< FAILURE! - in org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
> Error: 02:27:17 02:27:17.025 [ERROR] org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount  Time elapsed: 0.3 s  <<< FAILURE!
> Mar 09 02:27:17 java.lang.AssertionError:
> Mar 09 02:27:17
> Mar 09 02:27:17 Expected size: 8 but was: 6 in:
> Mar 09 02:27:17 [Record @ (undef) : +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Mar 09 02:27:17  Record @ (undef) : +I(c2,3,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Mar 09 02:27:17  Record @ (undef) : +I(c2,3,1970-01-01T00:00,1970-01-01T00:00:10),
> Mar 09 02:27:17  Record @ (undef) : +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10),
> Mar 09 02:27:17  Watermark @ 1,
> Mar 09 02:27:17  Watermark @ 2]
> Mar 09 02:27:17 	at org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:110)
> Mar 09 02:27:17 	at org.apache.flink.table.runtime.util.RowDataHarnessAssertor.assertOutputEquals(RowDataHarnessAssertor.java:70)
> Mar 09 02:27:17 	at org.apache.flink.table.runtime.operators.python.aggregate.arrow.ArrowPythonAggregateFunctionOperatorTestBase.assertOutputEquals(ArrowPythonAggregateFunctionOperatorTestBase.java:62)
> Mar 09 02:27:17 	at org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByCount(StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java:326)
> Mar 09 02:27:17 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}
Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]
trystanj commented on code in PR #586:
URL: https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585036601

##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##########
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
     public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
             autoScalerConfig("target.utilization.boundary")
                     .doubleType()
-                    .defaultValue(0.1)
+                    .defaultValue(0.4)

Review Comment:
   Great, thank you! Yes, pendingRecords is present in all jobs and has a value (fluctuating, of course). I have never seen the operator's metric `flink_k8soperator_namespace_resource_AutoScaler_jobVertexID_LAG_Current` be anything other than `NaN`, though. I'm observing it via prometheus, so maybe it's just a bug in the translation layer.
   
   edit: actually, same for `SOURCE_DATA_RATE` and `CURRENT_PROCESSING_RATE`...
   
   (also, if this conversation is out of scope I'd be happy to move it to somewhere that is less tangential!)
Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]
gyfora commented on code in PR #586:
URL: https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1585008279

##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##########
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
     public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
             autoScalerConfig("target.utilization.boundary")
                     .doubleType()
-                    .defaultValue(0.1)
+                    .defaultValue(0.4)

Review Comment:
   Lag time is defined as: number of outstanding records / current processing rate (so it's not based on timestamps).
   
   I would double check the pendingRecords metrics from the Kafka source because those should be there.
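gyfora's definition is easy to sanity-check with a little arithmetic; the sketch below mirrors it (backlog divided by processing rate) and also shows why a missing pendingRecords metric propagates as NaN. Names are illustrative, not the operator's actual code.

```java
// A sketch of the lag-time arithmetic described above: backlog divided by
// processing rate, no timestamps involved.
public class LagTime {

    /** Estimated seconds needed to work through the current backlog. */
    static double lagSeconds(double pendingRecords, double processingRatePerSec) {
        if (Double.isNaN(pendingRecords) || processingRatePerSec <= 0) {
            // Without a pendingRecords metric the lag cannot be computed,
            // which is how a missing source metric surfaces as NaN.
            return Double.NaN;
        }
        return pendingRecords / processingRatePerSec;
    }

    public static void main(String[] args) {
        // 1,200,000 records behind at 4,000 records/s => 300 seconds of lag.
        System.out.println(lagSeconds(1_200_000, 4_000));
    }
}
```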
Re: [PR] [FLINK-32002] Adjust autoscaler defaults for release [flink-kubernetes-operator]
trystanj commented on code in PR #586:
URL: https://github.com/apache/flink-kubernetes-operator/pull/586#discussion_r1584990017

##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##########
@@ -68,15 +68,16 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
     public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
             autoScalerConfig("target.utilization.boundary")
                     .doubleType()
-                    .defaultValue(0.1)
+                    .defaultValue(0.4)

Review Comment:
   Thanks, that makes a lot of sense! Is catch-up status determined by literal timestamps compared against the catch-up duration? E.g. if a record was placed in Kafka 10m ago, and our expected catch-up duration is 5m, then are we 5m behind, or are we still 10m behind? Or is catch-up determined by throughput numbers? Just trying to get a better sense of the "catch up" statistics!
   
   Perhaps our problem is that lag and source_data_rate - for every single job tracked (operator 1.7, Flink 1.18.1, all using `KafkaSource`) - is `N/A`. At least according to the exposed operator metrics themselves. If the operator can't see the lag then maybe it can't make an informed decision? I'm wondering if this is a bug in our configuration or maybe I'm just way off base.
   
   I should expect to see values for `LAG_Current` and `SOURCE_DATA_RATE_Current`, right?
Re: [PR] [FLINK-35277][cdc-connector][db2] Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. [flink-cdc]
vinlee19 commented on PR #3286:
URL: https://github.com/apache/flink-cdc/pull/3286#issuecomment-2085539643

@ruanhang1993 @gong PTAL
[jira] [Created] (FLINK-35278) Occasional NPE on k8s operator status replacement
Márton Balassi created FLINK-35278:
--------------------------------------

             Summary: Occasional NPE on k8s operator status replacement
                 Key: FLINK-35278
                 URL: https://issues.apache.org/jira/browse/FLINK-35278
             Project: Flink
          Issue Type: Bug
          Components: Kubernetes Operator
            Reporter: Márton Balassi
             Fix For: kubernetes-operator-1.9.0


Infrequently we get a null pointer exception on status replacement:

{noformat}
logger: io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher
message: Error during error status handling.
throwable: {
  class: java.lang.NullPointerException
  msg: null
  stack: [
    org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:136)
    org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
    org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)
    org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:213)
    org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:60)
    io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)
    io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:123)
    io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
    io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
    io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:452)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.lang.Thread.run(Thread.java:829)
  ]
}
{noformat}

I suspect it probably is thrown by getResourceVersion() here:
https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java#L136
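Assuming the suspicion is right and resource.getMetadata().getResourceVersion() can be null at that point, the failure mode looks like the hypothetical sketch below (fabric8's HasMetadata is the interface the operator's custom resources implement); this is an illustration, not the actual StatusRecorder code.

```java
import io.fabric8.kubernetes.api.model.HasMetadata;

class StatusReplaceGuard {
    // Hypothetical null-safe read of the resource version; comparing or
    // dereferencing a null resourceVersion is the suspected source of the NPE.
    static String resourceVersionOrFail(HasMetadata resource) {
        String version =
                resource.getMetadata() == null ? null : resource.getMetadata().getResourceVersion();
        if (version == null) {
            throw new IllegalStateException(
                    "Resource " + resource.getKind() + " carries no resourceVersion yet");
        }
        return version;
    }
}
```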
[jira] [Updated] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
[ https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-35277:
-----------------------------------
    Labels: pull-request-available  (was: )

> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-35277
>                 URL: https://issues.apache.org/jira/browse/FLINK-35277
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.1.0
>       Environment: Flink 1.18.0
>                    Flink CDC 3.1-pre
>                    DB2 11.5.x
>            Reporter: wenli xiao
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: image-2024-04-30-22-19-17-350.png
>
[jira] [Updated] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
[ https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wenli xiao updated FLINK-35277:
-------------------------------
    Environment: 
Flink 1.18.0
Flink CDC 3.1-pre
DB2 11.5.x

  was:
Flink 1.18.0
DB2 11.5.x

> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-35277
>                 URL: https://issues.apache.org/jira/browse/FLINK-35277
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.1.0
>       Environment: Flink 1.18.0
>                    Flink CDC 3.1-pre
>                    DB2 11.5.x
>            Reporter: wenli xiao
>            Priority: Minor
>         Attachments: image-2024-04-30-22-19-17-350.png
>
[jira] [Commented] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
[ https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842404#comment-17842404 ]

wenli xiao commented on FLINK-35277:
------------------------------------

asncdcaddremove.sql: https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189

> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-35277
>                 URL: https://issues.apache.org/jira/browse/FLINK-35277
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.1.0
>       Environment: Flink 1.18.0
>                    DB2 11.5.x
>            Reporter: wenli xiao
>            Priority: Minor
>         Attachments: image-2024-04-30-22-19-17-350.png
>
[jira] [Updated] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
[ https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wenli xiao updated FLINK-35277: --- Description: 1. background When attempting to use Flink CDC 3.1 in the Flink connector to load data from DB2 to Apache Doris, I set up DB2 using the Docker image {{{}ruanhang/db2-cdc-demo:v1{}}}. After configuring the DB2 asynchronous CDC, I tried to capture a table using {{{}CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE'){}}}. However, I encountered an error when attempting to add the eleventh table: [23505][-803] One or more values in the INSERT statement, UPDATE statement, or foreign key update caused by a DELETE statement are not valid because the primary key, unique constraint or unique index identified by "2" constrains table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14 !image-2024-04-30-22-19-17-350.png! 2. The error indicates that the table {{Asncdc.IBMSNAP_PRUNCNTL}} has a duplicate primary key. Here is the schema of {{{}Asncdc.IBMSNAP_PRUNCNTL{}}}: create table IBMSNAP_PRUNCNTL ( TARGET_SERVER CHARACTER(18) not null, TARGET_OWNER VARCHAR(128) not null, TARGET_TABLE VARCHAR(128) not null, SYNCHTIME TIMESTAMP(6), SYNCHPOINT VARCHAR(16) FOR BIT DATA, SOURCE_OWNER VARCHAR(128) not null, SOURCE_TABLE VARCHAR(128) not null, SOURCE_VIEW_QUAL SMALLINT not null, APPLY_QUAL CHARACTER(18) not null, SET_NAME CHARACTER(18) not null, CNTL_SERVER CHARACTER(18) not null, TARGET_STRUCTURE SMALLINT not null, CNTL_ALIAS CHARACTER(8), PHYS_CHANGE_OWNER VARCHAR(128), PHYS_CHANGE_TABLE VARCHAR(128), MAP_ID VARCHAR(10) not null ); create unique index IBMSNAP_PRUNCNTLX on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL, APPLY_QUAL, SET_NAME, TARGET_SERVER, TARGET_TABLE, TARGET_OWNER); create unique index IBMSNAP_PRUNCNTLX1 on IBMSNAP_PRUNCNTL (MAP_ID); create index IBMSNAP_PRUNCNTLX2 on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE); create index IBMSNAP_PRUNCNTLX3 on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER); The issue stems from the logic in {{asncdc.addtable}} not aligning with the {{asncdcaddremove.sql}} script when calling the {{addtable}} procedure. The original insert statement is as follows: – Original insert statement SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 'TARGET_SERVER, ' || 'TARGET_OWNER, ' || 'TARGET_TABLE, ' || 'SYNCHTIME, ' || 'SYNCHPOINT, ' || 'SOURCE_OWNER, ' || 'SOURCE_TABLE, ' || 'SOURCE_VIEW_QUAL, ' || 'APPLY_QUAL, ' || 'SET_NAME, ' || 'CNTL_SERVER , ' || 'TARGET_STRUCTURE , ' || 'CNTL_ALIAS , ' || 'PHYS_CHANGE_OWNER , ' || 'PHYS_CHANGE_TABLE , ' || 'MAP_ID ' || ') VALUES ( ' || '''KAFKA'', ' || || tableschema || ''', ' || || tablename || ''', ' || 'NULL, ' || 'NULL, ' || || tableschema || ''', ' || || tablename || ''', ' || '0, ' || '''KAFKAQUAL'', ' || '''SET001'', ' || ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || '8, ' || ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || '''ASNCDC'', ' || '''CDC_' || tableschema || '_' || tablename || ''', ' || ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || ' )'; EXECUTE IMMEDIATE stmtSQL; The {{max(MAP_ID)}} logic is incorrect, as the correct result should be {{{}CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)){}}}. This issue prevents the addition of the eleventh table. 
For more details about {{asncdcaddremove.sql}}, please refer to: [asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189]
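To make the failure mode concrete, here is a standalone Java illustration (an editor's sketch, not part of the issue or the fix) of the lexicographic-vs-numeric maximum over VARCHAR MAP_IDs:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LexicographicMaxDemo {
    public static void main(String[] args) {
        // MAP_ID values after ten tables have been captured, stored as VARCHAR:
        List<String> mapIds = IntStream.rangeClosed(1, 10)
                .mapToObj(String::valueOf)
                .collect(Collectors.toList());
        // max over VARCHAR compares lexicographically: "9" > "10"
        String lexMax = Collections.max(mapIds);
        System.out.println(lexMax);                       // 9
        System.out.println(Integer.parseInt(lexMax) + 1); // 10 -> duplicate MAP_ID
        // Casting to INT first, as the corrected expression does, is numeric:
        int numMax = mapIds.stream().mapToInt(Integer::parseInt).max().getAsInt();
        System.out.println(numMax + 1);                   // 11 -> correct
    }
}
{code}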
[jira] [Created] (FLINK-35277) Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
wenli xiao created FLINK-35277: -- Summary: Fix the error in the `asncdcaddremove.sql` script for the DB2 test container. Key: FLINK-35277 URL: https://issues.apache.org/jira/browse/FLINK-35277 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: 3.1.0 Environment: Flink 1.18.0 DB2 11.5.x Reporter: wenli xiao Attachments: image-2024-04-30-22-19-17-350.png

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35275) ArrayContainsFunction uses wrong DataType to create element getter
[ https://issues.apache.org/jira/browse/FLINK-35275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842395#comment-17842395 ] PengFei Li commented on FLINK-35275: [~twalthr] Could you help confirm if this is a bug? If so, I'd like to fix it.
> ArrayContainsFunction uses wrong DataType to create element getter
> --
>
> Key: FLINK-35275
> URL: https://issues.apache.org/jira/browse/FLINK-35275
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.0
> Reporter: PengFei Li
> Priority: Minor
>
> In ArrayContainsFunction, elementGetter is used to get elements of an array,
> but it's created from the needle data type rather than the element data type,
> which leads to wrong results.
> {code:java}
> public ArrayContainsFunction(SpecializedContext context) {
>     super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
>     final DataType needleDataType =
>             context.getCallContext().getArgumentDataTypes().get(1);
>     elementGetter =
>             ArrayData.createElementGetter(needleDataType.getLogicalType());
> } {code}
> For example, the following SQL returns true, but the expected result is false.
> The element type is nullable int, and the needle type is non-nullable int.
> Using the needle type to create the element getter converts the NULL element
> to 0, so the result returns true.
> {code:java}
> SELECT ARRAY_CONTAINS(ARRAY[1, NULL], 0){code}
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35276) SortCodeGeneratorTest.testMultiKeys fails on negative zero
[ https://issues.apache.org/jira/browse/FLINK-35276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Skraba updated FLINK-35276: Attachment: job-logs.txt
> SortCodeGeneratorTest.testMultiKeys fails on negative zero
> --
>
> Key: FLINK-35276
> URL: https://issues.apache.org/jira/browse/FLINK-35276
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0, 1.19.1
> Reporter: Ryan Skraba
> Priority: Critical
> Labels: test-stability
> Attachments: job-logs.txt
>
>
> 1.19 AdaptiveScheduler / Test (module: table)
> [https://github.com/apache/flink/actions/runs/8864296211/job/24339523745#step:10:10757]
> SortCodeGeneratorTest can fail if one of the generated random row values is -0.0f.
> {code:java}
> Apr 28 02:38:03 expect: +I(,SqlRawValue{?},0.0,false); actual: +I(,SqlRawValue{?},-0.0,false)
> Apr 28 02:38:03 expect: +I(,SqlRawValue{?},-0.0,false); actual: +I(,SqlRawValue{?},0.0,false)
> ...
>
> ...
> Apr 28 02:38:04 expect: +I(,null,4.9695407E17,false); actual: +I(,null,4.9695407E17,false)
> Apr 28 02:38:04 expect: +I(,null,-3.84924672E18,false); actual: +I(,null,-3.84924672E18,false)
> Apr 28 02:38:04 types: [[RAW('java.lang.Integer', ?), FLOAT, BOOLEAN]]
> Apr 28 02:38:04 keys: [0, 1]]
> Apr 28 02:38:04 expected: 0.0f
> Apr 28 02:38:04 but was: -0.0f
> Apr 28 02:38:04 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 28 02:38:04 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 28 02:38:04 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 28 02:38:04 at org.apache.flink.table.planner.codegen.SortCodeGeneratorTest.testInner(SortCodeGeneratorTest.java:632)
> Apr 28 02:38:04 at org.apache.flink.table.planner.codegen.SortCodeGeneratorTest.testMultiKeys(SortCodeGeneratorTest.java:143)
> Apr 28 02:38:04 at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
> In the test code, this is extremely unlikely to occur (one in 2²⁴?) but *has*
> happened at this line (when the {{rnd.nextFloat()}} is {{0.0f}} and
> {{rnd.nextLong()}} is negative):
> [https://github.com/apache/flink/blob/e7ce0a2969633168b9395c683921aa49362ad7a4/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java#L255]
> We can reproduce the failure by changing how likely {{0.0f}} is to be
> generated at that line.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35276) SortCodeGeneratorTest.testMultiKeys fails on negative zero
Ryan Skraba created FLINK-35276: --- Summary: SortCodeGeneratorTest.testMultiKeys fails on negative zero Key: FLINK-35276 URL: https://issues.apache.org/jira/browse/FLINK-35276 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.20.0, 1.19.1 Reporter: Ryan Skraba

1.19 AdaptiveScheduler / Test (module: table)
[https://github.com/apache/flink/actions/runs/8864296211/job/24339523745#step:10:10757]

SortCodeGeneratorTest can fail if one of the generated random row values is -0.0f.

{code:java}
Apr 28 02:38:03 expect: +I(,SqlRawValue{?},0.0,false); actual: +I(,SqlRawValue{?},-0.0,false)
Apr 28 02:38:03 expect: +I(,SqlRawValue{?},-0.0,false); actual: +I(,SqlRawValue{?},0.0,false)
...
...
Apr 28 02:38:04 expect: +I(,null,4.9695407E17,false); actual: +I(,null,4.9695407E17,false)
Apr 28 02:38:04 expect: +I(,null,-3.84924672E18,false); actual: +I(,null,-3.84924672E18,false)
Apr 28 02:38:04 types: [[RAW('java.lang.Integer', ?), FLOAT, BOOLEAN]]
Apr 28 02:38:04 keys: [0, 1]]
Apr 28 02:38:04 expected: 0.0f
Apr 28 02:38:04 but was: -0.0f
Apr 28 02:38:04 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Apr 28 02:38:04 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Apr 28 02:38:04 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Apr 28 02:38:04 at org.apache.flink.table.planner.codegen.SortCodeGeneratorTest.testInner(SortCodeGeneratorTest.java:632)
Apr 28 02:38:04 at org.apache.flink.table.planner.codegen.SortCodeGeneratorTest.testMultiKeys(SortCodeGeneratorTest.java:143)
Apr 28 02:38:04 at java.lang.reflect.Method.invoke(Method.java:498)
{code}

In the test code, this is extremely unlikely to occur (one in 2²⁴?) but *has* happened at this line (when the {{rnd.nextFloat()}} is {{0.0f}} and {{rnd.nextLong()}} is negative):
[https://github.com/apache/flink/blob/e7ce0a2969633168b9395c683921aa49362ad7a4/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java#L255]

We can reproduce the failure by changing how likely {{0.0f}} is to be generated at that line.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
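For context, a standalone Java demonstration (editor's sketch, not Flink code) of why {{-0.0f}} trips such assertions even though it is {{==}} to {{0.0f}}:

{code:java}
public class NegativeZeroDemo {
    public static void main(String[] args) {
        float pos = 0.0f;
        float neg = -0.0f;
        // Primitive comparison treats the two zeros as equal...
        System.out.println(pos == neg);              // true
        // ...but Float.compare orders -0.0f strictly below 0.0f, which is
        // what generated sort code and test assertions typically rely on.
        System.out.println(Float.compare(pos, neg)); // 1
        // The underlying bit patterns differ only in the sign bit.
        System.out.println(Integer.toHexString(Float.floatToIntBits(pos))); // 0
        System.out.println(Integer.toHexString(Float.floatToIntBits(neg))); // 80000000
    }
}
{code}

A row generator that produces a zero float with a negative sign bit therefore yields a value that prints and sorts differently from the expected {{0.0f}}.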
Re: [PR] [FLINK-27146] [Filesystem] Migrate to Junit5 [flink]
kottmann commented on PR #22789: URL: https://github.com/apache/flink/pull/22789#issuecomment-2085392466 @ferenc-csaky Thank you for reviewing my PR and for your helpful feedback! I really appreciate your support in improving it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35275) ArrayContainsFunction uses wrong DataType to create element getter
[ https://issues.apache.org/jira/browse/FLINK-35275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] PengFei Li updated FLINK-35275: --- Description:
In ArrayContainsFunction, elementGetter is used to get elements of an array, but it's created from the needle data type rather than the element data type, which leads to wrong results.
{code:java}
public ArrayContainsFunction(SpecializedContext context) {
    super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
    final DataType needleDataType =
            context.getCallContext().getArgumentDataTypes().get(1);
    elementGetter =
            ArrayData.createElementGetter(needleDataType.getLogicalType());
} {code}
For example, the following SQL returns true, but the expected result is false. The element type is nullable int, and the needle type is non-nullable int. Using the needle type to create the element getter converts the NULL element to 0, so the result returns true.
{code:java}
SELECT ARRAY_CONTAINS(ARRAY[1, NULL], 0){code}
> ArrayContainsFunction uses wrong DataType to create element getter
> --
>
> Key: FLINK-35275
> URL: https://issues.apache.org/jira/browse/FLINK-35275
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.0
> Reporter: PengFei Li
> Priority: Minor
>
> In ArrayContainsFunction, elementGetter is used to get elements of an array,
> but it's created from the needle data type rather than the element data type,
> which leads to wrong results.
> {code:java}
> public ArrayContainsFunction(SpecializedContext context) {
>     super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
>     final DataType needleDataType =
>             context.getCallContext().getArgumentDataTypes().get(1);
>     elementGetter =
>             ArrayData.createElementGetter(needleDataType.getLogicalType());
> } {code}
> For example, the following SQL returns true, but the expected result is false.
> The element type is nullable int, and the needle type is non-nullable int.
> Using the needle type to create the element getter converts the NULL element
> to 0, so the result returns true.
> {code:java}
> SELECT ARRAY_CONTAINS(ARRAY[1, NULL], 0){code}
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35275) ArrayContainsFunction uses wrong DataType to create element getter
PengFei Li created FLINK-35275: -- Summary: ArrayContainsFunction uses wrong DataType to create element getter Key: FLINK-35275 URL: https://issues.apache.org/jira/browse/FLINK-35275 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.16.0 Reporter: PengFei Li

In `ArrayContainsFunction`, `elementGetter` is used to get elements of an array, but it's created from the needle data type rather than the element data type, which leads to wrong results.
{code:java}
public ArrayContainsFunction(SpecializedContext context) {
    super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
    final DataType needleDataType =
            context.getCallContext().getArgumentDataTypes().get(1);
    elementGetter =
            ArrayData.createElementGetter(needleDataType.getLogicalType());
} {code}
For example, the following SQL returns true, but the expected result is false. The element type is nullable int, and the needle type is non-nullable int. Using the needle type to create the element getter converts the NULL element to 0, so the result returns true.
{code:java}
SELECT ARRAY_CONTAINS(ARRAY[1, NULL], 0){code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
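If the diagnosis is confirmed, a minimal sketch of one possible fix follows: derive the element getter from the haystack argument's element type rather than from the needle type. This assumes the ARRAY argument is represented as a {{CollectionDataType}}; it is not the committed Flink change.

{code:java}
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;

// Sketch only: replace the constructor body shown above.
public ArrayContainsFunction(SpecializedContext context) {
    super(BuiltInFunctionDefinitions.ARRAY_CONTAINS, context);
    // The haystack (the array) is the first argument, the needle the second.
    final DataType haystackDataType =
            context.getCallContext().getArgumentDataTypes().get(0);
    // Assumption: ARRAY arguments are represented as CollectionDataType,
    // whose getElementDataType() yields the (possibly nullable) element type.
    final DataType elementDataType =
            ((CollectionDataType) haystackDataType).getElementDataType();
    elementGetter = ArrayData.createElementGetter(elementDataType.getLogicalType());
}
{code}

With the element getter built from the nullable element type, the NULL element is read back as null rather than 0, so {{ARRAY_CONTAINS(ARRAY[1, NULL], 0)}} no longer returns true.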
Re: [PR] [FLINK-35183] MinorVersion metric for tracking applications [flink-kubernetes-operator]
mbalassi commented on code in PR #819: URL: https://github.com/apache/flink-kubernetes-operator/pull/819#discussion_r1584812594 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java: ## @@ -94,6 +101,22 @@ public void onUpdate(FlinkDeployment flinkApp) { }) .add(deploymentName); +// Minor version computed from the above +var subVersions = flinkVersion.split("\\."); +String minorVersion = MALFORMED_MINOR_VERSION; Review Comment: Fair, I am open to simply using `UNKNOWN`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
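For illustration, a hedged sketch of how the snippet in the diff above might continue; {{MALFORMED_MINOR_VERSION}} appears in the diff, while the rest is an assumption about the surrounding code, not the merged implementation:

{code:java}
var subVersions = flinkVersion.split("\\.");
String minorVersion = MALFORMED_MINOR_VERSION; // fallback discussed above
if (subVersions.length >= 2) {
    // "1.18.1" -> "1.18"; versions without at least major.minor keep the fallback
    minorVersion = subVersions[0] + "." + subVersions[1];
}
{code}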
Re: [PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]
flinkbot commented on PR #24752: URL: https://github.com/apache/flink/pull/24752#issuecomment-2085320637 ## CI report: * 0670f3aec8b7aaa6db1192a8cdead22d9c6b925b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]
flinkbot commented on PR #24751: URL: https://github.com/apache/flink/pull/24751#issuecomment-2085320237 ## CI report: * e5d3c1296ac763d40175720c8fff58716ab1a3b1 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35183] MinorVersion metric for tracking applications [flink-kubernetes-operator]
mbalassi commented on code in PR #819: URL: https://github.com/apache/flink-kubernetes-operator/pull/819#discussion_r1584810870

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java:
## @@ -165,13 +194,21 @@ private void initNamespaceStatusCounts(String ns) {
 private void initFlinkVersions(String ns, String flinkVersion) {
 parentMetricGroup
 .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
-.addGroup(FLINK_VERSION_GROUP_NAME)
-.addGroup(flinkVersion)
+.addGroup(FLINK_VERSION_GROUP_NAME, flinkVersion)

Review Comment: Good question. This is actually semantically different. We can consider the previous behavior a bug; this approach works well with modern metrics systems like Prometheus/Grafana.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
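For readers unfamiliar with the distinction: Flink's {{MetricGroup}} offers both a nested {{addGroup(name)}} chain and a key-value {{addGroup(key, value)}} variant. A rough sketch of the difference (illustrative names, not operator code):

{code:java}
import org.apache.flink.metrics.MetricGroup;

/** Illustrative only; not code from this PR. */
public final class GroupStyles {
    static void register(MetricGroup metricGroup, String version) {
        // Nested groups: the version becomes part of the metric scope name,
        // e.g. <prefix>.FlinkVersion.<version>.<metric>
        metricGroup.addGroup("FlinkVersion").addGroup(version);

        // Key-value group: the value is additionally exposed as a scope
        // variable, so tag-based reporters (e.g. Prometheus) can surface
        // "FlinkVersion" as a label instead of flattening it into the name.
        metricGroup.addGroup("FlinkVersion", version);
    }
}
{code}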
[PR] Backport: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) [flink]
StefanRRichter opened a new pull request, #24751: URL: https://github.com/apache/flink/pull/24751 Backport of FLINK-35217 for 1.19 (cherry picked from commit 80af4d502318348ba15a8f75a2a622ce9dbdc968) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35183] MinorVersion metric for tracking applications [flink-kubernetes-operator]
gaborgsomogyi commented on code in PR #819: URL: https://github.com/apache/flink-kubernetes-operator/pull/819#discussion_r1584790463

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java:
## @@ -165,13 +194,21 @@ private void initNamespaceStatusCounts(String ns) {
 private void initFlinkVersions(String ns, String flinkVersion) {
 parentMetricGroup
 .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
-.addGroup(FLINK_VERSION_GROUP_NAME)
-.addGroup(flinkVersion)
+.addGroup(FLINK_VERSION_GROUP_NAME, flinkVersion)

Review Comment: This is not related, just merging the two statements into one, right?

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java:
## @@ -94,6 +101,22 @@ public void onUpdate(FlinkDeployment flinkApp) {
 })
 .add(deploymentName);

+// Minor version computed from the above
+var subVersions = flinkVersion.split("\\.");
+String minorVersion = MALFORMED_MINOR_VERSION;

Review Comment: Not sure why we don't use `UNKNOWN` just like in the other case, but I'm fine with that.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [mysql] Mysql-cdc adapt mariadb. [flink-cdc]
ThisisWilli commented on PR #2494: URL: https://github.com/apache/flink-cdc/pull/2494#issuecomment-2085275760 > Hi @ThisisWilli, thanks for your great contribution! Could you please rebase this PR with latest `master` branch? You may need to rename some packages like from `com.ververica.cdc` to `org.apache.flink.cdc`. > > cc @leonardBang @yuxiqian I'll find time to finish this work in mid-May. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842366#comment-17842366 ] chenyuzhi commented on FLINK-35192: --- The images aren't actually broken; they were just too big, so I've re-scaled them so they can be viewed. [~bgeng777]
> operator oom
>
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.6.1
> Environment: jdk: openjdk11
> operator version: 1.6.1
> Reporter: chenyuzhi
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2024-04-22-15-47-49-455.png,
> image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png,
> image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png,
> image-2024-04-30-17-11-24-974.png, image-2024-04-30-20-38-25-195.png,
> image-2024-04-30-20-39-05-109.png, image-2024-04-30-20-39-34-396.png,
> image-2024-04-30-20-41-51-660.png, image-2024-04-30-20-43-20-125.png,
> screenshot-1.png, screenshot-2.png, screenshot-3.png
>
>
> The Kubernetes operator docker process was killed by the kernel due to out of
> memory (at 2024-04-03 18:16):
> !image-2024-04-22-15-47-49-455.png!
> Metrics:
> The pod memory (RSS) has been increasing slowly over the past 7 days:
> !screenshot-1.png!
> However, the operator's JVM memory metrics show no obvious anomaly:
> !image-2024-04-22-15-58-23-269.png!
> !image-2024-04-22-15-58-42-850.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312 ] chenyuzhi edited comment on FLINK-35192 at 4/30/24 12:44 PM: -

Our operator image only changes the base image in the Dockerfile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (to make the JDK tools available); there is no modification at the code level. [~bgeng777]

Recently we used the pmap command to observe the operator's memory growth and compared it against the process RSS monitoring of the operator's host. The comparison window was 04-24 20:00 to 04-25 01:20, during which the process RSS grew by 95 MB (34.725 GB - 34.630 GB):

!image-2024-04-30-20-39-34-396.png|width=611,height=343!

!image-2024-04-30-20-39-05-109.png|width=678,height=397!

Comparing the pmap results at these two points in time:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=682,height=148!

The growth mainly comes from the front memory area, which contains many 65536 KB blocks. That area grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the 91 MB increase (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to an [issue|https://issues.apache.org/jira/browse/HADOOP-7154] in Hadoop, we suspect glibc leaks memory through fragmentation across multiple arenas under multi-threading. Following the flink-docker [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24], which uses jemalloc as the memory allocator, we replaced the operator's glibc allocator with jemalloc 4.5 and adjusted the operator's Xmx from 32g to 5.25g. Here is the memory usage of the process after the adjustment:

!image-2024-04-30-20-43-20-125.png|width=632,height=250!

Therefore, we suspect a glibc memory leak in a multi-threaded environment. Would the community consider switching the operator's memory allocator from glibc to jemalloc? If so, I would be happy to provide a PR for this. [~gyfora]

ENV:
glibc-version: 2.31
jemalloc-version: 4.5

> operator oom
>
>
> Key: FLINK-35192
> URL: https://issues.apache.org/jira/browse/FLINK-35192
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.6.1
> Environment:
[jira] [Commented] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage
[ https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842363#comment-17842363 ] Roman Khachatryan commented on FLINK-35217: --- [~srichter] would you mind backporting the fix to previous releases? It should at least be ported to one previous release according to support policy, ideally to two. > Missing fsync in FileSystemCheckpointStorage > > > Key: FLINK-35217 > URL: https://issues.apache.org/jira/browse/FLINK-35217 > Project: Flink > Issue Type: Bug > Components: FileSystems, Runtime / Checkpointing >Affects Versions: 1.17.0, 1.18.0, 1.19.0 >Reporter: Marc Aurel Fritz >Assignee: Stefan Richter >Priority: Critical > Labels: pull-request-available > Fix For: 1.20.0 > > > While running Flink on a system with unstable power supply checkpoints were > regularly corrupted in the form of "_metadata" files with a file size of 0 > bytes. In all cases the previous checkpoint data had already been deleted, > causing progress to be lost completely. > Further investigation revealed that the "FileSystemCheckpointStorage" doesn't > perform "fsync" when writing a new checkpoint to disk. This means the old > checkpoint gets removed without making sure that the new one is durably > persisted on disk. "strace" on the jobmanager's process confirms this > behavior: > # The checkpoint chk-60's in-progress metadata is written at "openat" > # The checkpoint chk-60's in-progress metadata is atomically renamed at > "rename" > # The old checkpoint chk-59 is deleted at "unlink" > For durable persistence an "fsync" call is missing before step 3. > Full "strace" log: > {code:java} > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", > {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0 > [pid 51618] 11:44:30 > mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) > = 0 > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata", > 0x7fd2ad5fc860) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata", > 0x7fd2ad5fc740) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > 0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51618] 11:44:30 openat(AT_FDCWD, > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > O_WRONLY|O_CREAT|O_EXCL, 0666) = 168 > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0 > [pid 51618] 11:44:30 > 
rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata", > {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0 > [pid 51644] 11:44:30 > unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata") > = 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51644] 11:44:30 openat(AT_FDCWD, > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168 > [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, > ...}, AT_EMPTY_PATH) = 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51644] 11:44:30 openat(AT_FDCWD, >
Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]
davidradl commented on code in PR #118: URL: https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1584710588

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeRowConverter.java:
## @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.postgresql.jdbc.PgArray;
+
+/**
+ * Runtime converter that responsible to convert between JDBC object and Flink internal object for
+ * Snowflake.
+ */
+@Internal
+public class SnowflakeRowConverter extends AbstractPostgresCompatibleRowConverter {
+
+private static final long serialVersionUID = 1L;
+
+@Override
+public String converterName() {
+return "Snowflake";
+}
+
+// https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
+public SnowflakeRowConverter(RowType rowType) {

Review Comment: I am not sure exactly how Snowflake SQL compares to the other SQL dialects. It seems close to SQL Server, which has some TINYINT logic in its converter; it looks like Snowflake supports this type, so you need to add this to the converter. I also see [https://docs.uipath.com/process-mining/automation-suite/2022.10/user-guide/sql-differences-between-snowflake-and-sql-server](https://docs.uipath.com/process-mining/automation-suite/2022.10/user-guide/sql-differences-between-snowflake-and-sql-server). Do any of these SQL quirks require converter changes? I notice that Snowflake SQL uses double quotes, whereas Flink SQL uses backticks; it would be worth testing that quotes work for Snowflake.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
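On the quoting point: a JDBC dialect typically controls identifier quoting via {{JdbcDialect#quoteIdentifier}}. A minimal sketch assuming Snowflake's double-quote convention (not code from this PR):

{code:java}
@Override
public String quoteIdentifier(String identifier) {
    // Snowflake quotes identifiers with double quotes rather than backticks.
    return "\"" + identifier + "\"";
}
{code}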
Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]
davidradl commented on code in PR #118: URL: https://github.com/apache/flink-connector-jdbc/pull/118#discussion_r1584685732

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java:
## @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.snowflake.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link SnowflakeDialect}. */
+@Internal
+public class SnowflakeDialectFactory implements JdbcDialectFactory {
+@Override
+public boolean acceptsURL(String url) {
+return url.startsWith("jdbc:snowflake:");
+}
+
+@Override
+public JdbcDialect create() {

Review Comment: Could you please add some JUnit tests and docs for this PR?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage
[ https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-35217. -- Resolution: Fixed Merged to master in [{{80af4d5}}|https://github.com/apache/flink/commit/80af4d502318348ba15a8f75a2a622ce9dbdc968] > Missing fsync in FileSystemCheckpointStorage > > > Key: FLINK-35217 > URL: https://issues.apache.org/jira/browse/FLINK-35217 > Project: Flink > Issue Type: Bug > Components: FileSystems, Runtime / Checkpointing >Affects Versions: 1.17.0, 1.18.0, 1.19.0 >Reporter: Marc Aurel Fritz >Assignee: Stefan Richter >Priority: Critical > Labels: pull-request-available > Fix For: 1.20.0 > > > While running Flink on a system with unstable power supply checkpoints were > regularly corrupted in the form of "_metadata" files with a file size of 0 > bytes. In all cases the previous checkpoint data had already been deleted, > causing progress to be lost completely. > Further investigation revealed that the "FileSystemCheckpointStorage" doesn't > perform "fsync" when writing a new checkpoint to disk. This means the old > checkpoint gets removed without making sure that the new one is durably > persisted on disk. "strace" on the jobmanager's process confirms this > behavior: > # The checkpoint chk-60's in-progress metadata is written at "openat" > # The checkpoint chk-60's in-progress metadata is atomically renamed at > "rename" > # The old checkpoint chk-59 is deleted at "unlink" > For durable persistence an "fsync" call is missing before step 3. > Full "strace" log: > {code:java} > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", > {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0 > [pid 51618] 11:44:30 > mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) > = 0 > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata", > 0x7fd2ad5fc860) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata", > 0x7fd2ad5fc740) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > 0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51618] 11:44:30 openat(AT_FDCWD, > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > O_WRONLY|O_CREAT|O_EXCL, 0666) = 168 > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0 > [pid 51618] 11:44:30 > rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > 
"/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata", > {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0 > [pid 51644] 11:44:30 > unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata") > = 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51644] 11:44:30 openat(AT_FDCWD, > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168 > [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, > ...}, AT_EMPTY_PATH) = 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51644] 11:44:30 openat(AT_FDCWD, > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168 > [pid 51644]
Re: [PR] [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. [flink]
StefanRRichter merged PR #24722: URL: https://github.com/apache/flink/pull/24722 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35217) Missing fsync in FileSystemCheckpointStorage
[ https://issues.apache.org/jira/browse/FLINK-35217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-35217: --- Fix Version/s: 1.20.0 > Missing fsync in FileSystemCheckpointStorage > > > Key: FLINK-35217 > URL: https://issues.apache.org/jira/browse/FLINK-35217 > Project: Flink > Issue Type: Bug > Components: FileSystems, Runtime / Checkpointing >Affects Versions: 1.17.0, 1.18.0, 1.19.0 >Reporter: Marc Aurel Fritz >Assignee: Stefan Richter >Priority: Critical > Labels: pull-request-available > Fix For: 1.20.0 > > > While running Flink on a system with unstable power supply checkpoints were > regularly corrupted in the form of "_metadata" files with a file size of 0 > bytes. In all cases the previous checkpoint data had already been deleted, > causing progress to be lost completely. > Further investigation revealed that the "FileSystemCheckpointStorage" doesn't > perform "fsync" when writing a new checkpoint to disk. This means the old > checkpoint gets removed without making sure that the new one is durably > persisted on disk. "strace" on the jobmanager's process confirms this > behavior: > # The checkpoint chk-60's in-progress metadata is written at "openat" > # The checkpoint chk-60's in-progress metadata is atomically renamed at > "rename" > # The old checkpoint chk-59 is deleted at "unlink" > For durable persistence an "fsync" call is missing before step 3. > Full "strace" log: > {code:java} > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > 0x7fd2ad5fc970) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > 0x7fd2ad5fca00) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc", > {st_mode=S_IFDIR|0755, st_size=42, ...}) = 0 > [pid 51618] 11:44:30 > mkdir("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", 0777) > = 0 > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata", > 0x7fd2ad5fc860) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata", > 0x7fd2ad5fc740) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > 0x7fd2ad5fc7d0) = -1 ENOENT (No such file or directory) > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51618] 11:44:30 openat(AT_FDCWD, > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > O_WRONLY|O_CREAT|O_EXCL, 0666) = 168 > [pid 51618] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0 > [pid 51618] 11:44:30 > rename("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/._metadata.inprogress.bf9518dc-2100-4524-9e67-e42913c2b8e8", > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-60/_metadata") = > 0 > [pid 51644] 11:44:30 > 
stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata", > {st_mode=S_IFREG|0644, st_size=23378, ...}) = 0 > [pid 51644] 11:44:30 > unlink("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59/_metadata") > = 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51644] 11:44:30 openat(AT_FDCWD, > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168 > [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, > ...}, AT_EMPTY_PATH) = 0 > [pid 51644] 11:44:30 > stat("/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0 > [pid 51644] 11:44:30 openat(AT_FDCWD, > "/opt/flink/statestore/e1c541c4568515e77df32d82727e20dc/chk-59", > O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168 > [pid 51644] 11:44:30 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, > ...}, AT_EMPTY_PATH) = 0 > [pid 51644] 11:44:30 >
Re: [PR] [FLINK-35274][cdc-connector][db2] Fix occasional failure issue with Flink CDC Db2 UT [flink-cdc]
gong commented on PR #3284: URL: https://github.com/apache/flink-cdc/pull/3284#issuecomment-2085022086 @PatrickRen PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35274][cdc-connector][db2] Fix occasional failure issue with Flink CDC Db2 UT [flink-cdc]
gong commented on PR #3283: URL: https://github.com/apache/flink-cdc/pull/3283#issuecomment-2085021893 @PatrickRen PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35176][Connector/JDBC] Support property authentication connection for JDBC catalog & dynamic table [flink-connector-jdbc]
eskabetxe commented on code in PR #116: URL: https://github.com/apache/flink-connector-jdbc/pull/116#discussion_r1584601921 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java: ## @@ -88,32 +92,49 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog { private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); protected final ClassLoader userClassLoader; -protected final String username; -protected final String pwd; protected final String baseUrl; protected final String defaultUrl; +protected final Properties connectionProperties; +@Deprecated public AbstractJdbcCatalog( ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) { +this( +userClassLoader, +catalogName, +defaultDatabase, +baseUrl, +getBriefAuthProperties(username, pwd)); +} + +public AbstractJdbcCatalog( +ClassLoader userClassLoader, +String catalogName, +String defaultDatabase, +String baseUrl, +Properties connectionProperties) { super(catalogName, defaultDatabase); checkNotNull(userClassLoader); -checkArgument(!StringUtils.isNullOrWhitespaceOnly(username)); -checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd)); checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); JdbcCatalogUtils.validateJdbcUrl(baseUrl); this.userClassLoader = userClassLoader; -this.username = username; -this.pwd = pwd; this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; this.defaultUrl = this.baseUrl + defaultDatabase; +this.connectionProperties = Preconditions.checkNotNull(connectionProperties); +if (StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY))) { Review Comment: we should maintain the current checks ``` checkArgument(!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY))); checkArgument(!StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(PASSWORD_KEY))); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
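For readers following the discussion, the Properties-based construction plus the eager checks the reviewer asks to keep might look roughly like the sketch below. `USER_KEY`, `PASSWORD_KEY`, and `getBriefAuthProperties` appear in the PR's diff; the literal key names and the standalone helper here are assumptions for illustration, not the connector's actual code.

```java
import java.util.Properties;

// Illustrative sketch only; the key-name literals are assumed.
public final class CatalogAuthSketch {
    private static final String USER_KEY = "user";
    private static final String PASSWORD_KEY = "password";

    /** Roughly what getBriefAuthProperties(username, pwd) in the PR presumably builds. */
    static Properties briefAuthProperties(String username, String pwd) {
        Properties props = new Properties();
        props.setProperty(USER_KEY, username);
        props.setProperty(PASSWORD_KEY, pwd);
        return props;
    }

    /** The eager validation the reviewer suggests keeping in the new constructor. */
    static void validate(Properties connectionProperties) {
        requireNonBlank(connectionProperties.getProperty(USER_KEY), "username must not be blank");
        requireNonBlank(connectionProperties.getProperty(PASSWORD_KEY), "password must not be blank");
    }

    private static void requireNonBlank(String value, String message) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(message);
        }
    }
}
```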
[jira] [Updated] (FLINK-35272) Pipeline Transform job supports omitting / renaming calculation column
[ https://issues.apache.org/jira/browse/FLINK-35272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35272: --- Labels: pull-request-available (was: ) > Pipeline Transform job supports omitting / renaming calculation column > -- > > Key: FLINK-35272 > URL: https://issues.apache.org/jira/browse/FLINK-35272 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: 3.1.0 >Reporter: yux >Priority: Major > Labels: pull-request-available > > Currently, pipeline transform rules require that all columns used by expression > calculation be present in the final projected schema, and that they not be > renamed or omitted. > > The reason behind this is that any column not directly present in the projection rules > will be filtered out in the PreProjection step, and the PostProjection > process then cannot find those non-present but indirectly depended-on columns. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian opened a new pull request, #3285: URL: https://github.com/apache/flink-cdc/pull/3285 This closes [FLINK-35272](https://issues.apache.org/jira/browse/FLINK-35272). Currently, pipeline jobs with transform (including projection and filtering) are constructed with the following topology:

```
SchemaTransformOp --> DataTransformOp --> SchemaOp
```

where schema projections are applied in `SchemaTransformOp` and data projection & filtering are applied in `DataTransformOp`. The idea is that `SchemaTransformOp` might be embedded in `Sources` in the future to reduce the payload size transferred in the Flink job. However, the current implementation has a known defect: it omits unused columns too early, so columns that downstream operators rely on are already gone when records arrive at `DataTransformOp`. See the following example:

```
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, upper(name) as newname
    filter: age > 18
```

Such transformation rules will fail since the `name` and `age` columns are removed in `SchemaTransformOp`, so their data can no longer be retrieved in `DataTransformOp`, where the actual expression evaluation and filtering take effect. This PR introduces a new design, renaming the transform topology as follows:

```
PreTransformOp --> PostTransformOp --> SchemaOp
```

where `PreTransformOp` filters out a column only if: * the column is not present in the projection rules, and * the column is not indirectly referenced by calculation or filtering expressions. If a column is explicitly written down, it is passed downstream as-is; for referenced-only columns, a special prefix is added to their names. In the example above, a schema like `[id, newname, __PREFIX__name, __PREFIX__age]` will be sent downstream. Note that expression evaluation and filtering do not take effect at this stage, so a `DataChangeEvent` would look like `[1, null, 'Alice', 19]`. Adding the prefix deals with cases like this:

```
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, upper(name) as name
    filter: age > 18
```

Here we need to distinguish the calculated column (the new `name`) from the referenced original column (the old `name`), so after the name-mangling process the schema would be `[id, name, __PREFIX__name]`. Also, filtering is still done in `PostTransformOp`, since a user could write a filter expression that references a calculated column, whose value is not available until `PostTransformOp` evaluates it. It also means that in the following somewhat ambiguous case:

```
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, age * 2 as age
    filter: age > 18
```

the filtering expression is applied to the calculated `age` column (doubled!) instead of the original one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
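To make the name mangling concrete, here is a toy model of the schema derivation described above. The `__PREFIX__` literal comes from the PR text; the class itself is purely illustrative and not flink-cdc code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of the PreTransformOp schema derivation described in the PR text.
final class PreTransformSchemaSketch {
    static List<String> derive(List<String> projectedColumns, Set<String> referencedColumns) {
        List<String> out = new ArrayList<>(projectedColumns);
        for (String ref : referencedColumns) {
            // Referenced-but-not-projected columns survive, mangled so a calculated
            // column may reuse the original name without a clash.
            out.add("__PREFIX__" + ref);
        }
        return out;
    }

    public static void main(String[] args) {
        // projection: id, upper(name) as newname ; filter: age > 18
        System.out.println(
                derive(List.of("id", "newname"), new LinkedHashSet<>(List.of("name", "age"))));
        // -> [id, newname, __PREFIX__name, __PREFIX__age], matching the PR text
    }
}
```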
[jira] [Updated] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT
[ https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35274: --- Labels: pull-request-available (was: ) > Occasional failure issue with Flink CDC Db2 UT > -- > > Key: FLINK-35274 > URL: https://issues.apache.org/jira/browse/FLINK-35274 > Project: Flink > Issue Type: Bug >Reporter: Xin Gong >Priority: Critical > Labels: pull-request-available > Fix For: 3.1.0 > > > Occasional failure issue with Flink CDC Db2 UT. Because the Db2 redo-log data's > tableId doesn't carry a database name, the table schema occasionally cannot be > found when a task restarts after an exception. I will fix it by supplementing the database name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT
[ https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Gong updated FLINK-35274: - Fix Version/s: 3.1.0 > Occasional failure issue with Flink CDC Db2 UT > -- > > Key: FLINK-35274 > URL: https://issues.apache.org/jira/browse/FLINK-35274 > Project: Flink > Issue Type: Bug >Reporter: Xin Gong >Priority: Critical > Fix For: 3.1.0 > > > Occasional failure issue with Flink CDC Db2 UT. Because the Db2 redo-log data's > tableId doesn't carry a database name, the table schema occasionally cannot be > found when a task restarts after an exception. I will fix it by supplementing the database name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT
Xin Gong created FLINK-35274: Summary: Occasional failure issue with Flink CDC Db2 UT Key: FLINK-35274 URL: https://issues.apache.org/jira/browse/FLINK-35274 Project: Flink Issue Type: Bug Reporter: Xin Gong Occasional failure issue with Flink CDC Db2 UT. Because the Db2 redo-log data's tableId doesn't carry a database name, the table schema occasionally cannot be found when a task restarts after an exception. I will fix it by supplementing the database name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
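A hypothetical sketch of the fix described above: qualify the Db2 redo-log tableId with the database name before it is used for schema lookup. The helper below is illustrative only and not the actual flink-cdc change.

{code:java}
// Hypothetical helper, not flink-cdc API: complete a "SCHEMA.TABLE" identifier
// coming from the Db2 redo log with the database name used for schema lookups.
final class Db2TableIdSketch {
    static String supplementDatabaseName(String tableId, String databaseName) {
        long dots = tableId.chars().filter(c -> c == '.').count();
        // Already fully qualified ("DB.SCHEMA.TABLE") -> leave untouched.
        return dots >= 2 ? tableId : databaseName + "." + tableId;
    }

    public static void main(String[] args) {
        System.out.println(supplementDatabaseName("DB2INST1.PRODUCTS", "testdb"));
        // -> testdb.DB2INST1.PRODUCTS
    }
}
{code}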
[jira] [Updated] (FLINK-33761) Snowflake as JDBC source
[ https://issues.apache.org/jira/browse/FLINK-33761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33761: --- Labels: pull-request-available (was: ) > Snowflake as JDBC source > > > Key: FLINK-33761 > URL: https://issues.apache.org/jira/browse/FLINK-33761 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC >Reporter: Boris Litvak >Assignee: Boris Litvak >Priority: Major > Labels: pull-request-available > > Following [https://flink.apache.org/how-to-contribute/contribute-code/,] I > would like to contribute a JDBC source integration with Snowflake. > The implementation adds SnowflakeDialect and the relevant row converter and > factory. > Need to: > * Reach consensus > * Find a committer willing to review and merge the change -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33761][Connector/JDBC] Add Snowflake JDBC Dialect [flink-connector-jdbc]
boring-cyborg[bot] commented on PR #118: URL: https://github.com/apache/flink-connector-jdbc/pull/118#issuecomment-2084941773 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35273) PyFlink's LocalZonedTimestampType should respect timezone set by set_local_timezone
[ https://issues.apache.org/jira/browse/FLINK-35273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Geng updated FLINK-35273: -- Description: The issue is from https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399 When using TIMESTAMP_LTZ in PyFlink while setting a different time zone, the output does not show the expected result. Here is my test code:

{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, Configuration
from pyflink.table import DataTypes, StreamTableEnvironment
from datetime import datetime
import pytz

config = Configuration()
config.set_string("python.client.executable", "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
config.set_string("python.executable", "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
env = StreamExecutionEnvironment.get_execution_environment(config)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set_local_timezone("UTC")
# t_env.get_config().set_local_timezone("GMT-08:00")
input_table = t_env.from_elements(
    [
        (
            "elementA",
            datetime(year=2024, month=4, day=12, hour=8, minute=35),
        ),
        (
            "elementB",
            datetime(year=2024, month=4, day=12, hour=8, minute=35, tzinfo=pytz.utc),
            # datetime(year=2024, month=4, day=12, hour=8, minute=35, tzinfo=pytz.timezone('America/New_York')),
        ),
    ],
    DataTypes.ROW(
        [
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
        ]
    ),
)
input_table.execute().print()

# SQL
sql_result = t_env.execute_sql("CREATE VIEW MyView1 AS SELECT TO_TIMESTAMP_LTZ(171291090, 3);")
t_env.execute_sql("CREATE TABLE Sink (`t` TIMESTAMP_LTZ) WITH ('connector'='print');")
t_env.execute_sql("INSERT INTO Sink SELECT * FROM MyView1;")
{code}

The output is:

{code:java}
+----+----------+-------------------------+
| op | name     | timestamp               |
+----+----------+-------------------------+
| +I | elementA | 2024-04-12 08:35:00.000 |
| +I | elementB | 2024-04-12 16:35:00.000 |
+----+----------+-------------------------+
2 rows in set
+I[2024-04-12T08:35:00Z]
{code}

In pyflink/tables/types.py, the `LocalZonedTimestampType` class uses the following logic to convert a Python object to the SQL type:

{code:python}
EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
...
def to_sql_type(self, dt):
    if dt is not None:
        seconds = (calendar.timegm(dt.utctimetuple())
                   if dt.tzinfo else time.mktime(dt.timetuple()))
        return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL
{code}

It shows that the EPOCH_ORDINAL is calculated when the Python VM (PVM) starts and is not affected by the timezone set by `set_local_timezone`.

> PyFlink's LocalZonedTimestampType should respect timezone set by > set_local_timezone > --- > > Key: FLINK-35273 > URL: https://issues.apache.org/jira/browse/FLINK-35273 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Biao Geng >Priority: Major > > The issue is from > https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399 > When using TIMESTAMP_LTZ in PyFlink while setting a different time zone, the > output does not show the expected result. > Here is my test code: > {code:python} > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.common import Types, Configuration > from pyflink.table import DataTypes, StreamTableEnvironment > from datetime import datetime > import pytz > config = Configuration() > config.set_string("python.client.executable", > "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python") > config.set_string("python.executable", > "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python") > env = StreamExecutionEnvironment.get_execution_environment(config) > t_env = StreamTableEnvironment.create(env) > t_env.get_config().set_local_timezone("UTC") > # t_env.get_config().set_local_timezone("GMT-08:00") > input_table = t_env.from_elements( > [ > ( > "elementA", > datetime(year=2024, month=4, day=12, hour=8, minute=35), > ), > ( > "elementB", > datetime(year=2024, month=4, day=12, hour=8, minute=35, > tzinfo=pytz.utc), > # datetime(year=2024, month=4, day=12, hour=8, minute=35, > tzinfo=pytz.timezone('America/New_York')), > ), > ], > DataTypes.ROW( > [ > DataTypes.FIELD("name",
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698: URL: https://github.com/apache/flink/pull/24698#discussion_r1584530942 ## flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java: ## @@ -49,7 +49,7 @@ public interface StateFuture { * @param action the action to perform before completing the returned StateFuture. * @return the new StateFuture. */ -StateFuture thenAccept(Consumer action); +StateFuture thenAccept(ConsumerWithException action); Review Comment: IIUC, when constructing a `CompletableFuture`, the callback parameters must be of types like `Runnable`, so the **checked exceptions** must be handled inside `CompletableFuture`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
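For context, the constraint under discussion is that `CompletableFuture` callbacks cannot declare checked exceptions, so a throwing consumer must be wrapped before being handed over. A minimal sketch of that adaptation, using a stand-in `ThrowingConsumer` rather than Flink's actual `ConsumerWithException`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Stand-in for Flink's ConsumerWithException; illustrative only.
interface ThrowingConsumer<T> {
    void accept(T value) throws Exception;
}

final class FutureAdapter {
    static <T> CompletableFuture<Void> thenAccept(
            CompletableFuture<T> future, ThrowingConsumer<T> action) {
        return future.thenAccept(
                value -> {
                    try {
                        action.accept(value);
                    } catch (Exception e) {
                        // Surface the checked exception through the future instead.
                        throw new CompletionException(e);
                    }
                });
    }
}
```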
[jira] [Updated] (FLINK-35273) PyFlink's LocalZonedTimestampType should respect timezone set by set_local_timezone
[ https://issues.apache.org/jira/browse/FLINK-35273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Geng updated FLINK-35273: -- Component/s: API / Python > PyFlink's LocalZonedTimestampType should respect timezone set by > set_local_timezone > --- > > Key: FLINK-35273 > URL: https://issues.apache.org/jira/browse/FLINK-35273 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Biao Geng >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35273) PyFlink's LocalZonedTimestampType should respect timezone set by set_local_timezone
Biao Geng created FLINK-35273: - Summary: PyFlink's LocalZonedTimestampType should respect timezone set by set_local_timezone Key: FLINK-35273 URL: https://issues.apache.org/jira/browse/FLINK-35273 Project: Flink Issue Type: Bug Reporter: Biao Geng -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842322#comment-17842322 ] Biao Geng commented on FLINK-35192: --- [~stupid_pig], it looks like the pictures of the pmap are broken. But the glibc issue is well known; if that's the case, it makes sense to me to introduce jemalloc. > operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.1 > Environment: jdk: openjdk11 > operator version: 1.6.1 >Reporter: chenyuzhi >Priority: Major > Labels: pull-request-available > Attachments: image-2024-04-22-15-47-49-455.png, > image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, > image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, > image-2024-04-30-17-11-24-974.png, screenshot-1.png, screenshot-2.png, > screenshot-3.png > > > The kubernetes operator docker process was killed by the kernel due to out of > memory (the time was 2024.04.03 18:16) > !image-2024-04-22-15-47-49-455.png! > Metrics: > the pod memory (RSS) has been increasing slowly over the past 7 days: > !screenshot-1.png! > However, the JVM memory metrics of the operator show no obvious anomaly: > !image-2024-04-22-15-58-23-269.png! > !image-2024-04-22-15-58-42-850.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]
fredia commented on code in PR #24739: URL: https://github.com/apache/flink/pull/24739#discussion_r1584477969 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java: ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.runtime.asyncprocessing.StateRequest; +import org.apache.flink.runtime.asyncprocessing.StateRequestContainer; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +import java.util.ArrayList; +import java.util.List; + +/** + * The ForSt {@link StateRequestContainer} which can classify the state requests by ForStDB + * requestType (Get、Put or Iterator). + */ +public class ForStStateRequestClassifier implements StateRequestContainer { + +private final List> dbGetRequests; + +private final List> dbPutRequests; + +public ForStStateRequestClassifier() { +this.dbGetRequests = new ArrayList<>(); +this.dbPutRequests = new ArrayList<>(); +} + +@Override +public void offer(StateRequest stateRequest) { +if (stateRequest.getState() == null) { Review Comment: `StateRequestType#SYNC_POINT` would be added to the active buffer, it might be better to add a check here. https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java#L125-L127 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
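The check suggested above could look roughly like this guard; it is a sketch, not the merged change (the imports match the classes shown in the diff):

```java
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.util.Preconditions;

// Sketch of the suggested guard: a null state is only legal for SYNC_POINT
// requests, so make that assumption explicit instead of silently returning.
final class SyncPointGuard {
    @SuppressWarnings("rawtypes")
    static boolean isSyncPoint(StateRequest stateRequest) {
        if (stateRequest.getState() != null) {
            return false;
        }
        Preconditions.checkState(
                stateRequest.getRequestType() == StateRequestType.SYNC_POINT,
                "Only a SYNC_POINT request may carry a null state, got: %s",
                stateRequest.getRequestType());
        return true;
    }
}
```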
Re: [PR] [FLINK-35262] Bridge between AsyncKeyedStateBackend and AsyncExecutionController [flink]
jectpro7 commented on code in PR #24740: URL: https://github.com/apache/flink/pull/24740#discussion_r1584465123 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -74,15 +76,20 @@ public void setup( final int asyncBufferSize = environment.getExecutionConfig().getAsyncStateBufferSize(); final long asyncBufferTimeout = environment.getExecutionConfig().getAsyncStateBufferTimeout(); -// TODO: initial state executor and set state executor for aec + +AsyncKeyedStateBackend asyncKeyedStateBackend = +Preconditions.checkNotNull( +stateHandler.getAsyncKeyedStateBackend(), +"Current State Backend doesn't support async access"); this.asyncExecutionController = new AsyncExecutionController( mailboxExecutor, -null, +asyncKeyedStateBackend.createStateExecutor(), maxParallelism, asyncBufferSize, asyncBufferTimeout, inFlightRecordsLimit); +asyncKeyedStateBackend.setup(asyncExecutionController); Review Comment: the dependency doesn't look very clear; can we make it like this?

```
new AsyncExecutionController(
        mailboxExecutor,
        asyncKeyedStateBackend,
        maxParallelism,
        asyncBufferSize,
        asyncBufferTimeout,
        inFlightRecordsLimit);
```

The AEC would then be responsible for creating the `stateExecutor` and setting it up inside the constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-27146] [Filesystem] Migrate to Junit5 [flink]
kottmann commented on PR #22789: URL: https://github.com/apache/flink/pull/22789#issuecomment-2084837876 This is fixed now as well, see new commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #810: URL: https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584453635 ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,78 +24,96 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. - -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. - -## Features -### Core -- Fully-automated [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) - - Running, suspending and deleting applications - - Stateful and stateless application upgrades - - Triggering and managing savepoints - - Handling errors, rolling-back broken upgrades -- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18 + + +# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 [自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) 和 [operator 模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 + +Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。 + + + + +## 特征 + + + +### 核心 +- 全自动 [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) + - 运行、暂停和删除应用程序 + - 有状态和无状态应用程序升级 + - 保存点的触发和管理 + - 处理错误,回滚失败的升级 + - 多 Flink 版本支持:v1.15, v1.16, v1.17, v1.18 - [Deployment Modes]({{< ref "docs/custom-resource/overview#application-deployments" >}}): - - Application cluster - - Session cluster - - Session job + - 应用程序集群 + - 会话集群 + - 会话作业 - Built-in [High Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/) - Extensible framework - [Custom validators]({{< ref "docs/operations/plugins#custom-flink-resource-validators" >}}) - [Custom resource listeners]({{< ref "docs/operations/plugins#custom-flink-resource-listeners" >}}) - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) management - - Default configurations with dynamic updates - - Per job configuration - - Environment variables + - 默认配置与动态更新 + - 作业配置 + - 任务管理器配置 - POD augmentation via [Pod Templates]({{< ref "docs/custom-resource/pod-template" >}}) - - Native Kubernetes POD definitions - - Layering (Base/JobManager/TaskManager overrides) + - 原生Kubernetes POD定义 + - 用于自定义容器和资源 - [Job Autoscaler]({{< ref "docs/custom-resource/autoscaler" >}}) - - Collect lag and utilization metrics - - Scale job vertices to the ideal parallelism - - Scale up and down as the load 
changes -### Operations + - 收集延迟和利用率指标 + - 将作业顶点调整到合适的并行度 + - 根据负载的变化进行扩展和缩减 + + + +### 运营 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}}) - - Utilizes the well-established [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics) - - Pluggable metrics reporters - - Detailed resources and kubernetes api access metrics + - 使用成熟的 [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics) + - 可插拔的指标报告器 + - 详细的资源和 kubernetes api 访问指标 - Fully-customizable [Logging]({{< ref "docs/operations/metrics-logging#logging" >}}) - - Default log configuration - - Per job log configuration - - Sidecar based log forwarders -- Flink Web UI and REST Endpoint Access - - Fully supported Flink Native Kubernetes [service expose types](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui) - - Dynamic [Ingress templates]({{< ref "docs/operations/ingress" >}}) + - 默认日志配置 + - 每个作业日志配置 + - 基于 sidecar 的日志转发器 +- Flink Web UI 和 REST 端点访问 + - 完整支持 Flink 原生 Kubernetes [服务暴露类型](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui) + - 通过 [Ingress 模板]({{< ref "docs/operations/ingress" >}})
Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #810: URL: https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584453151 ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,78 +24,96 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. - -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. - -## Features -### Core -- Fully-automated [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) - - Running, suspending and deleting applications - - Stateful and stateless application upgrades - - Triggering and managing savepoints - - Handling errors, rolling-back broken upgrades -- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18 + + +# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 [自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) 和 [operator 模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 + +Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。 + + + + +## 特征 + + + +### 核心 +- 全自动 [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) + - 运行、暂停和删除应用程序 + - 有状态和无状态应用程序升级 + - 保存点的触发和管理 + - 处理错误,回滚失败的升级 + - 多 Flink 版本支持:v1.15, v1.16, v1.17, v1.18 - [Deployment Modes]({{< ref "docs/custom-resource/overview#application-deployments" >}}): - - Application cluster - - Session cluster - - Session job + - 应用程序集群 + - 会话集群 + - 会话作业 - Built-in [High Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/) - Extensible framework - [Custom validators]({{< ref "docs/operations/plugins#custom-flink-resource-validators" >}}) - [Custom resource listeners]({{< ref "docs/operations/plugins#custom-flink-resource-listeners" >}}) - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) management - - Default configurations with dynamic updates - - Per job configuration - - Environment variables + - 默认配置与动态更新 + - 作业配置 + - 任务管理器配置 - POD augmentation via [Pod Templates]({{< ref "docs/custom-resource/pod-template" >}}) - - Native Kubernetes POD definitions - - Layering (Base/JobManager/TaskManager overrides) + - 原生Kubernetes POD定义 + - 用于自定义容器和资源 - [Job Autoscaler]({{< ref "docs/custom-resource/autoscaler" >}}) - - Collect lag and utilization metrics - - Scale job vertices to the ideal parallelism - - Scale up and down as the load 
changes -### Operations + - 收集延迟和利用率指标 + - 将作业顶点调整到合适的并行度 + - 根据负载的变化进行扩展和缩减 + + + +### 运营 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}}) - - Utilizes the well-established [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics) - - Pluggable metrics reporters - - Detailed resources and kubernetes api access metrics + - 使用成熟的 [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics) + - 可插拔的指标报告器 + - 详细的资源和 kubernetes api 访问指标 - Fully-customizable [Logging]({{< ref "docs/operations/metrics-logging#logging" >}}) - - Default log configuration - - Per job log configuration - - Sidecar based log forwarders -- Flink Web UI and REST Endpoint Access - - Fully supported Flink Native Kubernetes [service expose types](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui) - - Dynamic [Ingress templates]({{< ref "docs/operations/ingress" >}}) + - 默认日志配置 + - 每个作业日志配置 + - 基于 sidecar 的日志转发器 +- Flink Web UI 和 REST 端点访问 + - 完整支持 Flink 原生 Kubernetes [服务暴露类型](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui) + - 通过 [Ingress 模板]({{< ref "docs/operations/ingress" >}})
Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #810: URL: https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584451630 ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,78 +24,96 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. - -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. - -## Features -### Core -- Fully-automated [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) - - Running, suspending and deleting applications - - Stateful and stateless application upgrades - - Triggering and managing savepoints - - Handling errors, rolling-back broken upgrades -- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18 + + +# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 [自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) 和 [operator 模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 + +Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。 + + + + +## 特征 + + + +### 核心 +- 全自动 [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) + - 运行、暂停和删除应用程序 + - 有状态和无状态应用程序升级 + - 保存点的触发和管理 + - 处理错误,回滚失败的升级 + - 多 Flink 版本支持:v1.15, v1.16, v1.17, v1.18 - [Deployment Modes]({{< ref "docs/custom-resource/overview#application-deployments" >}}): - - Application cluster - - Session cluster - - Session job + - 应用程序集群 + - 会话集群 + - 会话作业 - Built-in [High Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/) - Extensible framework - [Custom validators]({{< ref "docs/operations/plugins#custom-flink-resource-validators" >}}) - [Custom resource listeners]({{< ref "docs/operations/plugins#custom-flink-resource-listeners" >}}) - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) management - - Default configurations with dynamic updates - - Per job configuration - - Environment variables + - 默认配置与动态更新 + - 作业配置 + - 任务管理器配置 Review Comment: done ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,78 +24,96 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. 
Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. - -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. - -## Features -### Core -- Fully-automated [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) - - Running, suspending and deleting applications - - Stateful and stateless application upgrades - - Triggering and managing savepoints - - Handling errors, rolling-back broken upgrades -- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18 + + +# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但
Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #810: URL: https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584445284 ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,78 +24,96 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. - -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. - -## Features -### Core -- Fully-automated [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) - - Running, suspending and deleting applications - - Stateful and stateless application upgrades - - Triggering and managing savepoints - - Handling errors, rolling-back broken upgrades -- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18 + + +# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 [自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) 和 [operator 模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 + +Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。 Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]
ljz2051 commented on code in PR #24739: URL: https://github.com/apache/flink/pull/24739#discussion_r158208 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java: ## @@ -95,4 +100,32 @@ public V deserializeValue(byte[] valueBytes) throws IOException { inputView.setBuffer(valueBytes); return getValueSerializer().deserialize(inputView); } + +@SuppressWarnings("unchecked") +@Override +public ForStDBGetRequest, V> buildDBGetRequest( +StateRequest stateRequest) { +Preconditions.checkArgument(stateRequest.getRequestType() == StateRequestType.VALUE_GET); +ContextKey contextKey = +new ContextKey<>((RecordContext) stateRequest.getRecordContext()); +return ForStDBGetRequest.of( +contextKey, this, (InternalStateFuture) stateRequest.getFuture()); +} + +@SuppressWarnings("unchecked") +@Override +public ForStDBPutRequest, V> buildDBPutRequest( +StateRequest stateRequest) { +Preconditions.checkArgument( +stateRequest.getRequestType() == StateRequestType.VALUE_UPDATE +|| stateRequest.getRequestType() == StateRequestType.CLEAR); +ContextKey contextKey = +new ContextKey<>((RecordContext) stateRequest.getRecordContext()); +V value = +(stateRequest.getRequestType() == StateRequestType.CLEAR) +? null // "Delete(key)" is equivalent to "Put(key, null)" +: (V) stateRequest.getPayload(); Review Comment: The `payload` in `StateRequest` represents the input of this request. The payload content for VALUE_PUT stateRequest is the `value` (see `InternalValueState#asyncUpdate`). Sorry for the misleading. In pr-24681, the cached serialized key in `RecordContext` has been renamed to `extra`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]
caicancai commented on code in PR #810: URL: https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1584443822 ## docs/content.zh/docs/concepts/overview.md: ## @@ -24,78 +24,96 @@ specific language governing permissions and limitations under the License. --> -# Overview -Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. Although Flink’s native Kubernetes integration already allows you to directly deploy Flink applications on a running Kubernetes(k8s) cluster, [custom resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) and the [operator pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have also become central to a Kubernetes native deployment experience. - -Flink Kubernetes Operator aims to capture the responsibilities of a human operator who is managing Flink deployments. Human operators have deep knowledge of how Flink deployments ought to behave, how to start clusters, how to deploy jobs, how to upgrade them and how to react if there are problems. The main goal of the operator is the automation of these activities, which cannot be achieved through the Flink native integration alone. - -## Features -### Core -- Fully-automated [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) - - Running, suspending and deleting applications - - Stateful and stateless application upgrades - - Triggering and managing savepoints - - Handling errors, rolling-back broken upgrades -- Multiple Flink version support: v1.15, v1.16, v1.17, v1.18 + + +# 概述 +Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 [自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) 和 [operator 模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 也已成为 Kubernetes 本地部署体验的核心。 + +Flink Kubernetes Operator 旨在承担管理 Flink deployments,这是 human operator 的职责。Human Operators 对 Flink 部署应该如何运行、如何启动集群、如何部署作业、如何升级作业以及出现问题时如何反应有着深入的了解。Operator 的主要目标是使这些活动自动化,而这无法仅通过 Flink 原生集成来实现。 + + + + +## 特征 + + + +### 核心 +- 全自动 [Job Lifecycle Management]({{< ref "docs/custom-resource/job-management" >}}) + - 运行、暂停和删除应用程序 + - 有状态和无状态应用程序升级 + - 保存点的触发和管理 + - 处理错误,回滚失败的升级 + - 多 Flink 版本支持:v1.15, v1.16, v1.17, v1.18 - [Deployment Modes]({{< ref "docs/custom-resource/overview#application-deployments" >}}): - - Application cluster - - Session cluster - - Session job + - 应用程序集群 + - 会话集群 + - 会话作业 Review Comment: - Application 集群 - Session 集群 - Session job ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312 ] chenyuzhi edited comment on FLINK-35192 at 4/30/24 9:26 AM:

The op image we use modifies the base image of the DockerFile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient for using jdk tools). There is no modification at the code level [~bgeng777]

Recently, we used the pmap cmd to observe the memory growth of the operator, made a comparison, and checked the process RSS monitoring of the op host. The comparison window was 04.24 20:00 - 04.25 01:20. The process RSS increased by 95 MB (34.725G - 34.630G):

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

Observing the pmap results at these two time points and comparing them:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!

It can be seen that the growth mainly comes from the front memory area, where there are many 65536KB memory blocks. At the same time, the front memory area grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the increase of 91 MB (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to an [issue|https://issues.apache.org/jira/browse/HADOOP-7154] in hadoop, it is suspected that glibc has a memory leak due to memory fragmentation in multiple arenas under multi-threading. At present, we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator. We replaced the glibc used by the operator with jemalloc 4.5 and adjusted the Xmx of the operator from 32g to 5.25g. Here is the memory usage of the process after the adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

Therefore, it is suspected that this is caused by a memory leak in glibc in a multi-threaded environment. Would the community consider changing the memory allocator of the op from glibc to jemalloc? If so, I will be happy to provide a PR to optimize this issue. [~gyfora]

ENV glibc-version: 2.31 jemalloc-version: 4.5

was (Author: stupid_pig): The op image we use modifies the base image of the DockerFile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient for using jdk tools). There is no modification at the code level [~bgeng777]

Recently, we used the pmap cmd to observe the memory growth of the operator, made a comparison, and checked the process RSS monitoring of the op host. The comparison window was 04.24 20:00 - 04.25 01:20. The process RSS increased by 95 MB (34.725G - 34.630G):

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

Observing the pmap results at these two time points and comparing them:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!

It can be seen that the growth mainly comes from the front memory area, where there are many 65536KB memory blocks. At the same time, the front memory area grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the increase of 91 MB (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to an issue in hadoop, it is suspected that glibc has a memory leak due to memory fragmentation in multiple arenas under multi-threading. At present, we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator. We replaced the glibc used by the operator with jemalloc 4.5 and adjusted the Xmx of the operator from 32g to 5.25g. Here is the memory usage of the process after the adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

Therefore, it is suspected that this is caused by a memory leak in glibc in a multi-threaded environment. Would the community consider changing the memory allocator of the op from glibc to jemalloc? If so, I will be happy to provide a PR to optimize this issue. [~gyfora]

ENV glibc-version: 2.31 jemalloc-version: 4.5

> operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions:
[jira] [Comment Edited] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312 ] chenyuzhi edited comment on FLINK-35192 at 4/30/24 9:24 AM:

The op image we use modifies the base image of the DockerFile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient for using jdk tools). There is no modification at the code level [~bgeng777]

Recently, we used the pmap cmd to observe the memory growth of the operator, made a comparison, and checked the process RSS monitoring of the op host. The comparison window was 04.24 20:00 - 04.25 01:20. The process RSS increased by 95 MB (34.725G - 34.630G):

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

Observing the pmap results at these two time points and comparing them:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!

It can be seen that the growth mainly comes from the front memory area, where there are many 65536KB memory blocks. At the same time, the front memory area grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the increase of 91 MB (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to an issue in hadoop, it is suspected that glibc has a memory leak due to memory fragmentation in multiple arenas under multi-threading. At present, we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator. We replaced the glibc used by the operator with jemalloc 4.5 and adjusted the Xmx of the operator from 32g to 5.25g. Here is the memory usage of the process after the adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

Therefore, it is suspected that this is caused by a memory leak in glibc in a multi-threaded environment. Would the community consider changing the memory allocator of the op from glibc to jemalloc? If so, I will be happy to provide a PR to optimize this issue. [~gyfora]

ENV glibc-version: 2.31 jemalloc-version: 4.5

was (Author: stupid_pig): The op image we use modifies the base image of the DockerFile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient for using jdk tools). There is no modification at the code level [~bgeng777]

Recently, we used the pmap cmd to observe the memory growth of the operator, made a comparison, and checked the process RSS monitoring of the op host. The comparison window was 04.24 20:00 - 04.25 01:20. The process RSS increased by 95 MB (34.725G - 34.630G):

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

Observing the pmap results at these two time points and comparing them:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!

It can be seen that the growth mainly comes from the front memory area, where there are many 65536KB memory blocks. At the same time, the front memory area grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the increase of 91 MB (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to an issue in hadoop, it is suspected that glibc has a memory leak due to memory fragmentation in multiple arenas under multi-threading. At present, we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator. We replaced the glibc used by the operator with jemalloc 4.5 and adjusted the Xmx of the operator from 32g to 5.25g. Here is the memory usage of the process after the adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

Therefore, it is suspected that this is caused by a memory leak in glibc in a multi-threaded environment. Would the community consider changing the memory allocator of the op from glibc to jemalloc? If so, I will be happy to provide a PR to optimize this issue. [~gyfora]

> operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.1 > Environment: jdk: openjdk11 > operator
[jira] [Comment Edited] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312 ] chenyuzhi edited comment on FLINK-35192 at 4/30/24 9:19 AM:

The op image we use modifies the base image of the DockerFile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient for using jdk tools). There is no modification at the code level [~bgeng777]

Recently, we used the pmap cmd to observe the memory growth of the operator, made a comparison, and checked the process RSS monitoring of the op host. The comparison window was 04.24 20:00 - 04.25 01:20. The process RSS increased by 95 MB (34.725G - 34.630G):

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1395,height=802!

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1302,height=464!

Observing the pmap results at these two time points and comparing them:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png|width=1448,height=314!

It can be seen that the growth mainly comes from the front memory area, where there are many 65536KB memory blocks. At the same time, the front memory area grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the increase of 91 MB (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to an issue in hadoop, it is suspected that glibc has a memory leak due to memory fragmentation in multiple arenas under multi-threading. At present, we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator. We replaced the glibc used by the operator with jemalloc 4.5 and adjusted the Xmx of the operator from 32g to 5.25g. Here is the memory usage of the process after the adjustment:

!image-2024-04-30-17-11-24-974.png|width=1243,height=489!

Therefore, it is suspected that this is caused by a memory leak in glibc in a multi-threaded environment. Would the community consider changing the memory allocator of the op from glibc to jemalloc? If so, I will be happy to provide a PR to optimize this issue. [~gyfora]

was (Author: stupid_pig): The op image we use modifies the base image of the DockerFile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient for using jdk tools). There is no modification at the code level [~bgeng777]

Recently, we used the pmap cmd to observe the memory growth of the operator, made a comparison, and checked the process RSS monitoring of the op host. The comparison window was 04.24 20:00 - 04.25 01:20. The process RSS increased by 95 MB (34.725G - 34.630G):

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1607,height=924!

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1743,height=621!

Observing the pmap results at these two time points and comparing them:

!https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png!

It can be seen that the growth mainly comes from the front memory area, where there are many 65536KB memory blocks. At the same time, the front memory area grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the increase of 91 MB (165 MB - 74 MB) basically matches the memory growth of the process.

Referring to an issue in hadoop, it is suspected that glibc has a memory leak due to memory fragmentation in multiple arenas under multi-threading. At present, we refer to the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator. We replaced the glibc used by the operator with jemalloc 4.5 and adjusted the Xmx of the operator from 32g to 5.25g. Here is the memory usage of the process after the adjustment:

!image-2024-04-30-17-11-24-974.png|width=1583,height=623!

Therefore, it is suspected that this is caused by a memory leak in glibc in a multi-threaded environment. Would the community consider changing the memory allocator of the op from glibc to jemalloc? If so, I will be happy to provide a PR to optimize this issue. [~gyfora]

> operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.1 > Environment: jdk: openjdk11 > operator
Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]
ljz2051 commented on code in PR #24739: URL: https://github.com/apache/flink/pull/24739#discussion_r1584431128 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateRequestClassifier.java: ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.runtime.asyncprocessing.StateRequest; +import org.apache.flink.runtime.asyncprocessing.StateRequestContainer; +import org.apache.flink.runtime.asyncprocessing.StateRequestType; + +import java.util.ArrayList; +import java.util.List; + +/** + * The ForSt {@link StateRequestContainer} which can classify the state requests by ForStDB + * requestType (Get, Put or Iterator). + */ +public class ForStStateRequestClassifier implements StateRequestContainer { + +private final List<StateRequest> dbGetRequests; + +private final List<StateRequest> dbPutRequests; + +public ForStStateRequestClassifier() { +this.dbGetRequests = new ArrayList<>(); +this.dbPutRequests = new ArrayList<>(); +} + +@Override +public void offer(StateRequest stateRequest) { +if (stateRequest.getState() == null) { Review Comment: The state could be null only for {@link StateRequestType#SYNC_POINT}. The ForStStateBackend does nothing for {@link StateRequestType#SYNC_POINT}. I have added some comments about this if-branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
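For context, a minimal sketch of the classification pattern discussed in this review thread — not the actual PR code; the request types, field names, and the stand-in classes below are assumptions for illustration only:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Flink's StateRequest/StateRequestType; the real
// classes live in org.apache.flink.runtime.asyncprocessing.
enum RequestType { GET, PUT, SYNC_POINT }

class Request {
    final RequestType type;
    final Object state; // null only for SYNC_POINT requests
    Request(RequestType type, Object state) { this.type = type; this.state = state; }
}

class RequestClassifier {
    private final List<Request> dbGetRequests = new ArrayList<>();
    private final List<Request> dbPutRequests = new ArrayList<>();

    void offer(Request request) {
        // As the review comment notes: state is null only for SYNC_POINT,
        // and the backend does nothing for such requests.
        if (request.state == null) {
            return;
        }
        switch (request.type) {
            case GET: dbGetRequests.add(request); break;
            case PUT: dbPutRequests.add(request); break;
            default: throw new UnsupportedOperationException("Unexpected type: " + request.type);
        }
    }
}
{code}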
[jira] [Comment Edited] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312 ] chenyuzhi edited comment on FLINK-35192 at 4/30/24 9:17 AM: The operator image we use only modifies the base image of the Dockerfile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient for using the JDK tools); there is no modification at the code level. [~bgeng777] Recently, we observed the memory growth of the operator with the pmap command and compared it against the process RSS monitoring of the operator host. The comparison window was 04.24 20:00-04.25 01:20, during which the process RSS occupation increased by 95 MB (34.725G-34.630G): !https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1607,height=924! !https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1743,height=621! Comparing the pmap results at these two points in time: !https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png! it can be seen that the growth mainly comes from the front memory area, which contains many 65536KB memory blocks. At the same time, the front memory area grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the 91 MB increase (165 MB - 74 MB) basically matches the memory growth of the process. Referring to an issue in hadoop, we suspect that glibc leaks memory due to fragmentation across multiple arenas under multi-threading. We therefore followed the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator: we replaced the glibc allocator used by the operator with jemalloc 4.5 and adjusted the operator's Xmx from 32g to 5.25g. Here is the memory usage of the process after the adjustment: !image-2024-04-30-17-11-24-974.png|width=1583,height=623! Therefore, we suspect the root cause is a glibc memory leak in a multi-threaded environment. Would the community consider changing the memory allocator of the operator from glibc to jemalloc? If so, I will be happy to provide a PR to optimize this issue. [~gyfora] was (Author: stupid_pig): The operator image we use only modifies the base image of the Dockerfile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient for using the JDK tools); there is no modification at the code level. [~bgeng777] Recently, we observed the memory growth of the operator with the pmap command and compared it against the process RSS monitoring of the operator host. The comparison window was 04.24 20:00-04.25 01:20, during which the process RSS occupation increased by 95 MB (34.725G-34.630G): !https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1607,height=924! !https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1743,height=621! Comparing the pmap results at these two points in time: !https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png! it can be seen that the growth mainly comes from the front memory area, which contains many 65536KB memory blocks. At the same time, the front memory area grew from 74 MB (35444 KB
+ 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the 91 MB increase (165 MB - 74 MB) basically matches the memory growth of the process. Referring to an [issue|https://issues.apache.org/jira/browse/HADOOP-7154] in hadoop, we suspect that glibc leaks memory due to fragmentation across multiple arenas under multi-threading. We therefore followed the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator: we replaced the glibc allocator used by the operator with jemalloc 4.5 and adjusted the operator's Xmx from 32g to 5.25g. Here is the memory usage of the process after the adjustment: !image-2024-04-30-17-11-24-974.png|width=1583,height=623! Therefore, we suspect the root cause is a glibc memory leak in a multi-threaded environment. Would the community consider changing the memory allocator of the operator from glibc to jemalloc? If so, I will be happy to provide a PR to optimize this issue. [~gyfora] > operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.1 >
Re: [PR] [FLINK-35161][state] Implement StateExecutor for ForStStateBackend [flink]
ljz2051 commented on code in PR #24739: URL: https://github.com/apache/flink/pull/24739#discussion_r1584427802 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java: ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.runtime.asyncprocessing.StateExecutor; +import org.apache.flink.runtime.asyncprocessing.StateRequest; +import org.apache.flink.runtime.asyncprocessing.StateRequestContainer; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +/** + * The {@link StateExecutor} implementation which executes batches of {@link StateRequest}s for + * the ForStStateBackend. + */ +public class ForStStateExecutor implements StateExecutor { + +private static final Logger LOG = LoggerFactory.getLogger(ForStStateExecutor.class); + +/** + * The coordinator thread which schedules the execution of multiple batches of stateRequests. + * The number of coordinator threads is 1 to ensure that multiple batches of stateRequests can + * be executed sequentially. + */ +private final Executor coordinatorThread; Review Comment: I have added a method `StateExecutor#shutdown` in which the workerThreads and the coordinatorThread will be closed. Whoever creates the `StateExecutor` has the responsibility to shut it down when the task is disposed. I think that role should be `AsyncKeyedStateBackend` or `AsyncExecutionController`. The `AsyncKeyedStateBackend` has not been implemented yet. I think the `ForStStateExecutor` should probably be shut down in `AsyncKeyedStateBackend#dispose`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
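A hedged sketch of the ownership pattern described in that comment — the class and method names below are assumptions, not the actual Flink API: the component that creates the executor also shuts down its coordinator and worker pools when it is disposed.
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: a single coordinator thread keeps batches sequential,
// a worker pool executes the per-batch DB I/O, and the owner (e.g. a keyed
// state backend) calls close() from its dispose() hook.
class BatchingStateExecutor implements AutoCloseable {
    private final ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();
    private final ExecutorService workerThreads = Executors.newFixedThreadPool(4);

    void executeBatch(Runnable batch) {
        // The coordinator serializes batch scheduling; workers do the actual work.
        coordinatorThread.execute(() -> workerThreads.execute(batch));
    }

    @Override
    public void close() { // the "shutdown" responsibility discussed above
        coordinatorThread.shutdown();
        workerThreads.shutdown();
    }
}
{code}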
[jira] [Commented] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842312#comment-17842312 ] chenyuzhi commented on FLINK-35192: --- The operator image we use only modifies the base image of the Dockerfile from eclipse-temurin:11-jre-jammy to maven:3.8.4-eclipse-temurin-11 (convenient for using the JDK tools); there is no modification at the code level. [~bgeng777] Recently, we observed the memory growth of the operator with the pmap command and compared it against the process RSS monitoring of the operator host. The comparison window was 04.24 20:00-04.25 01:20, during which the process RSS occupation increased by 95 MB (34.725G-34.630G): !https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/7b7c4c18e6ad44baa9cebec9c3ee4dae.png|width=1607,height=924! !https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/918f54f737d8430bab99f8a8536d287b.png|width=1743,height=621! Comparing the pmap results at these two points in time: !https://office.netease.com/api/admin/file/download?path=cowork/2024/04/25/38d9386430814e8282683930ab72daea.png! it can be seen that the growth mainly comes from the front memory area, which contains many 65536KB memory blocks. At the same time, the front memory area grew from 74 MB (35444 KB + 39668 KB) to 165 MB (65536 KB + 65536 KB + 35708 KB); the 91 MB increase (165 MB - 74 MB) basically matches the memory growth of the process. Referring to an [issue|https://issues.apache.org/jira/browse/HADOOP-7154] in hadoop, we suspect that glibc leaks memory due to fragmentation across multiple arenas under multi-threading. We therefore followed the [DockerFile|https://github.com/apache/flink-docker/blob/master/1.18/scala_2.12-java11-ubuntu/Dockerfile#L24] of flink-docker, which uses jemalloc as the memory allocator: we replaced the glibc allocator used by the operator with jemalloc 4.5 and adjusted the operator's Xmx from 32g to 5.25g. Here is the memory usage of the process after the adjustment: !image-2024-04-30-17-11-24-974.png|width=1583,height=623! Therefore, we suspect the root cause is a glibc memory leak in a multi-threaded environment. Would the community consider changing the memory allocator of the operator from glibc to jemalloc? If so, I will be happy to provide a PR to optimize this issue. [~gyfora] > operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.1 > Environment: jdk: openjdk11 > operator version: 1.6.1 >Reporter: chenyuzhi >Priority: Major > Labels: pull-request-available > Attachments: image-2024-04-22-15-47-49-455.png, > image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, > image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, > image-2024-04-30-17-11-24-974.png, screenshot-1.png, screenshot-2.png, > screenshot-3.png > > > The kubernetes operator docker process was killed by the kernel due to out of > memory (the time is 2024.04.03 18:16) > !image-2024-04-22-15-47-49-455.png! > Metrics: > the pod memory (RSS) has been increasing slowly over the past 7 days: > !screenshot-1.png! > However, the JVM memory metrics of the operator show no obvious anomaly: > !image-2024-04-22-15-58-23-269.png! > !image-2024-04-22-15-58-42-850.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
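For readers who want to try the allocator swap described above, a hedged sketch of the Docker-level change — the package name, library path, and base image here are assumptions for a Debian/Ubuntu amd64 image, not the exact flink-docker recipe:
{code}
# Sketch only: install jemalloc and preload it so the JVM's native
# allocations bypass glibc malloc (whose per-thread arenas can fragment).
FROM maven:3.8.4-eclipse-temurin-11
RUN apt-get update && \
    apt-get install -y --no-install-recommends libjemalloc2 && \
    rm -rf /var/lib/apt/lists/*
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
# Alternative glibc-only mitigation: cap the arena count instead, e.g.
# ENV MALLOC_ARENA_MAX=2
{code}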
[jira] [Updated] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chenyuzhi updated FLINK-35192: -- Attachment: image-2024-04-30-17-11-24-974.png > operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.1 > Environment: jdk: openjdk11 > operator version: 1.6.1 >Reporter: chenyuzhi >Priority: Major > Labels: pull-request-available > Attachments: image-2024-04-22-15-47-49-455.png, > image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, > image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, > image-2024-04-30-17-11-24-974.png, screenshot-1.png, screenshot-2.png, > screenshot-3.png > > > The kubernetes operator docker process was killed by the kernel due to out of > memory (the time is 2024.04.03 18:16) > !image-2024-04-22-15-47-49-455.png! > Metrics: > the pod memory (RSS) has been increasing slowly over the past 7 days: > !screenshot-1.png! > However, the JVM memory metrics of the operator show no obvious anomaly: > !image-2024-04-22-15-58-23-269.png! > !image-2024-04-22-15-58-42-850.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-34902) INSERT INTO column mismatch leads to IndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/FLINK-34902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther closed FLINK-34902. Resolution: Fixed Fixed in master: 8a75f8f22b8e08e45f1a6453b87718eab6db115a > INSERT INTO column mismatch leads to IndexOutOfBoundsException > -- > > Key: FLINK-34902 > URL: https://issues.apache.org/jira/browse/FLINK-34902 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: Jeyhun Karimov >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > SQL: > {code} > INSERT INTO t (a, b) SELECT 1; > {code} > > Stack trace: > {code} > org.apache.flink.table.api.ValidationException: SQL validation failed. Index > 1 out of bounds for length 1 > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117) > at > Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for > length 1 > at > java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) > at > java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) > at > java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248) > at java.base/java.util.Objects.checkIndex(Objects.java:374) > at java.base/java.util.ArrayList.get(ArrayList.java:459) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$reorder$1(PreValidateReWriter.scala:355) > at > org.apache.flink.table.planner.calcite.PreValidateReWriter$.$anonfun$reorder$1$adapted(PreValidateReWriter.scala:355) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
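For reference, the failing statement above projects one column while the INSERT column list names two; with the fix, such statements are rejected with a proper validation error instead of an IndexOutOfBoundsException. A hedged illustration of a corrected query (the second literal is invented):
{code:SQL}
-- The SELECT must supply one expression per column in the INSERT column list:
INSERT INTO t (a, b) SELECT 1, 2;
{code}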
Re: [PR] [FLINK-34902][table] Fix IndexOutOfBoundsException for VALUES [flink]
twalthr merged PR #24724: URL: https://github.com/apache/flink/pull/24724 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager [flink]
ljz2051 commented on code in PR #24640: URL: https://github.com/apache/flink/pull/24640#discussion_r1584382431 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ## @@ -575,8 +576,51 @@ private void createManagedDirectory(Path managedPath) { @Override public void close() throws IOException {} +// +// restore +// + +@Override +public void restoreStateHandles( +long checkpointId, SubtaskKey subtaskKey, Stream stateHandles) { + +Set uploadedLogicalFiles; +synchronized (lock) { +uploadedLogicalFiles = +uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>()); +} + +stateHandles.forEach( +fileHandle -> { +PhysicalFile physicalFile = +new PhysicalFile( +null, +fileHandle.getFilePath(), +physicalFileDeleter, Review Comment: As discussed offline with @Zakelly , we can distinguish whether the restored filePath is under the managedDir scope (`managedSharedStateDir` or `managedExclusiveStateDir`), to determine whether the file should be deleted by the `FileMergingSnapshotManager`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
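A hedged sketch of the scope check being discussed — the helper and parameter names are assumptions, not the PR's actual code: a restored file handle is owned (and hence deletable) by the file-merging manager only if its path falls under one of the managed directories.
{code:java}
import org.apache.flink.core.fs.Path;

class ManagedScopeCheck {
    // Hypothetical helper: the real managed dirs are maintained per subtask
    // by FileMergingSnapshotManagerBase.
    static boolean isManagedByFileMergingManager(
            Path filePath, Path managedSharedDir, Path managedExclusiveDir) {
        String p = filePath.toUri().toString();
        return p.startsWith(managedSharedDir.toUri().toString())
                || p.startsWith(managedExclusiveDir.toUri().toString());
    }
}
{code}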
[jira] [Updated] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chenyuzhi updated FLINK-35192: -- Attachment: image-2024-04-30-16-47-07-289.png > operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.1 > Environment: jdk: openjdk11 > operator version: 1.6.1 >Reporter: chenyuzhi >Priority: Major > Labels: pull-request-available > Attachments: image-2024-04-22-15-47-49-455.png, > image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, > image-2024-04-22-15-58-42-850.png, image-2024-04-30-16-47-07-289.png, > screenshot-1.png, screenshot-2.png, screenshot-3.png > > > The kubernetes operator docker process was killed by the kernel due to out of > memory (the time is 2024.04.03 18:16) > !image-2024-04-22-15-47-49-455.png! > Metrics: > the pod memory (RSS) has been increasing slowly over the past 7 days: > !screenshot-1.png! > However, the JVM memory metrics of the operator show no obvious anomaly: > !image-2024-04-22-15-58-23-269.png! > !image-2024-04-22-15-58-42-850.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager [flink]
ljz2051 commented on code in PR #24640: URL: https://github.com/apache/flink/pull/24640#discussion_r1584375937 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ## @@ -575,8 +576,51 @@ private void createManagedDirectory(Path managedPath) { @Override public void close() throws IOException {} +// +// restore +// + +@Override +public void restoreStateHandles( +long checkpointId, SubtaskKey subtaskKey, Stream stateHandles) { + +Set uploadedLogicalFiles; +synchronized (lock) { +uploadedLogicalFiles = +uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>()); +} + +stateHandles.forEach( +fileHandle -> { +PhysicalFile physicalFile = +new PhysicalFile( +null, +fileHandle.getFilePath(), +physicalFileDeleter, Review Comment: @fredia In the case of the job restore (not failover and not rescaling), the jobID may be changed, and I think the JM should claim ownership of these files to delete them. In this case, however, the parallelism of the job has not changed, so the `keyGroupRange` of the `KeyedStateHandle` cannot be used as judgment conditions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842294#comment-17842294 ] Biao Geng commented on FLINK-35192: --- Sure, created a [pr|https://github.com/apache/flink-kubernetes-operator/pull/822] for this. > operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.1 > Environment: jdk: openjdk11 > operator version: 1.6.1 >Reporter: chenyuzhi >Priority: Major > Labels: pull-request-available > Attachments: image-2024-04-22-15-47-49-455.png, > image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, > image-2024-04-22-15-58-42-850.png, screenshot-1.png, screenshot-2.png, > screenshot-3.png > > > The kubernetes operator docker process was killed by the kernel due to out of > memory (the time is 2024.04.03 18:16) > !image-2024-04-22-15-47-49-455.png! > Metrics: > the pod memory (RSS) has been increasing slowly over the past 7 days: > !screenshot-1.png! > However, the JVM memory metrics of the operator show no obvious anomaly: > !image-2024-04-22-15-58-23-269.png! > !image-2024-04-22-15-58-42-850.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager [flink]
ljz2051 commented on code in PR #24640: URL: https://github.com/apache/flink/pull/24640#discussion_r1584364578 ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java: ## @@ -830,6 +832,8 @@ public CompletableFuture submitTask( channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId)); taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured); +registerTaskRestoreInfoToFileMergingManager( +fileMergingSnapshotManager, task.getTaskInfo(), taskRestore); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35212) PyFlink thread mode process just can run once in standalonesession mode
[ https://issues.apache.org/jira/browse/FLINK-35212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842295#comment-17842295 ] Wei Yuan commented on FLINK-35212: -- Hi Biao Geng, as you say, the exception I submitted may have been caused by my modifying some files under the python site-packages of pyflink while trying to solve FLINK-35180. I also hit the same exception you pasted at commit #0 with Java 8 and Python 3.8.6. Looking forward to your improvement for this. > PyFlink thread mode process just can run once in standalonesession mode > --- > > Key: FLINK-35212 > URL: https://issues.apache.org/jira/browse/FLINK-35212 > Project: Flink > Issue Type: Bug > Components: API / Python > Environment: Python 3.10.14 > PyFlink==1.18.1 > openjdk version "11.0.21" 2023-10-17 LTS > OpenJDK Runtime Environment (Red_Hat-11.0.21.0.9-1.el7_9) (build > 11.0.21+9-LTS) > OpenJDK 64-Bit Server VM (Red_Hat-11.0.21.0.9-1.el7_9) (build 11.0.21+9-LTS, > mixed mode, sharing) >Reporter: Wei Yuan >Priority: Critical > > {code:java} > from pyflink.common.types import Row > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.common import Types, WatermarkStrategy, Configuration > from pyflink.table import EnvironmentSettings, TableEnvironment > from pyflink.table import StreamTableEnvironment, Schema > from pyflink.datastream.functions import ProcessFunction, MapFunction > from pyflink.common.time import Instant > # init task env > config = Configuration() > config.set_string("python.execution-mode", "thread") > # config.set_string("python.execution-mode", "process") > config.set_string("python.client.executable", "/root/miniconda3/bin/python3") > config.set_string("python.executable", "/root/miniconda3/bin/python3") > env = StreamExecutionEnvironment.get_execution_environment(config) > table_env = StreamTableEnvironment.create(env) > # create a batch TableEnvironment > table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", > "content") > table_env.create_temporary_view("test_table", table) > result_table = table_env.sql_query("select *, NOW() as dt from test_table") > result_ds = table_env.to_data_stream(result_table) > # def test_func(row): > # return row > # result_ds.map(test_func).print() > result_ds.print() > env.execute() > {code} > Start a standalone session mode cluster by command: > {code:java} > /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/bin/start-cluster.sh{code} > Submit the thread mode job for the first time; the job finishes successfully. 
> {code:java} > /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/flink run -py > bug.py {code} > Use above command to submit job for the second time, an error occured: > {code:java} > Job has been submitted with JobID a4f2728199277bba0500796f7925fa26 > Traceback (most recent call last): > File "/home/disk1/bug.py", line 34, in > env.execute() > File > "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", > line 773, in execute > return > JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph)) > File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", > line 1322, in __call__ > return_value = get_return_value( > File > "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", > line 146, in deco > return f(*a, **kw) > File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line > 326, in get_return_value > raise Py4JJavaError( > py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute. > : java.util.concurrent.ExecutionException: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: a4f2728199277bba0500796f7925fa26) > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005) > at > org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:171) > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:122) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at >
Re: [PR] [FLINK-35195][table] Support the execution of create materialized table in continuous refresh mode [flink]
flinkbot commented on PR #24750: URL: https://github.com/apache/flink/pull/24750#issuecomment-2084709081 ## CI report: * 9970141f26ce36804d528727728eb23648a1201f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager [flink]
ljz2051 commented on code in PR #24640: URL: https://github.com/apache/flink/pull/24640#discussion_r1584361582 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ## @@ -575,8 +576,51 @@ private void createManagedDirectory(Path managedPath) { @Override public void close() throws IOException {} +// +// restore +// + +@Override +public void restoreStateHandles( +long checkpointId, SubtaskKey subtaskKey, Stream stateHandles) { + +Set uploadedLogicalFiles; +synchronized (lock) { +uploadedLogicalFiles = +uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>()); +} + +stateHandles.forEach( +fileHandle -> { +PhysicalFile physicalFile = +new PhysicalFile( +null, +fileHandle.getFilePath(), +physicalFileDeleter, +fileHandle.getScope()); Review Comment: Yes, I have refactored this method. Please review FileMergingSnapshotManagerBase#restoreStateHandles. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35195) Support the execution of create materialized table in continuous refresh mode
[ https://issues.apache.org/jira/browse/FLINK-35195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35195: --- Labels: pull-request-available (was: ) > Support the execution of create materialized table in continuous refresh mode > - > > Key: FLINK-35195 > URL: https://issues.apache.org/jira/browse/FLINK-35195 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Gateway >Affects Versions: 1.20.0 >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > In continuous refresh mode, support creating a materialized table and its > background refresh job: > {code:SQL} > CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name > > [ ([ <table_constraint> ]) ] > > [COMMENT table_comment] > > [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] > > [WITH (key1=val1, key2=val2, ...)] > > FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY } > > [REFRESH_MODE = { CONTINUOUS | FULL }] > > AS <select_statement> > > <table_constraint>: > [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
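A hedged usage example of the syntax above in continuous refresh mode — the table, column, and source names are invented for illustration:
{code:SQL}
-- Continuously refreshed materialized table with 30-second freshness.
CREATE MATERIALIZED TABLE dwd_orders
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '30' SECOND
REFRESH_MODE = CONTINUOUS
AS SELECT ds, order_id, user_id, amount
   FROM ods_orders;
{code}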
[PR] [FLINK-35195][table] Support the execution of create materialized table in continuous refresh mode [flink]
lsyldliu opened a new pull request, #24750: URL: https://github.com/apache/flink/pull/24750 ## What is the purpose of the change *Support the execution of create materialized table in continuous refresh mode* ## Brief change log - *Support the execution of create materialized table in continuous refresh mode* ## Verifying this change This change added tests and can be verified as follows: - *Added integration tests in MaterializedTableStatementITCase* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842292#comment-17842292 ] Ryan Skraba commented on FLINK-35041: - 1.20 test_ci_core https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59281=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8885 > IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed > -- > > Key: FLINK-35041 > URL: https://issues.apache.org/jira/browse/FLINK-35041 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Assignee: Feifan Wang >Priority: Blocker > > {code:java} > Apr 08 03:22:45 03:22:45.450 [ERROR] > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration > -- Time elapsed: 0.034 s <<< FAILURE! > Apr 08 03:22:45 org.opentest4j.AssertionFailedError: > Apr 08 03:22:45 > Apr 08 03:22:45 expected: false > Apr 08 03:22:45 but was: true > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Apr 08 03:22:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211) > Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498) > Apr 08 03:22:45 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {code} > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35192) operator oom
[ https://issues.apache.org/jira/browse/FLINK-35192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35192: --- Labels: pull-request-available (was: ) > operator oom > > > Key: FLINK-35192 > URL: https://issues.apache.org/jira/browse/FLINK-35192 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.1 > Environment: jdk: openjdk11 > operator version: 1.6.1 >Reporter: chenyuzhi >Priority: Major > Labels: pull-request-available > Attachments: image-2024-04-22-15-47-49-455.png, > image-2024-04-22-15-52-51-600.png, image-2024-04-22-15-58-23-269.png, > image-2024-04-22-15-58-42-850.png, screenshot-1.png, screenshot-2.png, > screenshot-3.png > > > The kubernetes operator docker process was killed by the kernel due to out of > memory (the time is 2024.04.03 18:16) > !image-2024-04-22-15-47-49-455.png! > Metrics: > the pod memory (RSS) has been increasing slowly over the past 7 days: > !screenshot-1.png! > However, the JVM memory metrics of the operator show no obvious anomaly: > !image-2024-04-22-15-58-23-269.png! > !image-2024-04-22-15-58-42-850.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35192] Remove usage of deleteOnExit() to reduce the memory usage of the operator [flink-kubernetes-operator]
bgeng777 opened a new pull request, #822: URL: https://github.com/apache/flink-kubernetes-operator/pull/822 ## What is the purpose of the change According to [FLINK-35192](https://issues.apache.org/jira/browse/FLINK-35192), the usage of file.deleteOnExit() will register a hashmap entry in the JVM which is cleaned only when the JVM shuts down. As a result, a lot of memory would be used when the operator is running for a long time with plenty of submissions. As we have guaranteed that those temporary config files would be cleaned up in FlinkConfigManager's cache mechanism, we can safely remove the usage of deleteOnExit(). ## Brief change log - Remove the usage of deleteOnExit() in FlinkConfigBuilder ## Verifying this change *(Please pick either of the following options)* This change is already covered by existing tests, such as org.apache.flink.kubernetes.operator.config.FlinkConfigManagerTest#testConfUpdateAndCleanup. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes / **no**) - Core observer or reconciler logic that is regularly executed: (yes / **no**) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
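For illustration of the leak mechanism this PR addresses: File.deleteOnExit() registers the path in a JVM-global set (java.io.DeleteOnExitHook) that is only drained at JVM shutdown, so a long-lived operator accumulates one entry per submitted config file. A hedged sketch of the explicit-cleanup alternative — the file name and flow below are invented, not the operator's actual code:
{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class TempConfLifecycle {
    static void withTempConf() throws IOException {
        File conf = Files.createTempFile("flink-conf", ".yaml").toFile();
        try {
            // ... write the effective config and hand the path to the submission ...
        } finally {
            // Explicit delete frees the file and its bookkeeping immediately;
            // conf.deleteOnExit() would pin the path entry until JVM shutdown.
            Files.deleteIfExists(conf.toPath());
        }
    }
}
{code}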
[jira] [Commented] (FLINK-34071) Deadlock in AWS Kinesis Data Streams AsyncSink connector
[ https://issues.apache.org/jira/browse/FLINK-34071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842275#comment-17842275 ] Aleksandr Pilipenko commented on FLINK-34071: - [~chalixar] For exception classification, I am following up by increasing visibility (FLINK-35269). For malicious records, I will check further to see which exceptions should be added as fatal. In terms of the timeout - while nice to have, I am not sure it is related to this issue at all. > Deadlock in AWS Kinesis Data Streams AsyncSink connector > > > Key: FLINK-34071 > URL: https://issues.apache.org/jira/browse/FLINK-34071 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS, Connectors / Kinesis >Affects Versions: aws-connector-3.0.0, 1.15.4, aws-connector-4.2.0 >Reporter: Aleksandr Pilipenko >Priority: Major > > Sink operator hangs while flushing records, similarly to FLINK-32230. The error > is observed even when using an AWS SDK version that contains the fix for async client > error handling [https://github.com/aws/aws-sdk-java-v2/pull/4402] > Thread dump of stuck thread: > {code:java} > "sdk-async-response-1-6236" Id=11213 RUNNABLE > at > app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$flush$5(AsyncSinkWriter.java:385) > at > app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter$$Lambda$1778/0x000801141040.accept(Unknown > Source) > at > org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.handleFullyFailedRequest(KinesisStreamsSinkWriter.java:210) > at > org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.lambda$submitRequestEntries$1(KinesisStreamsSinkWriter.java:184) > at > org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter$$Lambda$1965/0x0008011a0c40.accept(Unknown > Source) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils$$Lambda$1925/0x000801181840.accept(Unknown > Source) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage$$Lambda$1961/0x000801191c40.accept(Unknown > Source) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > 
at > java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:67) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage$$Lambda$1960/0x000801191840.accept(Unknown > Source) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) > at >