Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
ramkrish86 commented on code in PR #24896: URL: https://github.com/apache/flink/pull/24896#discussion_r1634237585 ## flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java: ## @@ -59,7 +59,7 @@ public void write(byte[] b, int off, int len) throws IOException { @Override public void flush() throws IOException { -out.hsync(); +out.hflush(); Review Comment: I checked this change. In the ABFS case, hflush and hsync internally do the same thing, I believe, so that functionality should not break. In the HDFS case, yes, hsync can make things costly. Thanks for fixing this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
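For readers skimming the thread, a minimal sketch of the hflush/hsync distinction under discussion, assuming Hadoop's Syncable contract (the wrapper class below is illustrative, not Flink's actual code):

```java
import org.apache.hadoop.fs.FSDataOutputStream;

import java.io.IOException;

// Illustrative wrapper, not Flink's actual class: shows the Syncable
// semantics behind the one-line change above.
abstract class FlushSyncSketch {
    protected FSDataOutputStream out;

    public void flush() throws IOException {
        // hflush() pushes buffered bytes to the datanodes so they become
        // visible to new readers, without forcing them to disk -- the cheap
        // guarantee that is sufficient for frequent flush() calls.
        out.hflush();
    }

    public void sync() throws IOException {
        // hsync() additionally asks the datanodes to persist the bytes to
        // disk; on HDFS this is the expensive call flush() no longer makes.
        out.hsync();
    }
}
```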
Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
cxzl25 commented on PR #24896: URL: https://github.com/apache/flink/pull/24896#issuecomment-2159856148 > The commit message is not yet updated. Sorry, it's updated now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
superdiaodiao commented on PR #24773: URL: https://github.com/apache/flink/pull/24773#issuecomment-2159830832 > hi @snuyanzin @superdiaodiao do we need to support an encoding argument? > db2 https://www.ibm.com/docs/en/db2-for-zos/12?topic=functions-urlencode-urldecode > max compute https://www.alibabacloud.com/help/en/maxcompute/user-guide/url-decode > Calcite and Spark also need only one argument; it is enough to handle these cases, and UTF-8 can support all of them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
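A sketch of why the single-argument, UTF-8-only form is sufficient, assuming the built-in delegates to the JDK codec (an assumption; the PR's actual implementation may differ):

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class UrlCodecSketch {
    public static void main(String[] args) {
        // UTF-8 can encode any input string, so no encoding argument is needed.
        String encoded = URLEncoder.encode("https://flink.apache.org/?q=a b", StandardCharsets.UTF_8);
        System.out.println(encoded); // https%3A%2F%2Fflink.apache.org%2F%3Fq%3Da+b
        System.out.println(URLDecoder.decode(encoded, StandardCharsets.UTF_8)); // round-trips to the input
    }
}
```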
[jira] [Commented] (FLINK-26951) Add HASH supported in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-26951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853901#comment-17853901 ] Kartikey Pant commented on FLINK-26951: --- Hey [~lincoln.86xy], can you please assign this issue to me? > Add HASH supported in SQL & Table API > - > > Key: FLINK-26951 > URL: https://issues.apache.org/jira/browse/FLINK-26951 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: dalongliu >Priority: Major > Fix For: 1.20.0 > > > Returns a hash value of the arguments. > Syntax: > {code:java} > hash(expr1, ...) {code} > Arguments: > * {{{}exprN{}}}: An expression of any type. > Returns: > An INTEGER. > Examples: > {code:java} > > SELECT hash('Flink', array(123), 2); > -1321691492 {code} > See more: > * [Spark|https://spark.apache.org/docs/latest/api/sql/index.html#hash] > * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf] -- This message was sent by Atlassian Jira (v8.20.10#820010)
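A sketch of the variadic folding shape such a function typically has. This is illustrative only: Spark's hash() actually uses Murmur3 with seed 42, so this sketch is not that algorithm and would not reproduce the example value above.
{code:java}
import java.util.Arrays;
import java.util.Objects;

public class HashSketch {
    // Fold a per-argument hash into a running seed; the real built-in would
    // hash the binary representation of each argument instead.
    static int hashAll(Object... args) {
        int result = 42; // seed
        for (Object arg : args) {
            int h = (arg instanceof Object[])
                    ? Arrays.deepHashCode((Object[]) arg)
                    : Objects.hashCode(arg);
            result = 31 * result + h;
        }
        return result;
    }
}
{code}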
[jira] [Commented] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853900#comment-17853900 ] Kartikey Pant commented on FLINK-12450: --- Hey [~taoran]/[~jark], could you please confirm if this issue is still open for contribution? If so, can you please assign this ticket to me? > [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table > API and SQL > --- > > Key: FLINK-12450 > URL: https://issues.apache.org/jira/browse/FLINK-12450 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhanchun Zhang >Assignee: Ran Tao >Priority: Major > Labels: auto-unassigned, stale-assigned > > BIT_LSHIFT, Shifts a long number to the left > BIT_RSHIFT, Shifts a long number to the right -- This message was sent by Atlassian Jira (v8.20.10#820010)
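The expected semantics in Java terms (SQL-level names from this ticket; edge cases such as shift distances of 64 or more would be up to the implementation):
{code:java}
public class ShiftSketch {
    static long bitLshift(long v, int n) { return v << n; }

    static long bitRshift(long v, int n) { return v >> n; } // arithmetic shift, sign-preserving

    public static void main(String[] args) {
        System.out.println(bitLshift(1L, 3));  // 8
        System.out.println(bitRshift(-8L, 2)); // -2
    }
}
{code}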
Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
xintongsong commented on PR #24896: URL: https://github.com/apache/flink/pull/24896#issuecomment-2159816793 > Thanks, I've updated. The commit message is not yet updated. Have you forgotten to push the changes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
liuyongvs commented on PR #24773: URL: https://github.com/apache/flink/pull/24773#issuecomment-2159808362 hi @snuyanzin @superdiaodiao do we need to support an encoding argument? db2 https://www.ibm.com/docs/en/db2-for-zos/12?topic=functions-urlencode-urldecode max compute https://www.alibabacloud.com/help/en/maxcompute/user-guide/url-decode -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on PR #24919: URL: https://github.com/apache/flink/pull/24919#issuecomment-2159778519 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634155084 ## flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java: ## @@ -59,13 +71,38 @@ public TrustManager[] trustManagers() { .fingerprints(sslCertFingerprints) .build(); -trustManagerFactory.init(loadKeystore(sslTrustStore, sslTrustStorePassword)); +trustManagerFactory.init( +loadKeystore(sslTrustStore, sslTrustStorePassword, sslTrustStoreType)); return trustManagerFactory.getTrustManagers(); -} catch (GeneralSecurityException e) { +} catch (GeneralSecurityException | IOException e) { // replicate exception handling from SSLEngineProvider throw new RemoteTransportException( "Server SSL connection could not be established because SSL context could not be constructed", e); } } + +@Override +public KeyStore loadKeystore(String filename, String password) { +try { +return loadKeystore(filename, password, sslKeyStoreType); +} catch (IOException +| CertificateException +| NoSuchAlgorithmException +| KeyStoreException e) { +throw new RemoteTransportException( Review Comment: I simplified it a bit to catch GeneralSecurityException, but I think that's the most we can do. The Scala base class doesn't throw any exceptions, so we have to catch it in the Java override. ## docs/layouts/shortcodes/generated/security_configuration.html: ## @@ -134,6 +134,12 @@ String The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server). + +security.ssl.internal.keystore-type +"pkcs12" Review Comment: updated documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
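For context, a minimal sketch of what a type-aware loadKeystore overload does, assuming the three-argument shape used in the diff above (class and method names here are illustrative):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

final class KeystoreSketch {
    static KeyStore loadKeystore(String filename, String password, String type)
            throws IOException, GeneralSecurityException {
        // type is e.g. "PKCS12" or "JKS", coming from the new config options
        KeyStore keyStore = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(Paths.get(filename))) {
            keyStore.load(in, password.toCharArray());
        }
        return keyStore;
    }
}
```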
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634154163 ## flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java: ## @@ -148,6 +148,17 @@ void testRESTClientSSLWrongPassword(String sslProvider) { .isInstanceOf(Exception.class); } +/** Tests that REST Client SSL Engine creation fails with bad SSL configuration. */ +@ParameterizedTest +@MethodSource("parameters") +void testRESTClientSSLBadTruststoreType(String sslProvider) { +Configuration clientConfig = createRestSslConfigWithTrustStore(sslProvider); +clientConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE_TYPE, "BKS"); Review Comment: Changed the tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634153986 ## flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java: ## @@ -306,6 +341,19 @@ void testInternalSSLWrongKeyPassword(String sslProvider) { .isInstanceOf(Exception.class); } +@ParameterizedTest +@MethodSource("parameters") +void testInternalSSLWrongKeystoreType(String sslProvider) { +final Configuration config = createInternalSslConfigWithKeyAndTrustStores(sslProvider); +config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE, "JCEKS"); + +assertThatThrownBy(() -> SSLUtils.createInternalServerSSLEngineFactory(config)) +.isInstanceOf(java.io.IOException.class); Review Comment: Addressed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails
[ https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853894#comment-17853894 ] Weijie Guo commented on FLINK-18476: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60177&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23446 > PythonEnvUtilsTest#testStartPythonProcess fails > --- > > Key: FLINK-18476 > URL: https://issues.apache.org/jira/browse/FLINK-18476 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests >Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0 >Reporter: Dawid Wysakowicz >Priority: Major > Labels: auto-deprioritized-major, auto-deprioritized-minor, > test-stability > > The > {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} > failed in my local environment as it assumes the environment has > {{/usr/bin/python}}. > I don't know exactly how I got python in Ubuntu 20.04, but I only have an > alias for {{python = python3}}. Therefore the test fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34914) FLIP-436: Introduce Catalog-related Syntax
[ https://issues.apache.org/jira/browse/FLINK-34914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853893#comment-17853893 ] Weijie Guo commented on FLINK-34914: Hi [~liyubin117] and [~qingyue]. FYI: We expect the feature freeze to be on June 15, so there is a significant risk that this feature will not make it into 1.20. I noticed from the 1.20 release wiki page that only half of the work is done; would you be willing to move it to the next release if you don't have enough time? > FLIP-436: Introduce Catalog-related Syntax > -- > > Key: FLINK-34914 > URL: https://issues.apache.org/jira/browse/FLINK-34914 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: Yubin Li >Assignee: Yubin Li >Priority: Major > > Umbrella issue for: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-436%3A+Introduce+Catalog-related+Syntax -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]
yuxiqian commented on code in PR #3398: URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1634142791 ## flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh: ## @@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then exit 1 fi -# Setup Flink related configurations -# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it -_FLINK_HOME_DETERMINED=1 Review Comment: Should `_FLINK_HOME_DETERMINED` env var be preserved here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35563) 'Run kubernetes application test' failed on AZP
[ https://issues.apache.org/jira/browse/FLINK-35563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35563: --- Description: {code:java} Jun 08 01:52:28 deployment.apps "flink-native-k8s-application-1" deleted Jun 08 01:52:29 clusterrolebinding.rbac.authorization.k8s.io "flink-role-binding-default" deleted Jun 08 01:52:35 pod/flink-native-k8s-application-1-7c58596d75-blw5k condition met Jun 08 01:52:35 Stopping minikube ... Jun 08 01:52:35 * Stopping node "minikube" ... Jun 08 01:52:40 * 1 node stopped. Jun 08 01:52:40 [FAIL] Test script contains errors. Jun 08 01:52:40 Checking for errors... Jun 08 01:52:40 No errors in log files. Jun 08 01:52:40 Checking for exceptions... Jun 08 01:52:40 No exceptions in log files. Jun 08 01:52:40 Checking for non-empty .out files... grep: /home/vsts/work/_temp/debug_files/flink-logs/*.out: No such file or directory Jun 08 01:52:40 No non-empty .out files. Jun 08 01:52:40 Jun 08 01:52:40 [FAIL] 'Run kubernetes application test' failed after 3 minutes and 2 seconds! Test exited with exit code 1 {code} Unfortunately, I haven't found the root cause yet (: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60162&view=logs&j=e9d3d34f-3d15-59f4-0e3e-35067d100dfe&t=5d91035e-8022-55f2-2d4f-ab121508bf7e&l=10046 > 'Run kubernetes application test' failed on AZP > --- > > Key: FLINK-35563 > URL: https://issues.apache.org/jira/browse/FLINK-35563 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > > {code:java} > Jun 08 01:52:28 deployment.apps "flink-native-k8s-application-1" deleted > Jun 08 01:52:29 clusterrolebinding.rbac.authorization.k8s.io > "flink-role-binding-default" deleted > Jun 08 01:52:35 pod/flink-native-k8s-application-1-7c58596d75-blw5k condition > met > Jun 08 01:52:35 Stopping minikube ... > Jun 08 01:52:35 * Stopping node "minikube" ... > Jun 08 01:52:40 * 1 node stopped. > Jun 08 01:52:40 [FAIL] Test script contains errors. > Jun 08 01:52:40 Checking for errors... > Jun 08 01:52:40 No errors in log files. > Jun 08 01:52:40 Checking for exceptions... > Jun 08 01:52:40 No exceptions in log files. > Jun 08 01:52:40 Checking for non-empty .out files... > grep: /home/vsts/work/_temp/debug_files/flink-logs/*.out: No such file or > directory > Jun 08 01:52:40 No non-empty .out files. > Jun 08 01:52:40 > Jun 08 01:52:40 [FAIL] 'Run kubernetes application test' failed after 3 > minutes and 2 seconds! Test exited with exit code 1 > {code} > Unfortunately, I haven't found the root cause yet (: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60162&view=logs&j=e9d3d34f-3d15-59f4-0e3e-35067d100dfe&t=5d91035e-8022-55f2-2d4f-ab121508bf7e&l=10046 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35563) 'Run kubernetes application test' failed on AZP
Weijie Guo created FLINK-35563: -- Summary: 'Run kubernetes application test' failed on AZP Key: FLINK-35563 URL: https://issues.apache.org/jira/browse/FLINK-35563 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails
[ https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853888#comment-17853888 ] Weijie Guo commented on FLINK-18476: test_cron_jdk21 misc https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60172&view=logs&j=59a2b95a-736b-5c46-b3e0-cee6e587fd86&t=c301da75-e699-5c06-735f-778207c16f50&l=23053 > PythonEnvUtilsTest#testStartPythonProcess fails > --- > > Key: FLINK-18476 > URL: https://issues.apache.org/jira/browse/FLINK-18476 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests >Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0 >Reporter: Dawid Wysakowicz >Priority: Major > Labels: auto-deprioritized-major, auto-deprioritized-minor, > test-stability > > The > {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} > failed in my local environment as it assumes the environment has > {{/usr/bin/python}}. > I don't know exactly how I got python in Ubuntu 20.04, but I only have an > alias for {{python = python3}}. Therefore the test fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35562) WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds
Weijie Guo created FLINK-35562: -- Summary: WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds Key: FLINK-35562 URL: https://issues.apache.org/jira/browse/FLINK-35562 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.20.0 Reporter: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35562) WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds
[ https://issues.apache.org/jira/browse/FLINK-35562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-35562: --- Description: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60172&view=logs&j=32715a4c-21b8-59a3-4171-744e5ab107eb&t=ff64056b-5320-5afe-c22c-6fa339e59586&l=11885 > WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds > - > > Key: FLINK-35562 > URL: https://issues.apache.org/jira/browse/FLINK-35562 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Priority: Major > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60172&view=logs&j=32715a4c-21b8-59a3-4171-744e5ab107eb&t=ff64056b-5320-5afe-c22c-6fa339e59586&l=11885 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32081][checkpoint] Compatibility between file-merging on and off across job runs [flink]
ljz2051 commented on code in PR #24873: URL: https://github.com/apache/flink/pull/24873#discussion_r1634123958 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java: ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.execution.RestoreMode; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; + +import static org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * FileMerging Compatibility IT case which tests recovery from a checkpoint created in different + * fileMerging mode (i.e. fileMerging enabled/disabled). + */ +public class SnapshotFileMergingCompatibilityITCase extends TestLogger { + +public static Collection<Object[]> parameters() { +return Arrays.asList( +new Object[][] { +{RestoreMode.CLAIM, true}, +{RestoreMode.CLAIM, false}, +{RestoreMode.NO_CLAIM, true}, +{RestoreMode.NO_CLAIM, false} +}); +} + +@ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}") +@MethodSource("parameters") +public void testSwitchFromDisablingToEnablingFileMerging( +RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir Path checkpointDir) +throws Exception { +testSwitchingFileMerging( +checkpointDir, false, true, restoreMode, fileMergingAcrossBoundary); +} + +@ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}") +@MethodSource("parameters") +public void testSwitchFromEnablingToDisablingFileMerging( +RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir Path checkpointDir) +throws Exception { +testSwitchingFileMerging( +checkpointDir, true, false, restoreMode, fileMergingAcrossBoundary); +} + +private void testSwitchingFileMerging( +Path checkpointDir, +boolean firstFileMergingSwitch, +boolean secondFileMergingSwitch, +RestoreMode restoreMode, +boolean fileMergingAcrossBoundary) +throws Exception { +final Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toUri().toString()); +config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); +config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, fileMergingAcrossBoundary); +config.set(CheckpointingOptions.FILE_MERGING_ENABLED, firstFileMergingSwitch); +MiniClusterWithClientResource firstCluster = +new MiniClusterWithClientResource( +new MiniClusterResourceConfiguration.Builder() +.setConfiguration(config) +.setNumberTaskManagers(2) +.setNumberSlotsPerTaskManager(2) +.build()); +EmbeddedRocksDBStateBackend stateBackend1 = new EmbeddedRocksDBStateBackend(); +stateBackend1.configure(config, Thread.currentThread().getContextClassLoader()); +firstCluster.before(); +String externalCheckpoint; +try { +externalCheckpoint = +runJobAndGetExternalizedCheckpoint( +stateBackend1, null, firstCluster,
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
wenbingshen commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634118107 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = + matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: @gyfora @1996fanrui I think the existing matching logic is not too complicated, and the PR is not too large. If support for the maximum parallelism of other source components is added in the future, it will not be too late to split it then. If we split the matching logic for Kafka and Pulsar, we would need to run two regular-expression passes over the source metrics data set to obtain the partition counts of the two queues separately, which does not seem necessary. There does not seem to be a simpler alternative to the "or" matching: the current regular expression is an alternation, and the two cases are also distinguished in the map function. Maybe you have a better answer to tell me, thank you. I suggest keeping the logic of the existing PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
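For reference, a self-contained sketch of how the combined pattern distinguishes the two sources; the sample metric names below are illustrative, following the naming scheme the regex encodes:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionRegexSketch {
    public static void main(String[] args) {
        Pattern p = Pattern.compile(
                "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                        + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
        String[] samples = {
            "op.KafkaSourceReader.topic.orders.partition.0.currentOffset",
            "op.PulsarConsumer.orders-partition-1.sub.numMsgsReceived"
        };
        for (String metric : samples) {
            Matcher m = p.matcher(metric);
            if (m.matches()) {
                // groups of the non-matching alternative are null
                String key = m.group("kafkaTopic") != null
                        ? m.group("kafkaTopic") + "-" + m.group("kafkaId")
                        : m.group("pulsarTopic") + "-" + m.group("pulsarId");
                System.out.println(key); // distinct topic-partition keys, then counted
            }
        }
    }
}
```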
Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
cxzl25 commented on PR #24896: URL: https://github.com/apache/flink/pull/24896#issuecomment-2159691770 Gentle ping @xintongsong -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
wenbingshen commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634091321 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = + matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: Sorry, I will update this PR today. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]
gong commented on PR #3407: URL: https://github.com/apache/flink-cdc/pull/3407#issuecomment-2159661569 @leonardBang PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]
gong commented on PR #3207: URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2159656572 > @gong would you like to open a PR for release-3.1 branch? ok, I will open the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]
reswqa commented on code in PR #24845: URL: https://github.com/apache/flink/pull/24845#discussion_r1634076649 ## flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A serializer for {@link java.util.Set}. The serializer relies on an element serializer for the + * serialization of the set's elements. + * + * The serialization format for the set is as follows: four bytes for the length of the set, + * followed by the serialized representation of each element. + * + * @param <T> The type of element in the set. + */ +@Internal +public final class SetSerializer<T> extends TypeSerializer<Set<T>> { + +private static final long serialVersionUID = 1L; + +/** The serializer for the elements of the set. */ +private final TypeSerializer<T> elementSerializer; + +/** + * Creates a set serializer that uses the given serializer to serialize the set's elements. + * + * @param elementSerializer The serializer for the elements of the set + */ +public SetSerializer(TypeSerializer<T> elementSerializer) { +this.elementSerializer = checkNotNull(elementSerializer); +} + +// +// SetSerializer specific properties +// + +/** + * Gets the serializer for the elements of the set. + * + * @return The serializer for the elements of the set + */ +public TypeSerializer<T> getElementSerializer() { +return elementSerializer; +} + +// +// Type Serializer implementation +// + +@Override +public boolean isImmutableType() { +return false; +} + +@Override +public TypeSerializer<Set<T>> duplicate() { +TypeSerializer<T> duplicateElement = elementSerializer.duplicate(); +return duplicateElement == elementSerializer ? this : new SetSerializer<>(duplicateElement); +} + +@Override +public Set<T> createInstance() { +return new HashSet<>(0); +} + +@Override +public Set<T> copy(Set<T> from) { +Set<T> newSet = new HashSet<>(from.size()); +for (T element : from) { +newSet.add(elementSerializer.copy(element)); +} +return newSet; +} + +@Override +public Set<T> copy(Set<T> from, Set<T> reuse) { +return copy(from); +} + +@Override +public int getLength() { +return -1; // var length +} + +@Override +public void serialize(Set<T> set, DataOutputView target) throws IOException { +final int size = set.size(); +target.writeInt(size); +for (T element : set) { +elementSerializer.serialize(element, target); Review Comment: > I also found that serializing a list with null values will throw an NPE, as we reused the existing ListSerializer impl, which does not allow null values (it was previously only used for serializing ListState, where null values are explicitly forbidden in the contract). Directly adding a null marker for it would break state compatibility, so I plan to introduce a new list serializer that accepts null values for serializing user objects in a new JIRA. WDYT? Makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
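A sketch of the null-marker encoding mentioned in the last comment, for a new serializer that must accept null elements (class and method names are illustrative; the existing ListSerializer format would stay untouched):

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.Set;

// Illustrative helper, not part of the PR: each element is preceded by a
// boolean marker so that null elements can round-trip safely.
final class NullAwareElementWriter<T> {
    private final TypeSerializer<T> elementSerializer;

    NullAwareElementWriter(TypeSerializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    void serialize(Set<T> set, DataOutputView target) throws IOException {
        target.writeInt(set.size());
        for (T element : set) {
            if (element == null) {
                target.writeBoolean(false); // null marker, no payload follows
            } else {
                target.writeBoolean(true);
                elementSerializer.serialize(element, target);
            }
        }
    }

    T readElement(DataInputView source) throws IOException {
        return source.readBoolean() ? elementSerializer.deserialize(source) : null;
    }
}
```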
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
1996fanrui commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634051534 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = + matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: This PR has not been updated for almost a month, and the author has not responded. The flink kubernetes operator will be released this week, so I will take over this PR if it's still not updated or responded to within 24 hours. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting
[ https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853858#comment-17853858 ] Rui Fan commented on FLINK-35489: - Merged to main(1.9.0) via: c3e94aec0c6f081a99b7467bd0bcee551b841600 > Metaspace size can be too little after autotuning change memory setting > --- > > Key: FLINK-35489 > URL: https://issues.apache.org/jira/browse/FLINK-35489 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: 1.8.0 >Reporter: Nicolas Fraison >Assignee: Nicolas Fraison >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.9.0 > > > We have enabled the autotuning feature on one of our Flink jobs with the below > config > {code:java} > # Autoscaler configuration > job.autoscaler.enabled: "true" > job.autoscaler.stabilization.interval: 1m > job.autoscaler.metrics.window: 10m > job.autoscaler.target.utilization: "0.8" > job.autoscaler.target.utilization.boundary: "0.1" > job.autoscaler.restart.time: 2m > job.autoscaler.catch-up.duration: 10m > job.autoscaler.memory.tuning.enabled: true > job.autoscaler.memory.tuning.overhead: 0.5 > job.autoscaler.memory.tuning.maximize-managed-memory: true{code} > During a scale-down, the autotuning decided to give all the memory to the JVM > (scaling the heap by 2), setting taskmanager.memory.managed.size to 0b. > Here is the config that was computed by the autotuning for a TM running on a > 4GB pod: > {code:java} > taskmanager.memory.network.max: 4063232b > taskmanager.memory.network.min: 4063232b > taskmanager.memory.jvm-overhead.max: 433791712b > taskmanager.memory.task.heap.size: 3699934605b > taskmanager.memory.framework.off-heap.size: 134217728b > taskmanager.memory.jvm-metaspace.size: 22960020b > taskmanager.memory.framework.heap.size: "0 bytes" > taskmanager.memory.flink.size: 3838215565b > taskmanager.memory.managed.size: 0b {code} > This has led to issues starting the TM because we rely on a > javaagent that performs memory allocation outside of the JVM (relying on some > C bindings). > Tuning the overhead or disabling scale-down-compensation.enabled could > have helped for that particular event, but this can lead to other issues, as it > could lead to too little heap size being computed. > It would be interesting to be able to set a minimum memory.managed.size to be > taken into account by the autotuning. > What do you think about this? Do you think some other specific config > should have been applied to avoid this issue? > > Edit: see this comment that leads to the metaspace issue: > https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting
[ https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-35489: --- Assignee: Nicolas Fraison > Metaspace size can be too little after autotuning change memory setting > --- > > Key: FLINK-35489 > URL: https://issues.apache.org/jira/browse/FLINK-35489 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: 1.8.0 >Reporter: Nicolas Fraison >Assignee: Nicolas Fraison >Priority: Major > Labels: pull-request-available > > We have enabled the autotuning feature on one of our Flink jobs with the below > config > {code:java} > # Autoscaler configuration > job.autoscaler.enabled: "true" > job.autoscaler.stabilization.interval: 1m > job.autoscaler.metrics.window: 10m > job.autoscaler.target.utilization: "0.8" > job.autoscaler.target.utilization.boundary: "0.1" > job.autoscaler.restart.time: 2m > job.autoscaler.catch-up.duration: 10m > job.autoscaler.memory.tuning.enabled: true > job.autoscaler.memory.tuning.overhead: 0.5 > job.autoscaler.memory.tuning.maximize-managed-memory: true{code} > During a scale-down, the autotuning decided to give all the memory to the JVM > (scaling the heap by 2), setting taskmanager.memory.managed.size to 0b. > Here is the config that was computed by the autotuning for a TM running on a > 4GB pod: > {code:java} > taskmanager.memory.network.max: 4063232b > taskmanager.memory.network.min: 4063232b > taskmanager.memory.jvm-overhead.max: 433791712b > taskmanager.memory.task.heap.size: 3699934605b > taskmanager.memory.framework.off-heap.size: 134217728b > taskmanager.memory.jvm-metaspace.size: 22960020b > taskmanager.memory.framework.heap.size: "0 bytes" > taskmanager.memory.flink.size: 3838215565b > taskmanager.memory.managed.size: 0b {code} > This has led to issues starting the TM because we rely on a > javaagent that performs memory allocation outside of the JVM (relying on some > C bindings). > Tuning the overhead or disabling scale-down-compensation.enabled could > have helped for that particular event, but this can lead to other issues, as it > could lead to too little heap size being computed. > It would be interesting to be able to set a minimum memory.managed.size to be > taken into account by the autotuning. > What do you think about this? Do you think some other specific config > should have been applied to avoid this issue? > > Edit: see this comment that leads to the metaspace issue: > https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting
[ https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan resolved FLINK-35489. - Fix Version/s: kubernetes-operator-1.9.0 Resolution: Fixed > Metaspace size can be too little after autotuning change memory setting > --- > > Key: FLINK-35489 > URL: https://issues.apache.org/jira/browse/FLINK-35489 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: 1.8.0 >Reporter: Nicolas Fraison >Assignee: Nicolas Fraison >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.9.0 > > > We have enabled the autotuning feature on one of our Flink jobs with the below > config > {code:java} > # Autoscaler configuration > job.autoscaler.enabled: "true" > job.autoscaler.stabilization.interval: 1m > job.autoscaler.metrics.window: 10m > job.autoscaler.target.utilization: "0.8" > job.autoscaler.target.utilization.boundary: "0.1" > job.autoscaler.restart.time: 2m > job.autoscaler.catch-up.duration: 10m > job.autoscaler.memory.tuning.enabled: true > job.autoscaler.memory.tuning.overhead: 0.5 > job.autoscaler.memory.tuning.maximize-managed-memory: true{code} > During a scale-down, the autotuning decided to give all the memory to the JVM > (scaling the heap by 2), setting taskmanager.memory.managed.size to 0b. > Here is the config that was computed by the autotuning for a TM running on a > 4GB pod: > {code:java} > taskmanager.memory.network.max: 4063232b > taskmanager.memory.network.min: 4063232b > taskmanager.memory.jvm-overhead.max: 433791712b > taskmanager.memory.task.heap.size: 3699934605b > taskmanager.memory.framework.off-heap.size: 134217728b > taskmanager.memory.jvm-metaspace.size: 22960020b > taskmanager.memory.framework.heap.size: "0 bytes" > taskmanager.memory.flink.size: 3838215565b > taskmanager.memory.managed.size: 0b {code} > This has led to issues starting the TM because we rely on a > javaagent that performs memory allocation outside of the JVM (relying on some > C bindings). > Tuning the overhead or disabling scale-down-compensation.enabled could > have helped for that particular event, but this can lead to other issues, as it > could lead to too little heap size being computed. > It would be interesting to be able to set a minimum memory.managed.size to be > taken into account by the autotuning. > What do you think about this? Do you think some other specific config > should have been applied to avoid this issue? > > Edit: see this comment that leads to the metaspace issue: > https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]
1996fanrui merged PR #833: URL: https://github.com/apache/flink-kubernetes-operator/pull/833 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]
1996fanrui commented on PR #833: URL: https://github.com/apache/flink-kubernetes-operator/pull/833#issuecomment-2159610745 Thanks @ashangit for the fix and everyone for the review! Merging~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35557) MemoryManager only reserves memory per consumer type once
[ https://issues.apache.org/jira/browse/FLINK-35557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853850#comment-17853850 ] Xintong Song edited comment on FLINK-35557 at 6/11/24 1:26 AM: --- I think this is by design. As suggested by the name, the memory is expected to be shared within the slot. Take RocksDBStateBackend as an example, when there're multiple states in one slot, there will only be one cache area and one write buffer area shared by all the states. Given that states are initialized concurrently, there's no guarantee which state will be initialized first, thus we introduced shared resources to only allocate the memory when the first state is initialized. If the memory consumer does not want this sharing behavior, it can simply call allocatePages / reserveMemory instead of getSharedMemoryResourceForManagedMemory. was (Author: xintongsong): I think this is by design. As suggested by the name, the memory is expected to be shared within the slot. Take RocksDBStateBackend as an example, when there're multiple states in one slot, there will only be one cache area and one write buffer area share by all the states. Given that states are initialized concurrently, there's no guarantee which state will be initialized first, thus we introduced shared resources to only allocate the memory when the first state is initialized. If the memory consumer does not want this sharing behaviors, it can simply call allocatePages / reserveMemory instead of getSharedMemoryResourceForManagedMemory. > MemoryManager only reserves memory per consumer type once > - > > Key: FLINK-35557 > URL: https://issues.apache.org/jira/browse/FLINK-35557 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends, Runtime / Task >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1, 1.20.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Fix For: 1.20.0, 1.19.2 > > > # In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we > [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526] > a reserve function > # The function > [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61] > the available Slot memory and fails if there's not enough memory > # We pass it to {{SharedResources.getOrAllocateSharedResource}} > # In {{SharedResources.getOrAllocateSharedResource}} , we check if the > resource (memory) was already reserved by some key (e.g. > {{{}state-rocks-managed-memory{}}}) > # If not, we create a new one and call the reserve function > # If the resource was already reserved (not null), we do NOT reserve the > memory again: > [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flin[…]/main/java/org/apache/flink/runtime/memory/SharedResources.java|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71] > So there will be only one (first) memory reservation for rocksdb for example, > no matter how many state backends in a slot are created. Meaning that managed > memory limits are not applied -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35557) MemoryManager only reserves memory per consumer type once
[ https://issues.apache.org/jira/browse/FLINK-35557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853850#comment-17853850 ] Xintong Song commented on FLINK-35557: -- I think this is by design. As suggested by the name, the memory is expected to be shared within the slot. Take RocksDBStateBackend as an example, when there're multiple states in one slot, there will only be one cache area and one write buffer area share by all the states. Given that states are initialized concurrently, there's no guarantee which state will be initialized first, thus we introduced shared resources to only allocate the memory when the first state is initialized. If the memory consumer does not want this sharing behaviors, it can simply call allocatePages / reserveMemory instead of getSharedMemoryResourceForManagedMemory. > MemoryManager only reserves memory per consumer type once > - > > Key: FLINK-35557 > URL: https://issues.apache.org/jira/browse/FLINK-35557 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends, Runtime / Task >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1, 1.20.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Fix For: 1.20.0, 1.19.2 > > > # In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we > [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526] > a reserve function > # The function > [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61] > the available Slot memory and fails if there's not enough memory > # We pass it to {{SharedResources.getOrAllocateSharedResource}} > # In {{SharedResources.getOrAllocateSharedResource}} , we check if the > resource (memory) was already reserved by some key (e.g. > {{{}state-rocks-managed-memory{}}}) > # If not, we create a new one and call the reserve function > # If the resource was already reserved (not null), we do NOT reserve the > memory again: > [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flin[…]/main/java/org/apache/flink/runtime/memory/SharedResources.java|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71] > So there will be only one (first) memory reservation for rocksdb for example, > no matter how many state backends in a slot are created. Meaning that managed > memory limits are not applied -- This message was sent by Atlassian Jira (v8.20.10#820010)
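A toy model of the behavior described in steps 1-6 (names illustrative, not Flink's actual classes): only the first caller per key runs the reserve function, and later callers receive the shared resource without reserving again.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongConsumer;

class SharedResourcesSketch {
    private final Map<String, Long> resources = new HashMap<>();

    synchronized long getOrAllocate(String key, long size, LongConsumer reserve) {
        Long existing = resources.get(key);
        if (existing == null) {
            reserve.accept(size); // budget is decremented only on the first call
            resources.put(key, size);
            return size;
        }
        return existing; // later callers skip the reservation entirely
    }
}
{code}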
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634004521 ## docs/layouts/shortcodes/generated/security_configuration.html: ## @@ -134,6 +134,12 @@ String The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server). + +security.ssl.internal.keystore-type +"pkcs12" Review Comment: I found an `OverrideDefault` annotation, will try that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
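For context, this is roughly how that annotation is applied to a ConfigOption; the composition below is illustrative rather than the exact PR code, and the default-value expression is an assumption.
{code:java}
import java.security.KeyStore;

import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

class SecurityOptionsSketch {
    // @OverrideDefault makes the doc generator print the supplied text instead
    // of the JVM-computed default, avoiding the Java-version dependence below.
    @Documentation.OverrideDefault("JVM default (jks on Java 8, pkcs12 on Java 9+)")
    public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE_TYPE =
            ConfigOptions.key("security.ssl.internal.keystore-type")
                    .stringType()
                    .defaultValue(KeyStore.getDefaultType()) // resolved per running JVM
                    .withDescription("Keystore type for Flink's internal endpoints.");
}
{code}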
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1633910473 ## docs/layouts/shortcodes/generated/security_configuration.html: ## @@ -134,6 +134,12 @@ String The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server). + +security.ssl.internal.keystore-type +"pkcs12" Review Comment: You're right. Since I generated the documentation with Java 11, it output PKCS12. Do you know how I can stop the default value in the documentation from being generated and instead add it manually? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]
morazow commented on PR #100: URL: https://github.com/apache/flink-connector-kafka/pull/100#issuecomment-2159296755 Thanks @dongwoo6kim, tests look good from my side. (_Recently I faced a similar issue, which may be related, when running batch mode with `startingOffsets` set. The change should solve that issue too, but we may create a separate issue for it._) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-2159216321 I moved the example application under tests and removed the separate module completely. I added links to the example application in the README. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1633781977 ## README.md: ## @@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework with powerful stream- Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/) +## Modules + +This repository contains the following modules + +* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector implementation; supports optional request signer +* [Sample application](./example-datastream-job): Sample application showing the usage of the connector with DataStream API. It also demonstrates how to configure the request signer. Review Comment: I moved the example application under tests, as suggested, merging everything into a single class. The usage of the AMP connector is commented out, as it is a separate dependency, but it is still present to show the usage in case someone wants to use it. https://github.com/apache/flink-connector-prometheus/blob/5afb890aa8fe860ce25bcf89f7cdee93a6d2c4be/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-30400) Stop bundling connector-base in externalized connectors
[ https://issues.apache.org/jira/browse/FLINK-30400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853754#comment-17853754 ] Robert Metzger commented on FLINK-30400: The problem of marking flink-connector-base as provided for a connector is that classes like the {{DeliveryGuarantee}} are missing when developing locally. For example, the KafkaSink example includes the {{DeliveryGuarantee}}, which is not pulled in by my IDE because it is a transitive provided dependency. We could also update the connector docs and list the {{flink-connector-base}} as well? > Stop bundling connector-base in externalized connectors > --- > > Key: FLINK-30400 > URL: https://issues.apache.org/jira/browse/FLINK-30400 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Common >Reporter: Chesnay Schepler >Assignee: Hang Ruan >Priority: Major > Labels: pull-request-available > Fix For: elasticsearch-3.1.0, aws-connector-4.2.0, kafka-3.1.0, > rabbitmq-3.1.0, kafka-3.0.2 > > > Check that none of the externalized connectors bundle connector-base; if so > remove the bundling and schedule a new minor release. > Bundling this module is highly problematic w.r.t. binary compatibility, since > bundled classes may rely on internal APIs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
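To make the point concrete, a hedged sketch of the user code in question; the topic and bootstrap servers are placeholders, and the key line is the DeliveryGuarantee import from flink-connector-base, which must be on the compile classpath.
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee; // lives in flink-connector-base
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

class KafkaSinkSketch {
    KafkaSink<String> build() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("events") // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // Does not resolve in the IDE when flink-connector-base is only
                // a transitive provided dependency, as described above.
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
{code}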
[jira] [Updated] (FLINK-35559) Shading issue cause class conflict
[ https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aleksandr Pilipenko updated FLINK-35559: Priority: Blocker (was: Major) > Shading issue cause class conflict > -- > > Key: FLINK-35559 > URL: https://issues.apache.org/jira/browse/FLINK-35559 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS, Connectors / Firehose, Connectors / > Kinesis >Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0 >Reporter: Aleksandr Pilipenko >Assignee: Aleksandr Pilipenko >Priority: Blocker > Labels: pull-request-available > Fix For: aws-connector-4.4.0 > > Attachments: Screenshot 2024-06-08 at 18.19.30.png > > > Incorrect shading configuration causes ClassCastException during exception > handling when job package flink-connector-kinesis with > flink-connector-aws-kinesis-firehose. > {code:java} > java.lang.ClassCastException: class > software.amazon.awssdk.services.firehose.model.FirehoseException cannot be > cast to class > org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException > (software.amazon.awssdk.services.firehose.model.FirehoseException and > org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException > are in unnamed module of loader 'app') > at > org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62) > ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53) > ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218) > ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189) > ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) > ~[?:?] > at > software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79) > ~[utils-2.20.144.jar:?] > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] 
> at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) > ~[?:?] > at > software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56) > ~[sdk-core-2.20.144.jar:?] > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) > ~[?:?] > at >
[jira] [Commented] (FLINK-35445) Update Async Sink documentation for Timeout configuration
[ https://issues.apache.org/jira/browse/FLINK-35445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853734#comment-17853734 ] Ahmed Hamdy commented on FLINK-35445: - As discussed offline, there are no docs for AsyncSink, hence I will close this as not an issue. > Update Async Sink documentation for Timeout configuration > -- > > Key: FLINK-35445 > URL: https://issues.apache.org/jira/browse/FLINK-35445 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common, Documentation >Reporter: Ahmed Hamdy >Priority: Major > Fix For: 1.20.0 > > > Update Documentation for AsyncSink Changes introduced by > [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-35445) Update Async Sink documentation for Timeout configuration
[ https://issues.apache.org/jira/browse/FLINK-35445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Hamdy closed FLINK-35445. --- Resolution: Not A Problem > Update Async Sink documentation for Timeout configuration > -- > > Key: FLINK-35445 > URL: https://issues.apache.org/jira/browse/FLINK-35445 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common, Documentation >Reporter: Ahmed Hamdy >Priority: Major > Fix For: 1.20.0 > > > Update Documentation for AsyncSink Changes introduced by > [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35140) Release flink-connector-opensearch vX.X.X for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853733#comment-17853733 ] Sergey Nuyanzin commented on FLINK-35140: - Need for PMC help as mentioned at https://lists.apache.org/thread/4bhbw7lvh9r0r8s6lkgmg22ftpg9cm8g > Release flink-connector-opensearch vX.X.X for Flink 1.19 > > > Key: FLINK-35140 > URL: https://issues.apache.org/jira/browse/FLINK-35140 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Opensearch >Reporter: Danny Cranmer >Assignee: Sergey Nuyanzin >Priority: Major > > [https://github.com/apache/flink-connector-opensearch] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35140) Release flink-connector-opensearch vX.X.X for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853733#comment-17853733 ] Sergey Nuyanzin edited comment on FLINK-35140 at 6/10/24 4:43 PM: -- Almost done: voting passed for both v1 and v2. Now PMC help is needed, as mentioned at [https://lists.apache.org/thread/4bhbw7lvh9r0r8s6lkgmg22ftpg9cm8g] was (Author: sergey nuyanzin): Need for PMC help as mentioned at https://lists.apache.org/thread/4bhbw7lvh9r0r8s6lkgmg22ftpg9cm8g > Release flink-connector-opensearch vX.X.X for Flink 1.19 > > > Key: FLINK-35140 > URL: https://issues.apache.org/jira/browse/FLINK-35140 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Opensearch >Reporter: Danny Cranmer >Assignee: Sergey Nuyanzin >Priority: Major > > [https://github.com/apache/flink-connector-opensearch] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35140) Release flink-connector-opensearch vX.X.X for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-35140: --- Assignee: Sergey Nuyanzin (was: Sergey Nuyanzin) > Release flink-connector-opensearch vX.X.X for Flink 1.19 > > > Key: FLINK-35140 > URL: https://issues.apache.org/jira/browse/FLINK-35140 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Opensearch >Reporter: Danny Cranmer >Assignee: Sergey Nuyanzin >Priority: Major > > [https://github.com/apache/flink-connector-opensearch] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
z3d1k commented on PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#issuecomment-2158839029 > There is a risk this will break user code depending on these libs, right? Given this is a bug, and they should not be using the shaded libs, I think this is ok, thoughts? The classes we add to the relocation are specific to the connector implementation. Not only should users generally not use the shaded libs, they should also not use the internal connector classes affected by this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
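For readers unfamiliar with the failure mode, a self-contained demonstration with stand-in classes (Base plays the role of awssdk's AwsServiceException, ShadedBase the relocated copy under org.apache.flink.kinesis.shaded, FirehoseEx the unrelocated FirehoseException from the application classpath):
{code:java}
class Base {}
class ShadedBase {} // relocation produces a second, unrelated copy of Base
class FirehoseEx extends Base {}

public class RelocationCastDemo {
    public static void main(String[] args) {
        Object thrown = new FirehoseEx();
        // The classifier was compiled against the relocated type, so the cast
        // crosses two unrelated classes and fails, mirroring the stack trace
        // in FLINK-35559 above.
        ShadedBase s = (ShadedBase) thrown; // throws ClassCastException
    }
}
{code}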
[jira] [Created] (FLINK-35561) Flink REST API incorrect documentation
Shyam created FLINK-35561: - Summary: Flink REST API incorrect documentation Key: FLINK-35561 URL: https://issues.apache.org/jira/browse/FLINK-35561 Project: Flink Issue Type: Bug Components: Documentation, Runtime / REST Reporter: Shyam The Flink REST API documentation for JAR upload ([/jars/upload|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-upload]) states that the response will contain fileName, which is then used to run Flink jobs. In the [/jars/:jarid/run|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run] documentation, the definition for :jarid is "String value that identifies a jar. When uploading the jar a path is returned, where the filename is the ID. This value is equivalent to the `id` field in the list of uploaded jars (/jars)." This statement, which identifies the filename as the ID, should be changed to: String value that identifies a jar. When uploading the jar, a path is returned, where the {*}filename contains the ID{*}, and it is the text after the last forward slash. This value is equivalent to the `id` field in the list of uploaded jars (/jars). -- This message was sent by Atlassian Jira (v8.20.10#820010)
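A small worked example of the proposed wording (the upload response path below is made up for illustration):
{code:java}
public class JarIdExample {
    public static void main(String[] args) {
        // Hypothetical "filename" value returned by POST /jars/upload.
        String filename = "/tmp/flink-web-1234/flink-web-upload/b2e9e264_WordCount.jar";
        // Per the corrected wording: the jar id is the text after the last '/'.
        String jarId = filename.substring(filename.lastIndexOf('/') + 1);
        System.out.println(jarId); // b2e9e264_WordCount.jar -> POST /jars/<jarid>/run
    }
}
{code}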
Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]
dannycranmer commented on PR #136: URL: https://github.com/apache/flink-connector-aws/pull/136#issuecomment-2158683163 @hlteoh37 / @z3d1k can you please take a look at this one too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]
dannycranmer commented on code in PR #136: URL: https://github.com/apache/flink-connector-aws/pull/136#discussion_r1633458080 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +/** + * Default implementation of {@link ElementConverter} that lazily falls back to {@link + * DynamoDbTypeInformedElementConverter}. + */ +@PublicEvolving +public class DefaultDynamoDbElementConverter<T> +implements ElementConverter<T, DynamoDbWriteRequest> { + +private ElementConverter<T, DynamoDbWriteRequest> elementConverter; + +public DefaultDynamoDbElementConverter() {} + +@Override +public DynamoDbWriteRequest apply(T t, SinkWriter.Context context) { +if (elementConverter == null) { +TypeInformation<T> typeInfo = (TypeInformation<T>) TypeInformation.of(t.getClass()); +if (!(typeInfo instanceof CompositeType)) { +throw new IllegalArgumentException("The input type must be a CompositeType."); +} + +elementConverter = +new DynamoDbTypeInformedElementConverter<>((CompositeType<T>) typeInfo); +} + +return elementConverter.apply(t, context); +} + +@Override +public void open(Sink.InitContext context) {} Review Comment: nit: Do we need this? I think it has a no-op default implementation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
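For orientation, a hedged usage sketch of the converter under review; the builder method names are taken from the DynamoDB connector as recalled and should be treated as assumptions rather than verified API.
{code:java}
import org.apache.flink.connector.dynamodb.sink.DefaultDynamoDbElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbSink;

class DynamoDbSinkSketch {
    /** Hypothetical POJO; any CompositeType-backed type passes the check above. */
    public static class Order {
        public String orderId;
        public double amount;
    }

    DynamoDbSink<Order> build() {
        // With the default converter the attribute mapping is derived lazily
        // from the element's TypeInformation on the first record.
        return DynamoDbSink.<Order>builder()
                .setTableName("orders") // placeholder
                .setElementConverter(new DefaultDynamoDbElementConverter<>())
                .build();
    }
}
{code}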
[jira] [Commented] (FLINK-35435) [FLIP-451] Introduce timeout configuration to AsyncSink
[ https://issues.apache.org/jira/browse/FLINK-35435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853713#comment-17853713 ] Danny Cranmer commented on FLINK-35435: --- Merged commit [{{c9def98}}|https://github.com/apache/flink/commit/c9def98128f0b948d98a11d57da9ea69d326c227] into master > [FLIP-451] Introduce timeout configuration to AsyncSink > --- > > Key: FLINK-35435 > URL: https://issues.apache.org/jira/browse/FLINK-35435 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: Screenshot 2024-05-24 at 11.06.30.png, Screenshot > 2024-05-24 at 12.06.20.png > > > Implementation Ticket for: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API > -- This message was sent by Atlassian Jira (v8.20.10#820010)
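For connector implementers wondering what the change looks like downstream, a heavily hedged sketch; the two new setters follow the FLIP-451 naming and the builder staging is recalled from memory, so verify both against the merged code.
{code:java}
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;

class AsyncSinkTimeoutSketch {
    AsyncSinkWriterConfiguration build() {
        return AsyncSinkWriterConfiguration.builder()
                .setMaxBatchSize(100)
                .setMaxBatchSizeInBytes(1024 * 1024)
                .setMaxInFlightRequests(10)
                .setMaxBufferedRequests(1000)
                .setMaxTimeInBufferMS(5_000)
                .setMaxRecordSizeInBytes(1024)
                .setRequestTimeoutMS(60_000) // assumed FLIP-451 setter
                .setFailOnTimeout(false)     // assumed FLIP-451 setter: retry instead of failing
                .build();
    }
}
{code}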
[jira] [Resolved] (FLINK-35435) [FLIP-451] Introduce timeout configuration to AsyncSink
[ https://issues.apache.org/jira/browse/FLINK-35435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-35435. --- Resolution: Done > [FLIP-451] Introduce timeout configuration to AsyncSink > --- > > Key: FLINK-35435 > URL: https://issues.apache.org/jira/browse/FLINK-35435 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: Screenshot 2024-05-24 at 11.06.30.png, Screenshot > 2024-05-24 at 12.06.20.png > > > Implementation Ticket for: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35435) [FLIP-451] Introduce timeout configuration to AsyncSink
[ https://issues.apache.org/jira/browse/FLINK-35435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer reassigned FLINK-35435: - Assignee: Ahmed Hamdy > [FLIP-451] Introduce timeout configuration to AsyncSink > --- > > Key: FLINK-35435 > URL: https://issues.apache.org/jira/browse/FLINK-35435 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: Screenshot 2024-05-24 at 11.06.30.png, Screenshot > 2024-05-24 at 12.06.20.png > > > Implementation Ticket for: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35435] Add timeout Configuration to Async Sink [flink]
dannycranmer merged PR #24839: URL: https://github.com/apache/flink/pull/24839 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
dannycranmer commented on PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#issuecomment-2158665814 There is a risk this will break user code depending on these libs, right? Given this is a bug, and they should not be using the shaded libs, I think this is ok, thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35559) Shading issue cause class conflict
[ https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer reassigned FLINK-35559: - Assignee: Aleksandr Pilipenko > Shading issue cause class conflict > -- > > Key: FLINK-35559 > URL: https://issues.apache.org/jira/browse/FLINK-35559 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS, Connectors / Firehose, Connectors / > Kinesis >Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0 >Reporter: Aleksandr Pilipenko >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > Fix For: aws-connector-4.4.0 > > Attachments: Screenshot 2024-06-08 at 18.19.30.png > > > Incorrect shading configuration causes ClassCastException during exception > handling when job package flink-connector-kinesis with > flink-connector-aws-kinesis-firehose. > {code:java} > java.lang.ClassCastException: class > software.amazon.awssdk.services.firehose.model.FirehoseException cannot be > cast to class > org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException > (software.amazon.awssdk.services.firehose.model.FirehoseException and > org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException > are in unnamed module of loader 'app') > at > org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62) > ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51) > ~[flink-connector-base-1.19.0.jar:1.19.0] > at > org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53) > ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218) > ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189) > ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT] > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) > ~[?:?] > at > software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79) > ~[utils-2.20.144.jar:?] > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] 
> at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) > ~[?:?] > at > software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56) > ~[sdk-core-2.20.144.jar:?] > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094) > ~[?:?] > at >
Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]
lincoln-lil commented on code in PR #24889: URL: https://github.com/apache/flink/pull/24889#discussion_r164828 ## docs/layouts/shortcodes/generated/optimizer_config_configuration.html: ## @@ -65,6 +71,12 @@ Enum When it is `TRY_RESOLVE`, the optimizer tries to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline. Changelog may contain kinds of message types: Insert (I), Delete (D), Update_Before (UB), Update_After (UA). There's no NDU problem in an insert only changelog pipeline. For updates, there are three main NDU problems:1. Non-deterministic functions, include scalar, table, aggregate functions, both builtin and custom ones.2. LookupJoin on an evolving source3. Cdc-source carries metadata fields which are system columns, not belongs to the entity data itself.For the first step, the optimizer automatically enables the materialization for No.2(LookupJoin) if needed, and gives the detailed error message for No.1(Non-deterministic functions) and No.3(Cdc-source with metadata) which is relatively easier to solve by changing the SQL.Default value is `IGNORE`, the optimizer does no changes. Possible values:"TRY_RESOLVE""IGNORE" + + table.optimizer.reuse-optimize-block-with-digest-enabled Review Comment: Add 'Batch & Streaming' label. ## docs/layouts/shortcodes/generated/optimizer_config_configuration.html: ## @@ -125,5 +125,11 @@ Boolean If set to true, it will merge projects when converting SqlNode to RelNode.Note: it is not recommended to turn on unless you are aware of possible side effects, such as causing the output of certain non-deterministic expressions to not meet expectations(see FLINK-20887). + +table.optimizer.union-all-as-breakpoint-enabled Streaming Review Comment: Should be Batch & Streaming, see `BatchCommonSubGraphBasedOptimizer#doOptimize` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
ferenc-csaky commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1633282273 ## flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java: ## @@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter @Nullable final Path targetPath; -private CompactingFileWriter.Type writeType = null; +private Type writeType = null; Review Comment: It was used in a mixed manner in the file: in some places `Type`, in others `CompactingFileWriter.Type`. The class itself (indirectly) implements `CompactingFileWriter`, so IMO `Type` plus the variable name is definitive enough. TBH I do not have a strong opinion; personally I prefer shorter code if I have a choice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
z3d1k commented on code in PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#discussion_r1633275869 ## flink-connector-aws/flink-connector-kinesis/pom.xml: ## @@ -354,6 +354,12 @@ under the License. + + org.apache.flink.connector.aws.sink + + org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink + Review Comment: updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
hlteoh37 commented on code in PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#discussion_r1633262253 ## flink-connector-aws/flink-connector-kinesis/pom.xml: ## @@ -354,6 +354,12 @@ under the License. + + org.apache.flink.connector.aws.sink + + org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink + Review Comment: Ok sounds good - we discussed offline and suggested changing the shading in `flink-connector-kinesis` to include the whole `flink-connector-aws-base` module instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
gaborgsomogyi commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1633243846 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { +uncompactedName = uncompactedName.substring(1); +} +return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); Review Comment: No strong opinion just asking. Compaction/compression normally adds postfix, why prefix here? ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { Review Comment: Are we calling this with `..`? 
## flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java: ## @@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter @Nullable final Path targetPath; -private CompactingFileWriter.Type writeType = null; +private Type writeType = null; Review Comment: When we remove the `CompactingFileWriter` prefix, `Type` by itself is hard to read. I suggest either putting the prefix back or renaming it to something more meaningful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
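To answer the prefix question above concretely, a worked example of assembleCompactedFilePath from the diff; the COMPACTED_PREFIX value is an assumption (check CompactService for the actual constant), and the leading dot is the FileSink's hidden-file marker for not-yet-committed files, which is why it is stripped first.
{code:java}
import org.apache.flink.core.fs.Path;

class CompactedPathSketch {
    private static final String COMPACTED_PREFIX = "compacted-"; // assumed value

    static Path assembleCompactedFilePath(Path uncompactedPath) {
        String name = uncompactedPath.getName();
        if (name.startsWith(".")) { // strips the hidden-file marker
            name = name.substring(1);
        }
        return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + name);
    }

    public static void main(String[] args) {
        // /out/.part-0-0 (hidden, in-progress) -> /out/compacted-part-0-0
        System.out.println(assembleCompactedFilePath(new Path("/out/.part-0-0")));
    }
}
{code}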
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
pnowojski commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1633225150 ## docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html: ## Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-XXX][planner] Support custom shuffle for lookup join [flink]
flinkbot commented on PR #24920: URL: https://github.com/apache/flink/pull/24920#issuecomment-2158287262 ## CI report: * 97c210a011dd34e0c19deffe746cb3e88b355ce2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-XXX][planner] Support custom shuffle for lookup join [flink]
WencongLiu opened a new pull request, #24920: URL: https://github.com/apache/flink/pull/24920 ## What is the purpose of the change _Support custom shuffle for lookup join_ ## Verifying this change UT and ITs. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not documented -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]
gaborgsomogyi commented on PR #24891: URL: https://github.com/apache/flink/pull/24891#issuecomment-2158262913 @wForget thanks for your efforts! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-35525) HDFS delegation token fetched by custom DelegationTokenProvider is not passed to Yarn AM
[ https://issues.apache.org/jira/browse/FLINK-35525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi closed FLINK-35525. - > HDFS delegation token fetched by custom DelegationTokenProvider is not > passed to Yarn AM > - > > Key: FLINK-35525 > URL: https://issues.apache.org/jira/browse/FLINK-35525 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.19.0, 1.18.1 >Reporter: Zhen Wang >Assignee: Zhen Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > I tried running flink with hadoop proxy user by disabling HadoopModuleFactory > and flink built-in token providers, and implementing a custom token provider. > However, only the hdfs token obtained by hadoopfs provider was added in > YarnClusterDescriptor, which resulted in Yarn AM submission failure. > Discussion: https://github.com/apache/flink/pull/22009#issuecomment-2132676114 -- This message was sent by Atlassian Jira (v8.20.10#820010)
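A hedged sketch of the "custom token provider" mentioned in the issue; the interface is org.apache.flink.core.security.token.DelegationTokenProvider and implementations are discovered via ServiceLoader, but the method signatures below are recalled from memory and should be treated as assumptions.
{code:java}
import java.util.Optional;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.security.token.DelegationTokenProvider;

public class CustomHdfsTokenProvider implements DelegationTokenProvider {
    @Override
    public String serviceName() {
        return "custom-hdfs"; // hypothetical service name
    }

    @Override
    public void init(Configuration configuration) {}

    @Override
    public boolean delegationTokensRequired() {
        return true;
    }

    @Override
    public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
        byte[] serializedTokens = fetchTokens(); // hypothetical helper
        return new ObtainedDelegationTokens(serializedTokens, Optional.empty());
    }

    private byte[] fetchTokens() {
        return new byte[0]; // placeholder; a real provider fetches HDFS tokens here
    }
}
{code}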
[jira] [Resolved] (FLINK-35525) HDFS delegation token fetched by custom DelegationTokenProvider is not passed to Yarn AM
[ https://issues.apache.org/jira/browse/FLINK-35525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi resolved FLINK-35525. --- Fix Version/s: 1.20.0 Resolution: Fixed 74b100b on master > HDFS delegation token fetched by custom DelegationTokenProvider is not > passed to Yarn AM > - > > Key: FLINK-35525 > URL: https://issues.apache.org/jira/browse/FLINK-35525 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.19.0, 1.18.1 >Reporter: Zhen Wang >Assignee: Zhen Wang >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > I tried running flink with hadoop proxy user by disabling HadoopModuleFactory > and flink built-in token providers, and implementing a custom token provider. > However, only the hdfs token obtained by hadoopfs provider was added in > YarnClusterDescriptor, which resulted in Yarn AM submission failure. > Discussion: https://github.com/apache/flink/pull/22009#issuecomment-2132676114 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]
gaborgsomogyi merged PR #24891: URL: https://github.com/apache/flink/pull/24891 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]
ashangit commented on PR #833: URL: https://github.com/apache/flink-kubernetes-operator/pull/833#issuecomment-2158258410 Updated the PR to take into account this [comment](https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17853606=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17853606) in the Jira ticket -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting
[ https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853647#comment-17853647 ] Nicolas Fraison commented on FLINK-35489: - Thanks [~mxm] for the feedback. I've updated the [PR|https://github.com/apache/flink-kubernetes-operator/pull/833/files] to not limit the metaspace size. > Metaspace size can be too little after autotuning change memory setting > --- > > Key: FLINK-35489 > URL: https://issues.apache.org/jira/browse/FLINK-35489 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: 1.8.0 >Reporter: Nicolas Fraison >Priority: Major > Labels: pull-request-available > > We have enabled the autotuning feature on one of our flink jobs with the below > config > {code:java} > # Autoscaler configuration > job.autoscaler.enabled: "true" > job.autoscaler.stabilization.interval: 1m > job.autoscaler.metrics.window: 10m > job.autoscaler.target.utilization: "0.8" > job.autoscaler.target.utilization.boundary: "0.1" > job.autoscaler.restart.time: 2m > job.autoscaler.catch-up.duration: 10m > job.autoscaler.memory.tuning.enabled: true > job.autoscaler.memory.tuning.overhead: 0.5 > job.autoscaler.memory.tuning.maximize-managed-memory: true{code} > During a scale down the autotuning decided to give all the memory to the JVM > (scaling the heap by 2), setting taskmanager.memory.managed.size to 0b. > Here is the config that was computed by the autotuning for a TM running on a > 4GB pod: > {code:java} > taskmanager.memory.network.max: 4063232b > taskmanager.memory.network.min: 4063232b > taskmanager.memory.jvm-overhead.max: 433791712b > taskmanager.memory.task.heap.size: 3699934605b > taskmanager.memory.framework.off-heap.size: 134217728b > taskmanager.memory.jvm-metaspace.size: 22960020b > taskmanager.memory.framework.heap.size: "0 bytes" > taskmanager.memory.flink.size: 3838215565b > taskmanager.memory.managed.size: 0b {code} > This has led to issues starting the TM because we rely on a javaagent that > performs some memory allocation outside of the JVM (relying on some C bindings). > Tuning the overhead or disabling scale-down-compensation.enabled could > have helped for that particular event, but this can lead to other issues, as it > could lead to too little heap size being computed. > It would be interesting to be able to set a min memory.managed.size to be > taken into account by the autotuning. > What do you think about this? Do you think that some other specific config > should have been applied to avoid this issue? > > Edit: see this comment that leads to the metaspace issue: > https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
rkhachatryan commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1633189008 ## docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html: ## Review Comment: I guess a leftover and should be removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
rkhachatryan commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1633189008 ## docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html: ## Review Comment: I guess this file is a leftover and should be removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
gaborgsomogyi commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1633123484 ## flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java: ## @@ -148,6 +148,17 @@ void testRESTClientSSLWrongPassword(String sslProvider) { .isInstanceOf(Exception.class); } +/** Tests that REST Client SSL Engine creation fails with bad SSL configuration. */ +@ParameterizedTest +@MethodSource("parameters") +void testRESTClientSSLBadTruststoreType(String sslProvider) { +Configuration clientConfig = createRestSslConfigWithTrustStore(sslProvider); +clientConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE_TYPE, "BKS"); Review Comment: Can we follow the pattern we already have? I mean `bad-truststore-type` or something. This applies to all the other places. ## flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java: ## @@ -306,6 +341,19 @@ void testInternalSSLWrongKeyPassword(String sslProvider) { .isInstanceOf(Exception.class); } +@ParameterizedTest +@MethodSource("parameters") +void testInternalSSLWrongKeystoreType(String sslProvider) { +final Configuration config = createInternalSslConfigWithKeyAndTrustStores(sslProvider); +config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE, "JCEKS"); + +assertThatThrownBy(() -> SSLUtils.createInternalServerSSLEngineFactory(config)) +.isInstanceOf(java.io.IOException.class); + +assertThatThrownBy(() -> SSLUtils.createInternalClientSSLEngineFactory(config)) +.isInstanceOf(java.io.IOException.class); Review Comment: Here too. ## flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java: ## @@ -59,13 +71,38 @@ public TrustManager[] trustManagers() { .fingerprints(sslCertFingerprints) .build(); -trustManagerFactory.init(loadKeystore(sslTrustStore, sslTrustStorePassword)); +trustManagerFactory.init( +loadKeystore(sslTrustStore, sslTrustStorePassword, sslTrustStoreType)); return trustManagerFactory.getTrustManagers(); -} catch (GeneralSecurityException e) { +} catch (GeneralSecurityException | IOException e) { // replicate exception handling from SSLEngineProvider throw new RemoteTransportException( "Server SSL connection could not be established because SSL context could not be constructed", e); } } + +@Override +public KeyStore loadKeystore(String filename, String password) { +try { +return loadKeystore(filename, password, sslKeyStoreType); +} catch (IOException +| CertificateException +| NoSuchAlgorithmException +| KeyStoreException e) { +throw new RemoteTransportException( Review Comment: What is the plan here with the wrapping? A couple of lines above I see the same wrapping pattern. It would be good to have behavior like we have in Scala. ## docs/layouts/shortcodes/generated/security_configuration.html: ## @@ -134,6 +134,12 @@ String The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server). + +security.ssl.internal.keystore-type +"pkcs12" Review Comment: This is JVM-version dependent: up to Java 8 it is `jks`, on Java 9+ it is `pkcs12`, and it can be overridden with the `keystore.type` security property, right? I would write the JVM-detected default here.
## flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java: ## @@ -293,6 +315,19 @@ void testInternalSSLWrongTruststorePassword(String sslProvider) { .isInstanceOf(Exception.class); } +@ParameterizedTest +@MethodSource("parameters") +void testInternalSSLWrongTruststoreType(String sslProvider) { +final Configuration config = createInternalSslConfigWithKeyAndTrustStores(sslProvider); +config.set(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_TYPE, "BKS"); + +assertThatThrownBy(() -> SSLUtils.createInternalServerSSLEngineFactory(config)) +.isInstanceOf(java.security.KeyStoreException.class); Review Comment: I have the feeling that `java.security.` can be eliminated with an import, right? ## flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java: ## @@ -306,6 +341,19 @@ void testInternalSSLWrongKeyPassword(String sslProvider) { .isInstanceOf(Exception.class); } +@ParameterizedTest +@MethodSource("parameters") +void testInternalSSLWrongKeystoreType(String sslProvider) {
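A minimal sketch using the option constants exercised by the tests above; the type values are standard JCA keystore type names ("JKS", "PKCS12", "JCEKS", ...).
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

class SslTypeConfigSketch {
    Configuration internalSslWithJks() {
        Configuration config = new Configuration();
        // Option names as used in the test diff above.
        config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE, "JKS");
        config.set(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_TYPE, "JKS");
        return config;
    }
}
{code}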
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
rkhachatryan commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1633185416 ## flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators; + +import org.apache.flink.annotation.Internal; + +/** Options to configure behaviour of executing mailbox mails. */ +@Internal +public class MailOptionsImpl implements MailboxExecutor.MailOptions { +static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(); +static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl().setDeferrable(); + +private boolean deferrable; Review Comment: Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
pnowojski commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1633182865 ## flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators; + +import org.apache.flink.annotation.Internal; + +/** Options to configure behaviour of executing mailbox mails. */ +@Internal +public class MailOptionsImpl implements MailboxExecutor.MailOptions { +static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(); +static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl().setDeferrable(); + +private boolean deferrable; Review Comment: Oops, dunno why I left it this way. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
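A hedged sketch of how a deferrable mail might be submitted; the diff above only shows the backing DEFAULT/DEFERRABLE instances, so the MailOptions.deferrable() factory used here is an assumption to verify against the merged API.
{code:java}
import org.apache.flink.api.common.operators.MailboxExecutor;

class DeferrableMailSketch {
    private final MailboxExecutor mailboxExecutor;

    DeferrableMailSketch(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    void scheduleFlush() {
        // Deferrable mails may be skipped while the task is yielding (e.g.
        // during checkpoint alignment) and run later from the mailbox loop.
        mailboxExecutor.execute(
                MailboxExecutor.MailOptions.deferrable(), // assumed factory method
                this::flushBufferedRecords,
                "flush buffered records");
    }

    private void flushBufferedRecords() { /* hypothetical work */ }
}
{code}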
[jira] [Updated] (FLINK-35542) ClassNotFoundException when deserializing CheckpointedOffset
[ https://issues.apache.org/jira/browse/FLINK-35542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jan Gurda updated FLINK-35542: -- Affects Version/s: jdbc-3.2.0 > ClassNotFoundException when deserializing CheckpointedOffset > > > Key: FLINK-35542 > URL: https://issues.apache.org/jira/browse/FLINK-35542 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0, jdbc-3.1.2 > Environment: Flink 1.19.0 > Flink JDBC Connector 3.2-SNAPSHOT (commit > 2defbbcf4fc550a76dd9c664e1eed7d261e028ca) > JDK 11 (Temurin) >Reporter: Jan Gurda >Priority: Major > Labels: pull-request-available > Fix For: jdbc-3.3.0 > > > I use the latest flink-connector-jdbc code from the main branch, it's > actually 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca). > > When jobs get interrupted while reading data from the JDBC source (for > example, by the TaskManager outage), they cannot recover due to the following > exception: > {code:java} > java.lang.RuntimeException: java.lang.ClassNotFoundException: > org.apache.flink.connector.jdbc.source.split.CheckpointedOffset > at > org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71) > at > org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34) > at > org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122) > at > org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158) > at > org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247) > at > org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186) > at > org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571) > at > org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: 
java.lang.ClassNotFoundException: > org.apache.flink.connector.jdbc.source.split.CheckpointedOffset > at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown > Source) > at > java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown > Source) > at java.base/java.lang.ClassLoader.loadClass(Unknown Source) > at java.base/java.lang.Class.forName0(Native Method) > at java.base/java.lang.Class.forName(Unknown Source) > at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92) > at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.base/java.io.ObjectInputStream.readObject0(Unknown Source) > at java.base/java.io.ObjectInputStream.readObject(Unknown Source) > at java.base/java.io.ObjectInputStream.readObject(Unknown Source) > at
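For context on the failure mode: the JDBC source splits are written with plain Java serialization, so the fully-qualified class name is embedded in the checkpointed bytes. Below is a minimal, standalone sketch (not Flink code; the class and method names are illustrative) of the resolution step seen in the stack trace, which is where a missing or relocated class surfaces as a ClassNotFoundException on restore:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

public final class RestoreSketch {

    /** Deserializes a split the way the stack trace above does. */
    static Object deserialize(byte[] state, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(state)) {
                    @Override
                    protected Class<?> resolveClass(ObjectStreamClass desc)
                            throws IOException, ClassNotFoundException {
                        // The FQCN is baked into the serialized bytes, so if the
                        // restoring classpath no longer contains that exact class
                        // (e.g. org.apache.flink.connector.jdbc.source.split.CheckpointedOffset),
                        // this is where the ClassNotFoundException surfaces.
                        return Class.forName(desc.getName(), false, userCodeClassLoader);
                    }
                }) {
            return in.readObject();
        }
    }
}
```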
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
rkhachatryan commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1633159087 ## flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java: ## @@ -86,6 +87,25 @@ public interface MailboxExecutor { /** A constant for empty args to save on object allocation. */ Object[] EMPTY_ARGS = new Object[0]; +/** Extra options to configure enqueued mails. */ +@Experimental +interface MailOptions { +static MailOptions options() { +return new MailOptionsImpl(); Review Comment: Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
rkhachatryan commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1633154332 ## flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators; + +import org.apache.flink.annotation.Internal; + +/** Options to configure behaviour of executing mailbox mails. */ +@Internal +public class MailOptionsImpl implements MailboxExecutor.MailOptions { +static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(); +static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl().setDeferrable(); + +private boolean deferrable; Review Comment: Can we make this field immutable now? And make the constructor private? ## flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java: ## @@ -86,6 +87,25 @@ public interface MailboxExecutor { /** A constant for empty args to save on object allocation. */ Object[] EMPTY_ARGS = new Object[0]; +/** Extra options to configure enqueued mails. */ +@Experimental +interface MailOptions { Review Comment: I'm not sure if hiding getters makes sense (users might be confused and even need them for testing). But that can be changed later if needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
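To make the suggestion concrete, here is one possible shape of an immutable MailOptionsImpl with a private constructor. This is a sketch only; the exact form that lands in the PR may differ:

```java
package org.apache.flink.api.common.operators;

import org.apache.flink.annotation.Internal;

/** Options to configure behaviour of executing mailbox mails. */
@Internal
public class MailOptionsImpl implements MailboxExecutor.MailOptions {
    static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(false);
    static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl(true);

    // immutable: assigned once in the constructor, no setter needed
    private final boolean deferrable;

    // private, so the two shared constants above are the only instances
    private MailOptionsImpl(boolean deferrable) {
        this.deferrable = deferrable;
    }

    public boolean isDeferrable() {
        return deferrable;
    }
}
```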
Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]
wForget commented on PR #24891: URL: https://github.com/apache/flink/pull/24891#issuecomment-2158200824 > Just seen the error in the pipeline, plz fix it: Thanks @gaborgsomogyi, I have updated the document and CI has passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
gaborgsomogyi commented on PR #24919: URL: https://github.com/apache/flink/pull/24919#issuecomment-2158109164 Please have a look at the failing test: ``` Jun 10 07:17:02 07:17:02.965 [INFO] Running org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase Jun 10 07:17:03 07:17:03.593 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.623 s <<< FAILURE! -- in org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase Jun 10 07:17:03 07:17:03.593 [ERROR] org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.testCompleteness -- Time elapsed: 0.593 s <<< FAILURE! Jun 10 07:17:03 java.lang.AssertionError: Jun 10 07:17:03 Documentation is outdated, please regenerate it according to the instructions in flink-docs/README.md. Jun 10 07:17:03 Problems: Jun 10 07:17:03 Documentation of security.ssl.rest.truststore-type in SecurityOptions is outdated. Expected: default=("jks") description=(The type of the truststore for Flink's external REST endpoints.). Jun 10 07:17:03 Documentation of security.ssl.rest.keystore-type in SecurityOptions is outdated. Expected: default=("jks") description=(The type of the keystore for Flink's external REST endpoints.). Jun 10 07:17:03 Documentation of security.ssl.internal.keystore-type in SecurityOptions is outdated. Expected: default=("jks") description=(The type of keystore for Flink's internal endpoints (rpc, data transport, blob server).). Jun 10 07:17:03 Documentation of security.ssl.internal.truststore-type in SecurityOptions is outdated. Expected: default=("jks") description=(The type of truststore for Flink's internal endpoints (rpc, data transport, blob server).). Jun 10 07:17:03 Documented option security.ssl.rest.truststore-type does not exist. Jun 10 07:17:03 Documented option security.ssl.rest.truststore-type does not exist. Jun 10 07:17:03 Documented option security.ssl.rest.keystore-type does not exist. Jun 10 07:17:03 Documented option security.ssl.rest.keystore-type does not exist. Jun 10 07:17:03 Documented option security.ssl.internal.keystore-type does not exist. Jun 10 07:17:03 Documented option security.ssl.internal.keystore-type does not exist. Jun 10 07:17:03 Documented option security.ssl.internal.truststore-type does not exist. Jun 10 07:17:03 Documented option security.ssl.internal.truststore-type does not exist. Jun 10 07:17:03 at org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.compareDocumentedAndExistingOptions(ConfigOptionsDocsCompletenessITCase.java:209) Jun 10 07:17:03 at org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.testCompleteness(ConfigOptionsDocsCompletenessITCase.java:70) Jun 10 07:17:03 at java.lang.reflect.Method.invoke(Method.java:498) Jun 10 07:17:03 at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) Jun 10 07:17:03 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) Jun 10 07:17:03 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) Jun 10 07:17:03 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) Jun 10 07:17:03 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
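For readers following along: a failure like this usually means options were added to SecurityOptions without regenerating the generated HTML tables. A hedged sketch of declarations consistent with the expected output above (the field names and wrapper class are assumptions; defaults and descriptions are taken verbatim from the test output), after which the docs must be regenerated as described in flink-docs/README.md:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class SecurityOptionsSketch {

    public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE_TYPE =
            ConfigOptions.key("security.ssl.internal.keystore-type")
                    .stringType()
                    .defaultValue("jks")
                    .withDescription(
                            "The type of keystore for Flink's internal endpoints"
                                    + " (rpc, data transport, blob server).");

    public static final ConfigOption<String> SSL_REST_KEYSTORE_TYPE =
            ConfigOptions.key("security.ssl.rest.keystore-type")
                    .stringType()
                    .defaultValue("jks")
                    .withDescription(
                            "The type of the keystore for Flink's external REST endpoints.");
}
```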
Re: [PR] Add Flink 1.19.1 release [flink-web]
hlteoh37 commented on PR #745: URL: https://github.com/apache/flink-web/pull/745#issuecomment-2158002413 Thanks @1996fanrui - great comments! I've addressed them. I've removed the below JIRAs from the release notes as they relate to build infrastructure: ### Subtask - [FLINK-33816](https://issues.apache.org/jira/browse/FLINK-33816) - SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed - [FLINK-34707](https://issues.apache.org/jira/browse/FLINK-34707) - Update japicmp configuration - [FLINK-34712](https://issues.apache.org/jira/browse/FLINK-34712) - Update reference data for Migration Tests ### Bug - [FLINK-26515](https://issues.apache.org/jira/browse/FLINK-26515) - RetryingExecutorTest. testDiscardOnTimeout failed on azure - [FLINK-29114](https://issues.apache.org/jira/browse/FLINK-29114) - TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch - [FLINK-31472](https://issues.apache.org/jira/browse/FLINK-31472) - AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread - [FLINK-34324](https://issues.apache.org/jira/browse/FLINK-34324) - s3_setup is called in test_file_sink.sh even if the common_s3.sh is not sourced - [FLINK-34569](https://issues.apache.org/jira/browse/FLINK-34569) - Streaming File Sink s3 end-to-end test failed - [FLINK-34571](https://issues.apache.org/jira/browse/FLINK-34571) - SortMergeResultPartitionReadSchedulerTest.testOnReadBufferRequestError failed due an assertion - [FLINK-34617](https://issues.apache.org/jira/browse/FLINK-34617) - Correct the Javadoc of org.apache.flink.api.common.time.Time - [FLINK-34622](https://issues.apache.org/jira/browse/FLINK-34622) - Typo of execution_mode configuration name in Chinese document - [FLINK-34933](https://issues.apache.org/jira/browse/FLINK-34933) - JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored isn't implemented properly - [FLINK-35000](https://issues.apache.org/jira/browse/FLINK-35000) - PullRequest template doesn't use the correct format to refer to the testing code convention - [FLINK-35383](https://issues.apache.org/jira/browse/FLINK-35383) - Update compatibility matrix to include 1.19 release - [FLINK-35526](https://issues.apache.org/jira/browse/FLINK-35526) - Remove deprecated stedolan/jq Docker image from Flink e2e tests ### Technical Debt - [FLINK-34409](https://issues.apache.org/jira/browse/FLINK-34409) - Increase test coverage for AdaptiveScheduler - [FLINK-34897](https://issues.apache.org/jira/browse/FLINK-34897) - JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip needs to be enabled again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add Flink 1.19.1 release [flink-web]
hlteoh37 commented on code in PR #745: URL: https://github.com/apache/flink-web/pull/745#discussion_r1633036672 ## docs/content/posts/2024-06-14-release-1.19.1.md: ## @@ -0,0 +1,171 @@ +--- +title: "Apache Flink 1.19.1 Release Announcement" +date: "2024-06-14T00:00:00.000Z" +aliases: +- /news/2024/06/14/release-1.19.1.html +authors: +- hong: + name: "Hong" + twitter: "hlteoh2" +--- + +The Apache Flink Community is pleased to announce the first bug fix release of the Flink 1.19 series. + +This release includes 44 bug fixes, vulnerability fixes, and minor improvements for Flink 1.19. +Below you will find a list of all bugfixes and improvements (excluding improvements to the build infrastructure and build stability). For a complete list of all changes see: Review Comment: Removed the below JIRAs from the release notes. Will add as a comment on the PR ### Subtask - [FLINK-33816](https://issues.apache.org/jira/browse/FLINK-33816) - SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed - [FLINK-34707](https://issues.apache.org/jira/browse/FLINK-34707) - Update japicmp configuration - [FLINK-34712](https://issues.apache.org/jira/browse/FLINK-34712) - Update reference data for Migration Tests ### Bug - [FLINK-26515](https://issues.apache.org/jira/browse/FLINK-26515) - RetryingExecutorTest. testDiscardOnTimeout failed on azure - [FLINK-29114](https://issues.apache.org/jira/browse/FLINK-29114) - TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch - [FLINK-31472](https://issues.apache.org/jira/browse/FLINK-31472) - AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread - [FLINK-34324](https://issues.apache.org/jira/browse/FLINK-34324) - s3_setup is called in test_file_sink.sh even if the common_s3.sh is not sourced - [FLINK-34569](https://issues.apache.org/jira/browse/FLINK-34569) - Streaming File Sink s3 end-to-end test failed - [FLINK-34571](https://issues.apache.org/jira/browse/FLINK-34571) - SortMergeResultPartitionReadSchedulerTest.testOnReadBufferRequestError failed due an assertion - [FLINK-34617](https://issues.apache.org/jira/browse/FLINK-34617) - Correct the Javadoc of org.apache.flink.api.common.time.Time - [FLINK-34622](https://issues.apache.org/jira/browse/FLINK-34622) - Typo of execution_mode configuration name in Chinese document - [FLINK-34933](https://issues.apache.org/jira/browse/FLINK-34933) - JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored isn't implemented properly - [FLINK-35000](https://issues.apache.org/jira/browse/FLINK-35000) - PullRequest template doesn't use the correct format to refer to the testing code convention - [FLINK-35383](https://issues.apache.org/jira/browse/FLINK-35383) - Update compatibility matrix to include 1.19 release - [FLINK-35526](https://issues.apache.org/jira/browse/FLINK-35526) - Remove deprecated stedolan/jq Docker image from Flink e2e tests ### Technical Debt - [FLINK-34409](https://issues.apache.org/jira/browse/FLINK-34409) - Increase test coverage for AdaptiveScheduler - [FLINK-34897](https://issues.apache.org/jira/browse/FLINK-34897) - JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip needs to be enabled again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add Flink 1.19.1 release [flink-web]
hlteoh37 commented on code in PR #745: URL: https://github.com/apache/flink-web/pull/745#discussion_r1633022942 ## docs/content/posts/2024-06-14-release-1.19.1.md: ## @@ -0,0 +1,171 @@ +--- +title: "Apache Flink 1.19.1 Release Announcement" +date: "2024-06-14T00:00:00.000Z" +aliases: +- /news/2024/06/14/release-1.19.1.html +authors: +- hong: + name: "Hong" + twitter: "hlteoh2" +--- + +The Apache Flink Community is pleased to announce the first bug fix release of the Flink 1.19 series. + +This release includes 44 bug fixes, vulnerability fixes, and minor improvements for Flink 1.19. +Below you will find a list of all bugfixes and improvements (excluding improvements to the build infrastructure and build stability). For a complete list of all changes see: +[JIRA](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354399). + +We highly recommend all users upgrade to Flink 1.19.1. + +# Release Artifacts + +## Maven Dependencies + +```xml +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-java</artifactId> +<version>1.19.1</version> +</dependency> +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-streaming-java</artifactId> +<version>1.19.1</version> +</dependency> +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-clients</artifactId> +<version>1.19.1</version> +</dependency> +``` + +## Binaries + +You can find the binaries on the updated [Downloads page]({{< relref "downloads" >}}). + +## Docker Images + +* [library/flink](https://hub.docker.com/_/flink/tags?page=1&name=1.19.1) (official images) +* [apache/flink](https://hub.docker.com/r/apache/flink/tags?page=1&name=1.19.1) (ASF repository) + +## PyPi + +* [apache-flink==1.19.1](https://pypi.org/project/apache-flink/1.19.1/) + +# Release Notes + + +Release Notes - Flink - Version 1.19.1 + +Sub-task + + +[FLINK-33816] - SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed + +[FLINK-34707] - Update japicmp configuration + +[FLINK-34712] - Update reference data for Migration Tests + + + +Bug + + +[FLINK-26515] - RetryingExecutorTest. testDiscardOnTimeout failed on azure + +[FLINK-26808] - [flink v1.14.2] Submit jobs via REST API not working after set web.submit.enable: false + +[FLINK-27741] - Fix NPE when use dense_rank() and rank() in over aggregation + +[FLINK-28693] - Codegen failed if the watermark is defined on a columnByExpression + +[FLINK-29114] - TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch + +[FLINK-31223] - sql-client.sh fails to start with ssl enabled + +[FLINK-31472] - AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread + +[FLINK-32513] - Job in BATCH mode with a significant number of transformations freezes on method StreamGraphGenerator.existsUnboundedSource() + +[FLINK-32828] - Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint + +[FLINK-33798] - Automatically clean up rocksdb logs when the task failover.
+ +[FLINK-34324] - s3_setup is called in test_file_sink.sh even if the common_s3.sh is not sourced + +[FLINK-34379] - table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError + +[FLINK-34517] - environment configs ignored when calling procedure operation + +[FLINK-34569] - Streaming File Sink s3 end-to-end test failed + +[FLINK-34571] - SortMergeResultPartitionReadSchedulerTest.testOnReadBufferRequestError failed due an assertion + +[FLINK-34616] - python dist doesn't clean when open method construct resource + +[FLINK-34617] - Correct the Javadoc of org.apache.flink.api.common.time.Time + +[FLINK-34622] - Typo of execution_mode configuration name in Chinese document + +[FLINK-34725] - Dockerfiles for release publishing has incorrect config.yaml path + +[FLINK-34933] - JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored isn't implemented properly + +[FLINK-34956] - The config type is wrong for Duration + +[FLINK-35000] - PullRequest template doesn't use the correct format to refer to the testing code convention + +[FLINK-35089] - Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes + +[FLINK-35097] - Table API Filesystem connector with raw format repeats last line + +[FLINK-35098] - Incorrect results for queries like 10 >= y on tables using Filesystem connector and Orc format + +[FLINK-35112] - Membership for Row class does not include field names + +[FLINK-35159] - CreatingExecutionGraph can leak CheckpointCoordinator and cause JM crash + +[FLINK-35169] - Recycle buffers to freeSegments before releasing data buffer for sort accumulator + +[FLINK-35217] - Missing fsync in FileSystemCheckpointStorage + +[FLINK-35351] - Restore from unaligned checkpoints with a custom partitioner fails. + +[FLINK-35358] -
Re: [PR] Add Flink 1.19.1 release [flink-web]
hlteoh37 commented on code in PR #745: URL: https://github.com/apache/flink-web/pull/745#discussion_r1633022433 ## docs/content/posts/2024-06-14-release-1.19.1.md: ## @@ -0,0 +1,171 @@ +--- +title: "Apache Flink 1.19.1 Release Announcement" +date: "2024-06-14T00:00:00.000Z" +aliases: +- /news/2024/06/14/release-1.19.1.html +authors: +- hong: + name: "Hong" + twitter: "hlteoh2" +--- + +The Apache Flink Community is pleased to announce the first bug fix release of the Flink 1.19 series. + +This release includes 44 bug fixes, vulnerability fixes, and minor improvements for Flink 1.19. +Below you will find a list of all bugfixes and improvements (excluding improvements to the build infrastructure and build stability). For a complete list of all changes see: +[JIRA](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354399). + +We highly recommend all users upgrade to Flink 1.19.1. + +# Release Artifacts + +## Maven Dependencies + +```xml +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-java</artifactId> +<version>1.19.1</version> +</dependency> +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-streaming-java</artifactId> +<version>1.19.1</version> +</dependency> +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-clients</artifactId> +<version>1.19.1</version> +</dependency> +``` + +## Binaries + +You can find the binaries on the updated [Downloads page]({{< relref "downloads" >}}). + +## Docker Images + +* [library/flink](https://hub.docker.com/_/flink/tags?page=1&name=1.19.1) (official images) +* [apache/flink](https://hub.docker.com/r/apache/flink/tags?page=1&name=1.19.1) (ASF repository) + +## PyPi + +* [apache-flink==1.19.1](https://pypi.org/project/apache-flink/1.19.1/) + +# Release Notes + + +Release Notes - Flink - Version 1.19.1 + +Sub-task Review Comment: Ok - removed. None of the subtasks are relevant to users -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting
[ https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853606#comment-17853606 ] Maximilian Michels commented on FLINK-35489: Hey [~ashangit]! Good idea to switch the order between budgeting heap and metaspace, but metaspace should be allowed to grow and not be fixed. The reason is that metaspace is often configured too low. We can prevent metaspace related errors by detecting and increasing metaspace. > Metaspace size can be too little after autotuning change memory setting > --- > > Key: FLINK-35489 > URL: https://issues.apache.org/jira/browse/FLINK-35489 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: 1.8.0 >Reporter: Nicolas Fraison >Priority: Major > Labels: pull-request-available > > We have enabled the autotuning feature on one of our flink jobs with the below > config > {code:java} > # Autoscaler configuration > job.autoscaler.enabled: "true" > job.autoscaler.stabilization.interval: 1m > job.autoscaler.metrics.window: 10m > job.autoscaler.target.utilization: "0.8" > job.autoscaler.target.utilization.boundary: "0.1" > job.autoscaler.restart.time: 2m > job.autoscaler.catch-up.duration: 10m > job.autoscaler.memory.tuning.enabled: true > job.autoscaler.memory.tuning.overhead: 0.5 > job.autoscaler.memory.tuning.maximize-managed-memory: true{code} > During a scale down, the autotuning decided to give all the memory to the JVM > (heap being scaled by 2), setting taskmanager.memory.managed.size to 0b. > Here is the config that was computed by the autotuning for a TM running on a > 4GB pod: > {code:java} > taskmanager.memory.network.max: 4063232b > taskmanager.memory.network.min: 4063232b > taskmanager.memory.jvm-overhead.max: 433791712b > taskmanager.memory.task.heap.size: 3699934605b > taskmanager.memory.framework.off-heap.size: 134217728b > taskmanager.memory.jvm-metaspace.size: 22960020b > taskmanager.memory.framework.heap.size: "0 bytes" > taskmanager.memory.flink.size: 3838215565b > taskmanager.memory.managed.size: 0b {code} > This has led to issues starting the TM because we rely on a > javaagent performing memory allocation outside of the JVM (relying on > C bindings). > Tuning the overhead or disabling the scale-down-compensation.enabled could > have helped for that particular event, but this can lead to other issues as it > could lead to too little HEAP size being computed. > It would be interesting to be able to set a min memory.managed.size to be > taken into account by the autotuning. > What do you think about this? Do you think that some other specific config > should have been applied to avoid this issue? > > Edit: see this comment that leads to the metaspace issue: > https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
gyfora commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1632931104 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: @wenbingshen did you have time to check this comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
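A quick standalone check of the named-group regex from the diff above (the sample metric name is invented, but follows the pattern the code expects), showing how the topic-partition key used to count distinct partitions is derived:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionMetricRegexDemo {
    public static void main(String[] args) {
        Pattern p = Pattern.compile(
                "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                        + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
        Matcher m = p.matcher("Source__foo.KafkaSourceReader.topic.orders.partition.3.currentOffset");
        if (m.matches()) {
            // prints "orders-3": distinct keys of this form give the partition count
            System.out.println(m.group("kafkaTopic") + "-" + m.group("kafkaId"));
        }
    }
}
```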
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
pnowojski commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1632887041 ## flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java: ## @@ -86,6 +87,25 @@ public interface MailboxExecutor { /** A constant for empty args to save on object allocation. */ Object[] EMPTY_ARGS = new Object[0]; +/** Extra options to configure enqueued mails. */ +@Experimental +interface MailOptions { +static MailOptions options() { +return new MailOptionsImpl(); Review Comment: I've gone with the factory pattern for now. Can you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
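Stepping back to what the PR title describes: the point of marking a mail as deferrable is that a yielding operator can skip it instead of executing it. A self-contained toy model of that semantics (all names here are illustrative; the real MailboxProcessor preserves ordering guarantees and handles concurrent enqueues, which this sketch does not):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

public class DeferrableMailboxSketch {

    interface Mail extends Runnable {
        boolean isDeferrable();
    }

    private final Deque<Mail> mailbox = new ArrayDeque<>();

    void enqueue(boolean deferrable, Runnable body) {
        mailbox.add(new Mail() {
            @Override
            public boolean isDeferrable() {
                return deferrable;
            }

            @Override
            public void run() {
                body.run();
            }
        });
    }

    /** Yield: drain queued mails, but leave deferrable ones for later. */
    void yieldToMailbox() {
        Iterator<Mail> it = mailbox.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (!mail.isDeferrable()) {
                it.remove();
                mail.run(); // note: a mail that enqueues more mails is not handled here
            }
        }
    }
}
```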
[jira] [Commented] (FLINK-35556) Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED
[ https://issues.apache.org/jira/browse/FLINK-35556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853589#comment-17853589 ] Hong Liang Teoh commented on FLINK-35556: - Changing fix version to 1.19.2 > Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED > --- > > Key: FLINK-35556 > URL: https://issues.apache.org/jira/browse/FLINK-35556 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.19.0, 1.20.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0, 1.19.2 > > > See > https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java#L39 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35557) MemoryManager only reserves memory per consumer type once
[ https://issues.apache.org/jira/browse/FLINK-35557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-35557: Fix Version/s: 1.19.2 (was: 1.19.1) > MemoryManager only reserves memory per consumer type once > - > > Key: FLINK-35557 > URL: https://issues.apache.org/jira/browse/FLINK-35557 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends, Runtime / Task >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1, 1.20.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Fix For: 1.20.0, 1.19.2 > > > # In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we > [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526] > a reserve function > # The function > [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61] > the available Slot memory and fails if there's not enough memory > # We pass it to {{SharedResources.getOrAllocateSharedResource}} > # In {{SharedResources.getOrAllocateSharedResource}} , we check if the > resource (memory) was already reserved by some key (e.g. > {{{}state-rocks-managed-memory{}}}) > # If not, we create a new one and call the reserve function > # If the resource was already reserved (not null), we do NOT reserve the > memory again: > [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flin[…]/main/java/org/apache/flink/runtime/memory/SharedResources.java|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71] > So there will be only one (first) memory reservation for rocksdb for example, > no matter how many state backends in a slot are created. Meaning that managed > memory limits are not applied -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35557) MemoryManager only reserves memory per consumer type once
[ https://issues.apache.org/jira/browse/FLINK-35557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853588#comment-17853588 ] Hong Liang Teoh commented on FLINK-35557: - Changing this to 1.19.2 release as this is a long-standing bug without a fix atm. > MemoryManager only reserves memory per consumer type once > - > > Key: FLINK-35557 > URL: https://issues.apache.org/jira/browse/FLINK-35557 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends, Runtime / Task >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1, 1.20.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Fix For: 1.20.0, 1.19.2 > > > # In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we > [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526] > a reserve function > # The function > [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61] > the available Slot memory and fails if there's not enough memory > # We pass it to {{SharedResources.getOrAllocateSharedResource}} > # In {{SharedResources.getOrAllocateSharedResource}} , we check if the > resource (memory) was already reserved by some key (e.g. > {{{}state-rocks-managed-memory{}}}) > # If not, we create a new one and call the reserve function > # If the resource was already reserved (not null), we do NOT reserve the > memory again: > [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flin[…]/main/java/org/apache/flink/runtime/memory/SharedResources.java|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71] > So there will be only one (first) memory reservation for rocksdb for example, > no matter how many state backends in a slot are created. Meaning that managed > memory limits are not applied -- This message was sent by Atlassian Jira (v8.20.10#820010)
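A stripped-down sketch of the pattern described in steps 4-6 above (toy code, not the actual Flink classes), showing why only the first consumer per key pays into the slot budget:

```java
import java.util.HashMap;
import java.util.Map;

public class SharedResourcesSketch {

    private final Map<String, Long> resources = new HashMap<>();
    private long availableBudget;

    SharedResourcesSketch(long budget) {
        this.availableBudget = budget;
    }

    long getOrAllocate(String key, long size) {
        Long existing = resources.get(key);
        if (existing == null) {
            // the reserve function runs only on the first acquisition...
            if (size > availableBudget) {
                throw new IllegalStateException("not enough memory in the slot");
            }
            availableBudget -= size;
            resources.put(key, size);
            return size;
        }
        // ...but NOT here: a second state backend reusing the key
        // "state-rocks-managed-memory" gets the resource without any
        // additional reservation, so per-slot limits are not enforced.
        return existing;
    }
}
```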
[jira] [Updated] (FLINK-35556) Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED
[ https://issues.apache.org/jira/browse/FLINK-35556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-35556: Fix Version/s: 1.19.2 (was: 1.19.1) > Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED > --- > > Key: FLINK-35556 > URL: https://issues.apache.org/jira/browse/FLINK-35556 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.19.0, 1.20.0 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0, 1.19.2 > > > See > https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java#L39 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34633) Support unnesting array constants
[ https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-34633: Fix Version/s: 1.20.0 (was: 1.19.1) > Support unnesting array constants > - > > Key: FLINK-34633 > URL: https://issues.apache.org/jira/browse/FLINK-34633 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Xingcan Cui >Assignee: Jeyhun Karimov >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > > It seems that the current planner doesn't support using UNNEST on array > constants.(x) > {code:java} > SELECT * FROM UNNEST(ARRAY[1,2,3]);{code} > > The following query can't be compiled.(x) > {code:java} > SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code} > > The rewritten version works. (/) > {code:java} > SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN > UNNEST(A){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34633) Support unnesting array constants
[ https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853587#comment-17853587 ] Hong Liang Teoh commented on FLINK-34633: - Thanks for checking. Will remove the PR from the release notes > Support unnesting array constants > - > > Key: FLINK-34633 > URL: https://issues.apache.org/jira/browse/FLINK-34633 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Xingcan Cui >Assignee: Jeyhun Karimov >Priority: Minor > Labels: pull-request-available > Fix For: 1.19.1 > > > It seems that the current planner doesn't support using UNNEST on array > constants.(x) > {code:java} > SELECT * FROM UNNEST(ARRAY[1,2,3]);{code} > > The following query can't be compiled.(x) > {code:java} > SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code} > > The rewritten version works. (/) > {code:java} > SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN > UNNEST(A){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
pnowojski commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1632838546 ## flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java: ## @@ -86,6 +87,25 @@ public interface MailboxExecutor { /** A constant for empty args to save on object allocation. */ Object[] EMPTY_ARGS = new Object[0]; +/** Extra options to configure enqueued mails. */ +@Experimental +interface MailOptions { Review Comment: The interface has only setters, as those are the only important bits for the user. Getters are in the concrete internal class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
z3d1k commented on code in PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#discussion_r1632814622 ## flink-connector-aws/flink-connector-kinesis/pom.xml: ## @@ -354,6 +354,12 @@ under the License. +<relocation> +<pattern>org.apache.flink.connector.aws.sink</pattern> +<shadedPattern>org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink</shadedPattern> +</relocation> Review Comment: @hlteoh37 No, this package currently contains only util classes for working with AWS exceptions and is not used for state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
foxus commented on code in PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#discussion_r1632803808 ## flink-connector-aws/flink-connector-kinesis/pom.xml: ## @@ -354,6 +354,12 @@ under the License. +<relocation> +<pattern>org.apache.flink.connector.aws.sink</pattern> +<shadedPattern>org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink</shadedPattern> +</relocation> Review Comment: @hlteoh37 are you referring to the risk of restoring state that references an FQCN that has now changed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]
pnowojski commented on code in PR #24895: URL: https://github.com/apache/flink/pull/24895#discussion_r1632799682 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java: ## @@ -307,18 +307,36 @@ void onProcessingTime(long time) throws Exception { } public void advanceWatermark(long time) throws Exception { -currentWatermark = time; +Preconditions.checkState( +tryAdvanceWatermark( +time, +() -> { +// Never stop advancing. +return false; +})); +} +/** + * @return true if following watermarks can be processed immediately. False if the firing timers + * should be interrupted as soon as possible. + */ +public boolean tryAdvanceWatermark( +long time, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn) +throws Exception { +currentWatermark = time; InternalTimer<K, N> timer; - while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time && !cancellationContext.isCancelled()) { keyContext.setCurrentKey(timer.getKey()); eventTimeTimersQueue.poll(); triggerTarget.onEventTime(timer); taskIOMetricGroup.getNumFiredTimers().inc(); +if (shouldStopAdvancingFn.shouldStopAdvancing()) { +return false; +} Review Comment: Instead of `boolean first` I've added `boolean stop` and re-used it both for cancellation and interruption. I didn't want to make the `while()` condition any longer, as it would become too difficult to read IMO. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
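Distilled into a standalone example, the control flow of the new tryAdvanceWatermark is roughly the following (BooleanSupplier stands in for ShouldStopAdvancingFn; key context, metrics, and cancellation are omitted):

```java
import java.util.PriorityQueue;
import java.util.function.BooleanSupplier;

public class InterruptibleTimerLoopSketch {

    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Returns true if all due timers fired; false if interrupted early. */
    boolean tryAdvanceWatermark(long time, BooleanSupplier shouldStopAdvancing) {
        currentWatermark = time;
        Long timer;
        while ((timer = eventTimeTimers.peek()) != null && timer <= time) {
            eventTimeTimers.poll();
            fire(timer);
            // Checked after each timer, so a pending (unaligned) checkpoint
            // does not have to wait for the whole timer backlog to drain.
            if (shouldStopAdvancing.getAsBoolean()) {
                return false; // the caller resumes the rest from a new mail
            }
        }
        return true;
    }

    private void fire(long timestamp) {
        // a user timer callback would run here
    }
}
```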
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
gaborgsomogyi commented on PR #24919: URL: https://github.com/apache/flink/pull/24919#issuecomment-2157658542 @ammar-master thanks for this PR, it looks like quality code at first glance. I'm going to shepherd it this week... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
hlteoh37 commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632777604 ## README.md: ## @@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework with powerful stream- Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/) +## Modules + +This repository contains the following modules + +* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector implementation; supports optional request signer +* [Sample application](./example-datastream-job): Sample application showing the usage of the connector with DataStream API. It also demonstrates how to configure the request signer. Review Comment: We have done it in tests previously to mitigate the "clutter" problem and the difficulty of creating the connector distribution jars. https://github.com/apache/flink-connector-aws/tree/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples We could do something similar and point to them via the README, OR we could add a Maven profile that only adds the examples module for PR / nightly builds -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org