[jira] [Commented] (FLINK-22765) ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable
[ https://issues.apache.org/jira/browse/FLINK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817602#comment-17817602 ]

Matthias Pohl commented on FLINK-22765:
---------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57533&view=logs&j=a657ddbf-d986-5381-9649-342d9c92e7fb&t=dc085d4a-05c8-580e-06ab-21f5624dab16&l=8998

> ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable
>
>                 Key: FLINK-22765
>                 URL: https://issues.apache.org/jira/browse/FLINK-22765
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.14.0, 1.13.5, 1.15.0, 1.17.2, 1.19.0, 1.20.0
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Critical
>              Labels: pull-request-available, stale-assigned, test-stability
>             Fix For: 1.14.0, 1.16.0
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18292&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=a99e99c7-21cd-5a1f-7274-585e62b72f56
> {code}
> May 25 00:56:38 java.lang.AssertionError:
> May 25 00:56:38
> May 25 00:56:38 Expected: is ""
> May 25 00:56:38      but: was "The system is out of resources.\nConsult the following stack trace for details."
> May 25 00:56:38 	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> May 25 00:56:38 	at org.junit.Assert.assertThat(Assert.java:956)
> May 25 00:56:38 	at org.junit.Assert.assertThat(Assert.java:923)
> May 25 00:56:38 	at org.apache.flink.runtime.util.ExceptionUtilsITCase.run(ExceptionUtilsITCase.java:94)
> May 25 00:56:38 	at org.apache.flink.runtime.util.ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError(ExceptionUtilsITCase.java:70)
> May 25 00:56:38 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> May 25 00:56:38 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> May 25 00:56:38 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> May 25 00:56:38 	at java.lang.reflect.Method.invoke(Method.java:498)
> May 25 00:56:38 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> May 25 00:56:38 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> May 25 00:56:38 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> May 25 00:56:38 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> May 25 00:56:38 	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> May 25 00:56:38 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> May 25 00:56:38 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> May 25 00:56:38 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> May 25 00:56:38 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> May 25 00:56:38 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> May 25 00:56:38 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> May 25 00:56:38 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> May 25 00:56:38 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> May 25 00:56:38 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> May 25 00:56:38 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> May 25 00:56:38 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> May 25 00:56:38 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> May 25 00:56:38 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> May 25 00:56:38 	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> May 25 00:56:38 	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> May 25 00:56:38 	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> May 25 00:56:38 	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> May 25 00:56:38 	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> May 25 00:56:38 	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> May 25 00:56:38 	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> May 25 00:56:38 	at org.apache.maven.surefir
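The failing check in the trace above is a plain Hamcrest equality assertion on captured output: the test expects an empty string but receives "The system is out of resources. ...". A minimal, self-contained sketch of that assertion pattern, assuming the test captures the error output of a forked JVM; the class and helper names are illustrative, not the actual ExceptionUtilsITCase internals:

{code:java}
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

// Illustrative sketch only: shows the assertion style that produces the
// 'Expected: is "" but: was ...' failure above.
public class OutputAssertionSketch {
    public static void main(String[] args) {
        // Stand-in for capturing the stderr of a forked JVM that exhausts Metaspace.
        String capturedErrorOutput = runProgramAndCaptureStdErr();
        // Fails whenever the forked JVM prints extra diagnostics to stderr,
        // e.g. "The system is out of resources."
        assertThat(capturedErrorOutput, is(""));
    }

    private static String runProgramAndCaptureStdErr() {
        return ""; // hypothetical helper; placeholder result
    }
}
{code}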
Re: [PR] [FLINK-34441] Add guide to submit flink SQL scripts via the operator (using flink-sql-runner-example) [flink-kubernetes-operator]
prakash-42 commented on code in PR #776:
URL: https://github.com/apache/flink-kubernetes-operator/pull/776#discussion_r1490538791

##########
docs/content/docs/custom-resource/overview.md:
##########
@@ -216,6 +216,38 @@ Alternatively, if you use helm to install flink-kubernetes-operator, it allows y
 - Last-state upgradeMode is currently not supported for FlinkSessionJobs

+## Flink SQL Jobs

Review Comment:
   Thanks for the comments @mxm! I agree that we should have a dedicated page for examples instead of adding more to the overview page. However, I have a few follow-up concerns/questions:
   1. Even if I don't copy from the example's README, I would still need to write content similar to what the README already contains. I can try to keep it short while linking to the README page for more details.
   2. If I create a dedicated Examples page, I think it shouldn't contain just the Flink SQL example but the other examples as well. I will try to write a few more, but I'm not sure I have enough experience/knowledge to cover all of them. Can we treat this as a living document to which more examples are added as the need arises?
   3. One concern about linking to README pages is that I'm likely going to link to the examples on the main branch, and that code may change with later operator releases. Could that make the examples shipped with the operator point to an incompatible README? (I see that the existing links also point to the main branch, so maybe that's okay.)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [typos]The comment for function 'initializeState' of interface 'CheckpointedFunction' has a spelling error. [flink]
GaoZG2 closed pull request #24313: [typos]The comment for function 'initializeState' of interface 'CheckpointedFunction' has a spelling error. URL: https://github.com/apache/flink/pull/24313 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
1996fanrui commented on code in PR #774:
URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1490305805

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -171,7 +171,7 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri
             stateStore.storeScalingTracking(ctx, scalingTracking);
         }

-        if (collectedMetrics.getMetricHistory().isEmpty()) {
+        if (collectedMetrics.getMetricHistory().size() < 2) {

Review Comment:
   Would you mind adding a simple comment to explain why we only call the scaling logic when the metric history size is >= 2? It may help new developers understand this logic.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
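A minimal, self-contained sketch of the guard being discussed and the kind of explanatory comment requested; the class, method, and parameter names are illustrative, not the actual JobAutoScalerImpl code:

```java
import java.util.List;

/** Illustrative sketch only; not the actual autoscaler implementation. */
class ScalingGuardSketch {

    /**
     * Only run the scaling decision once at least two metric observations exist:
     * window-based metrics (e.g. rates and averages) are derived from more than one
     * collection, so a single data point cannot yield a meaningful evaluation.
     */
    static boolean shouldRunScalingLogic(List<?> metricHistory) {
        return metricHistory.size() >= 2;
    }

    public static void main(String[] args) {
        System.out.println(shouldRunScalingLogic(List.of("m1")));       // false
        System.out.println(shouldRunScalingLogic(List.of("m1", "m2"))); // true
    }
}
```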
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
1996fanrui commented on code in PR #774:
URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1490305805

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -171,7 +171,7 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri
             stateStore.storeScalingTracking(ctx, scalingTracking);
         }

-        if (collectedMetrics.getMetricHistory().isEmpty()) {
+        if (collectedMetrics.getMetricHistory().size() < 2) {

Review Comment:
   Would you mind adding a simple comment to explain why we call the scaling logic when metric history size >= 2?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33960][Scheduler] Fix the bug that Adaptive Scheduler doesn't respect the lowerBound when one flink job has more than 1 tasks [flink]
1996fanrui commented on PR #24288: URL: https://github.com/apache/flink/pull/24288#issuecomment-1945274578 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34152] Tune heap memory of autoscaled jobs [flink-kubernetes-operator]
1996fanrui commented on code in PR #762:
URL: https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1490294787

##########
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java:
##########
@@ -189,6 +191,28 @@ public void removeParallelismOverrides(Context jobContext) {
         jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), PARALLELISM_OVERRIDES);
     }

+    @Override
+    public void storeConfigOverrides(Context jobContext, Configuration configOverrides) {
+        jdbcStateStore.putSerializedState(
+                getSerializeKey(jobContext),
+                CONFIG_OVERRIDES,
+                serializeParallelismOverrides(configOverrides.toMap()));

Review Comment:
   ```suggestion
                   serializeConfigOverrides(configOverrides.toMap()));
   ```
   It may be a bug.

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/MemoryTuningUtils.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
+import org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemory;
+import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/** Tunes the TaskManager memory. */
+public class MemoryTuningUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryTuningUtils.class);
+    public static final ProcessMemoryUtils FLINK_MEMORY_UTILS =
+            new ProcessMemoryUtils<>(getMemoryOptions(), new TaskExecutorFlinkMemoryUtils());
+
+    private static final Configuration EMPTY_CONFIG = new Configuration();
+
+    /**
+     * Emits a Configuration which contains overrides for the current configuration. We are not
+     * modifying the config directly, but we are emitting a new configuration which contains any
+     * overrides. This config is persisted separately and applied by the autoscaler. That way we
+     * can clear any applied overrides if auto-tuning is disabled.
+     */
+    public static Configuration tuneTaskManagerHeapMemory(
+            JobAutoScalerContext context,
+            EvaluatedMetrics evaluatedMetrics,
+            AutoScalerEventHandler eventHandler) {
+
+        // Please note that this config is the original configuration created from the user spec.
+        // It does not contain any already applied overrides.
+        var config = new UnmodifiableConfiguration(context.getConfiguration());
+
+        // Gather original memory configuration from the user spec
+        CommonProcessMemorySpec memSpecs;
+        try {
+            memSpecs = FLINK_MEMORY_UTILS.memoryProcessSpecFromConfig(config);
+        } catch (IllegalConfigurationException e) {
+            LOG.warn("Current memory configuration is not valid. Aborting memory tuning.");
+            return EMPTY_CONFIG;
+        }
+
+        var maxHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize();
+        LOG.info("Current configured heap size: {}", maxHeapSize);
+
+        MemorySize avgHeapSize = get
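The javadoc above captures the design: rather than mutating the job's configuration, the tuner emits a separate Configuration that holds only the overrides, which is persisted on its own and applied (or cleared) by the autoscaler. A small, self-contained sketch of that override-emission pattern, using plain maps and an illustrative config key rather than the operator's actual APIs:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of emitting config overrides instead of mutating the base config (illustrative only). */
class ConfigOverrideSketch {

    /** Computes overrides; the base config map itself is never modified. */
    static Map<String, String> computeHeapOverride(Map<String, String> baseConfig, String tunedHeap) {
        Map<String, String> overrides = new HashMap<>();
        // Only the keys we want to change go into the override map. Persisting this map
        // separately makes it trivial to drop all overrides when tuning is disabled.
        overrides.put("taskmanager.memory.task.heap.size", tunedHeap);
        return overrides;
    }

    public static void main(String[] args) {
        Map<String, String> base = Map.of("taskmanager.memory.process.size", "4096m");
        Map<String, String> overrides = computeHeapOverride(base, "1024m");

        // Effective config = base + overrides, assembled by whoever deploys the job.
        Map<String, String> effective = new HashMap<>(base);
        effective.putAll(overrides);
        System.out.println(effective);
    }
}
```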
[jira] [Commented] (FLINK-34395) Release Testing Instructions: Verify FLINK-32514 Support using larger checkpointing interval when source is processing backlog
[ https://issues.apache.org/jira/browse/FLINK-34395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817575#comment-17817575 ] Yunfeng Zhou commented on FLINK-34395: -- Hi [~lincoln.86xy] [~jingge] Sorry for the late reply. This feature does not need cross-team testing. Could you please help close this ticket? It seems that I don't have the permission to close it. > Release Testing Instructions: Verify FLINK-32514 Support using larger > checkpointing interval when source is processing backlog > -- > > Key: FLINK-34395 > URL: https://issues.apache.org/jira/browse/FLINK-34395 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Yunfeng Zhou >Priority: Blocker > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34152] Tune heap memory of autoscaled jobs [flink-kubernetes-operator]
1996fanrui commented on code in PR #762:
URL: https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1490292087

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -250,6 +251,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold.");

+    public static final ConfigOption MEMORY_TUNING_ENABLED =
+            autoScalerConfig("memory.tuning.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+                    .withDescription(
+                            "If enabled, the initial amount of memory specified for TaskManagers will be reduced according to the observed needs.");
+
+    public static final ConfigOption MEMORY_TUNING_MIN_HEAP =
+            autoScalerConfig("memory.tuning.heap.min")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(2048L))
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.min"))
+                    .withDescription(
+                            "The minimum amount of TaskManager memory, if memory tuning is enabled.");

Review Comment:
   Hi @mxm, a gentle reminder: your change might have missed this comment.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]
1996fanrui commented on code in PR #765:
URL: https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1490288389

##########
docs/content/docs/custom-resource/autoscaler.md:
##########
@@ -286,16 +286,20 @@ please download JDBC driver and initialize database and table first.

 ```
 JDBC_DRIVER_JAR=./mysql-connector-java-8.0.30.jar
-# export the password of jdbc state store
+# export the password of jdbc state store & jdbc event handler
 export STATE_STORE_JDBC_PWD=123456
+export EVENT_HANDLER_JDBC_PWD=123456

 java -cp flink-autoscaler-standalone-{{< version >}}.jar:${JDBC_DRIVER_JAR} \
 org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
 --autoscaler.standalone.fetcher.flink-cluster.host localhost \
 --autoscaler.standalone.fetcher.flink-cluster.port 8081 \
 --autoscaler.standalone.state-store.type jdbc \
 --autoscaler.standalone.state-store.jdbc.url jdbc:mysql://localhost:3306/flink_autoscaler \
---autoscaler.standalone.state-store.jdbc.username root
+--autoscaler.standalone.state-store.jdbc.username root \
+--autoscaler.standalone.event-handler.type jdbc \
+--autoscaler.standalone.event-handler.jdbc.url jdbc:mysql://localhost:3306/flink_autoscaler \
+--autoscaler.standalone.event-handler.jdbc.username root

Review Comment:
   Thanks for the feedback! I have updated.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]
1996fanrui commented on code in PR #765:
URL: https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1490275258

##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##########
@@ -109,4 +110,51 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key
                                     + "export the password using this environment variable.",
                             code(STATE_STORE_TYPE.key()), code(JDBC.toString()))
                     .build());
+
+    public static final ConfigOption EVENT_HANDLER_TYPE =
+            autoscalerStandaloneConfig("event-handler.type")
+                    .enumType(EventHandlerType.class)
+                    .defaultValue(EventHandlerType.LOGGING)
+                    .withDescription("The autoscaler event handler type.");

Review Comment:
   We can configure them separately.
   ```
   --autoscaler.standalone.state-store.type jdbc \
   --autoscaler.standalone.event-handler.type jdbc \
   ```
   After users choose JDBC for both of them, we can reuse all jdbc-related options, such as:
   ```
   autoscaler.standalone.jdbc.username
   autoscaler.standalone.jdbc.url
   ```
   Here is a whole example (this example is from the doc):
   ```
   JDBC_DRIVER_JAR=./mysql-connector-java-8.0.30.jar

   # export the password of jdbc state store & jdbc event handler
   export JDBC_PWD=123456

   java -cp flink-autoscaler-standalone-{{< version >}}.jar:${JDBC_DRIVER_JAR} \
   org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
   --autoscaler.standalone.fetcher.flink-cluster.host localhost \
   --autoscaler.standalone.fetcher.flink-cluster.port 8081 \
   --autoscaler.standalone.state-store.type jdbc \
   --autoscaler.standalone.event-handler.type jdbc \
   --autoscaler.standalone.jdbc.url jdbc:mysql://localhost:3306/flink_autoscaler \
   --autoscaler.standalone.jdbc.username root
   ```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]
1996fanrui commented on code in PR #765:
URL: https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1490275258

##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##########
@@ -109,4 +110,51 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key
                                     + "export the password using this environment variable.",
                             code(STATE_STORE_TYPE.key()), code(JDBC.toString()))
                     .build());
+
+    public static final ConfigOption EVENT_HANDLER_TYPE =
+            autoscalerStandaloneConfig("event-handler.type")
+                    .enumType(EventHandlerType.class)
+                    .defaultValue(EventHandlerType.LOGGING)
+                    .withDescription("The autoscaler event handler type.");

Review Comment:
   We can configure them separately.
   ```
   --autoscaler.standalone.state-store.type jdbc \
   --autoscaler.standalone.event-handler.type jdbc \
   ```
   After users choose JDBC for them, we can reuse all jdbc related options, such as:
   ```
   autoscaler.standalone.jdbc.username
   autoscaler.standalone.jdbc.url
   ```
   Here is an example:
   ```
   JDBC_DRIVER_JAR=./mysql-connector-java-8.0.30.jar

   # export the password of jdbc state store & jdbc event handler
   export JDBC_PWD=123456

   java -cp flink-autoscaler-standalone-{{< version >}}.jar:${JDBC_DRIVER_JAR} \
   org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
   --autoscaler.standalone.fetcher.flink-cluster.host localhost \
   --autoscaler.standalone.fetcher.flink-cluster.port 8081 \
   --autoscaler.standalone.state-store.type jdbc \
   --autoscaler.standalone.event-handler.type jdbc \
   --autoscaler.standalone.jdbc.url jdbc:mysql://localhost:3306/flink_autoscaler \
   --autoscaler.standalone.jdbc.username root
   ```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]
1996fanrui commented on code in PR #765: URL: https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1490275258 ## flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java: ## @@ -109,4 +110,51 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key + "export the password using this environment variable.", code(STATE_STORE_TYPE.key()), code(JDBC.toString())) .build()); + +public static final ConfigOption EVENT_HANDLER_TYPE = +autoscalerStandaloneConfig("event-handler.type") +.enumType(EventHandlerType.class) +.defaultValue(EventHandlerType.LOGGING) +.withDescription("The autoscaler event handler type."); Review Comment: We can configure them separately. ``` --autoscaler.standalone.state-store.type jdbc \ --autoscaler.standalone.event-handler.type jdbc \ ``` After users choose JDBC for them, we can reuse all jdbc related options, such as: ``` autoscaler.standalone.jdbc.username autoscaler.standalone.jdbc.url ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34389][autoscaler] JdbcAutoscalerStateStore explicitly writes update_time [flink-kubernetes-operator]
1996fanrui merged PR #771: URL: https://github.com/apache/flink-kubernetes-operator/pull/771 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34248] Implement restore tests for changelog normalize node [flink]
bvarghese1 commented on PR #24203: URL: https://github.com/apache/flink/pull/24203#issuecomment-1945216459 > Looks good, but could we make the input/output easier to digest? > > 1. Do we need to have a two column key? > 2. Can we have a non-numeric key? At least for me it's easier to track changes for something more distinct. 1. Simplified to 1 primary key 2. Switched to non-numeric key -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                         "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
                         .build();

+    public static final BuiltInFunctionDefinition ARRAY_SORT =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_SORT")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(new ArrayComparableElementArgumentTypeStrategy()),
+                                    sequence(
+                                            new ArrayComparableElementArgumentTypeStrategy(),
+                                            logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(<array>[, <sort_ascending>[, <nulls_first>]])`, where both sort_ascending and nulls_first parameters are optional, allowing for more flexibility in array sorting behavior. The description reads like this: `The function sorts an array, defaulting to ascending order with NULLs at the start when only the array is input. Specifying ascending_order as true orders the array in ascending order with NULLs first, and setting it to false orders it in descending order with NULLs last. Independently, null_first as true moves NULLs to the beginning, and as false to the end, irrespective of the sorting order. The function returns null if any input is null.` @MartijnVisser @dawidwys

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                         "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
                         .build();

+    public static final BuiltInFunctionDefinition ARRAY_SORT =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_SORT")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(new ArrayComparableElementArgumentTypeStrategy()),
+                                    sequence(
+                                            new ArrayComparableElementArgumentTypeStrategy(),
+                                            logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(<array>[, <sort_ascending>[, <nulls_first>]])`, where both sort_ascending and nulls_first parameters are optional, allowing for more flexibility in array sorting behavior. The description reads like this: `Returns the array in sorted order. When only the input array is provided, ascending_order and null_first default to true, sorting the array in ascending order with NULLs at the start. Setting ascending_order to true sorts the array in ascending order, placing NULLs first. Setting it to false sorts the array in descending order, with NULLs last. null_first true places NULLs at the beginning, and false at the end, regardless of sort order. If any input is null, the function returns null.` @MartijnVisser @dawidwys

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                         "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
                         .build();

+    public static final BuiltInFunctionDefinition ARRAY_SORT =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_SORT")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(new ArrayComparableElementArgumentTypeStrategy()),
+                                    sequence(
+                                            new ArrayComparableElementArgumentTypeStrategy(),
+                                            logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(<array>[, <sort_ascending>[, <nulls_first>]])`, where both sort_ascending and nulls_first parameters are optional, allowing for more flexibility in array sorting behavior. The description reads like this: `Returns the array in sorted order. When only the input array is provided, ascending_order and null_first default to true, sorting the array in ascending order with NULLs at the start. True for ascending_order results in an ascending sort with NULLs first, while false leads to a descending sort with NULLs last. Regardless of ascending_order, null_first true places NULLs at the beginning, and false at the end. If any input is null, the function returns null.` @MartijnVisser @dawidwys

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                         "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
                         .build();

+    public static final BuiltInFunctionDefinition ARRAY_SORT =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_SORT")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(new ArrayComparableElementArgumentTypeStrategy()),
+                                    sequence(
+                                            new ArrayComparableElementArgumentTypeStrategy(),
+                                            logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(<array>[, <sort_ascending>[, <nulls_first>]])`, where both sort_ascending and nulls_first parameters are optional, allowing for more flexibility in array sorting behavior. The description reads like this: `The function sorts an array with ascending_order and null_first as optional inputs. By default, with only the input array, both are true, resulting in ascending-order sorting with NULLs at the beginning. Specifying ascending_order without null_first results in sorting where true for ascending_order places NULLs first, and false places NULLs last. With all three inputs, null_first true places NULLs at the start, and false at the end, regardless of ascending_order's value. If any input is null, the function returns null.` @MartijnVisser @dawidwys

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
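A small sketch of what the described semantics would look like from the Table API, assuming the proposed signature and defaults above; ARRAY_SORT is only being added in this PR, so the queries and the expected results in the comments are illustrative, inferred from the description rather than taken from a released implementation:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Illustrative only: assumes ARRAY_SORT behaves as described in the review comment above. */
public class ArraySortSemanticsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Default: ascending order, NULLs first -> expected [NULL, 1, 2, 3]
        tEnv.executeSql("SELECT ARRAY_SORT(ARRAY[3, 1, CAST(NULL AS INT), 2])").print();
        // Descending order, NULLs last -> expected [3, 2, 1, NULL]
        tEnv.executeSql("SELECT ARRAY_SORT(ARRAY[3, 1, CAST(NULL AS INT), 2], FALSE)").print();
        // Ascending order, NULLs explicitly moved to the end -> expected [1, 2, 3, NULL]
        tEnv.executeSql("SELECT ARRAY_SORT(ARRAY[3, 1, CAST(NULL AS INT), 2], TRUE, FALSE)").print();
    }
}
```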
[jira] [Commented] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table
[ https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817533#comment-17817533 ]

Vallari Rastogi commented on FLINK-32596:
-----------------------------------------

[~luoyuxia] Hive Metastore expects the partition columns to come last when inserting data: [hiveql - Hive partition column - Stack Overflow|https://stackoverflow.com/questions/60510174/hive-partition-column]

So what Flink does is use the last 'n' columns as partition columns: [https://github.com/apache/flink/blob/403694e7b9c213386f3ed9cff21ce2664030ebc2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java#L515]

SELECT and INSERT queries follow the same logic of expecting the partition columns at the end.

As a test, I made a change here: [https://github.com/apache/flink/commit/df47ceaba82a3d4f3392c1b53bb52f34d520cc3d]

Results:
!image-2024-02-15-03-05-22-541.png|width=600,height=106!
!image-2024-02-15-03-06-28-175.png|width=468,height=267!

The partition columns will always come last because of HMS. For instance, we can use an insert statement like:
_INSERT INTO testHive2 PARTITION (ts='22:16:46', active='TRUE') SELECT 1, 46, 'false';_

_SELECT query output:_
_!image-2024-02-15-03-08-50-029.png|width=525,height=328!_

> The partition key will be wrong when use Flink dialect to create Hive table
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-32596
>                 URL: https://issues.apache.org/jira/browse/FLINK-32596
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.15.0, 1.16.0, 1.17.0
>            Reporter: luoyuxia
>            Assignee: Vallari Rastogi
>            Priority: Major
>         Attachments: image-2024-02-14-16-06-13-126.png, image-2024-02-15-03-05-22-541.png, image-2024-02-15-03-06-28-175.png, image-2024-02-15-03-08-50-029.png
>
> Can be reproduced by the following SQL:
>
> {code:java}
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> tableEnv.executeSql(
>         "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by (`date`)"
>                 + " with ('connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file')");
> CatalogTable catalogTable =
>         (CatalogTable) hiveCatalog.getTable(ObjectPath.fromString("default.t1"));
> // the following assertion will fail
> assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code}
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
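To make the described behavior concrete, here is a small, self-contained sketch of the reordering the comment refers to: partition columns are moved to the end of the schema, which is why a table declared as (`date`, `geo_altitude`) partitioned by `date` ends up with `date` last. The class and method names are illustrative, not the connector's actual code:

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Sketch of "partition columns go last", as Hive Metastore expects (illustrative only). */
class PartitionColumnOrderSketch {

    static List<String> reorderForHive(List<String> allColumns, List<String> partitionColumns) {
        List<String> reordered = new ArrayList<>();
        for (String c : allColumns) {
            if (!partitionColumns.contains(c)) {
                reordered.add(c); // regular columns keep their relative order
            }
        }
        reordered.addAll(partitionColumns); // partition columns are appended at the end
        return reordered;
    }

    public static void main(String[] args) {
        // Matches the table in the issue: partitioned by `date`.
        System.out.println(reorderForHive(List.of("date", "geo_altitude"), List.of("date")));
        // -> [geo_altitude, date]
    }
}
{code}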
[jira] [Updated] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table
[ https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vallari Rastogi updated FLINK-32596: Attachment: image-2024-02-15-03-08-50-029.png > The partition key will be wrong when use Flink dialect to create Hive table > --- > > Key: FLINK-32596 > URL: https://issues.apache.org/jira/browse/FLINK-32596 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: luoyuxia >Assignee: Vallari Rastogi >Priority: Major > Attachments: image-2024-02-14-16-06-13-126.png, > image-2024-02-15-03-05-22-541.png, image-2024-02-15-03-06-28-175.png, > image-2024-02-15-03-08-50-029.png > > > Can be reproduced by the following SQL: > > {code:java} > tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > tableEnv.executeSql( > "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by > (`date`)" > + " with ('connector' = 'hive', > 'sink.partition-commit.delay'='1 s', > 'sink.partition-commit.policy.kind'='metastore,success-file')"); > CatalogTable catalogTable = > (CatalogTable) > hiveCatalog.getTable(ObjectPath.fromString("default.t1")); > // the following assertion will fail > assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table
[ https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vallari Rastogi updated FLINK-32596: Attachment: image-2024-02-15-03-06-28-175.png > The partition key will be wrong when use Flink dialect to create Hive table > --- > > Key: FLINK-32596 > URL: https://issues.apache.org/jira/browse/FLINK-32596 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: luoyuxia >Assignee: Vallari Rastogi >Priority: Major > Attachments: image-2024-02-14-16-06-13-126.png, > image-2024-02-15-03-05-22-541.png, image-2024-02-15-03-06-28-175.png > > > Can be reproduced by the following SQL: > > {code:java} > tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > tableEnv.executeSql( > "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by > (`date`)" > + " with ('connector' = 'hive', > 'sink.partition-commit.delay'='1 s', > 'sink.partition-commit.policy.kind'='metastore,success-file')"); > CatalogTable catalogTable = > (CatalogTable) > hiveCatalog.getTable(ObjectPath.fromString("default.t1")); > // the following assertion will fail > assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table
[ https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vallari Rastogi updated FLINK-32596: Attachment: image-2024-02-15-03-05-22-541.png > The partition key will be wrong when use Flink dialect to create Hive table > --- > > Key: FLINK-32596 > URL: https://issues.apache.org/jira/browse/FLINK-32596 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: luoyuxia >Assignee: Vallari Rastogi >Priority: Major > Attachments: image-2024-02-14-16-06-13-126.png, > image-2024-02-15-03-05-22-541.png > > > Can be reproduced by the following SQL: > > {code:java} > tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > tableEnv.executeSql( > "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by > (`date`)" > + " with ('connector' = 'hive', > 'sink.partition-commit.delay'='1 s', > 'sink.partition-commit.policy.kind'='metastore,success-file')"); > CatalogTable catalogTable = > (CatalogTable) > hiveCatalog.getTable(ObjectPath.fromString("default.t1")); > // the following assertion will fail > assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]
gyfora commented on code in PR #777: URL: https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489957782 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws } } -/** Wait until the FLink cluster has completely shut down. */ -@VisibleForTesting -void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) { -LOG.info("Waiting for cluster shutdown..."); - -boolean jobManagerRunning = true; -boolean taskManagerRunning = true; -boolean serviceRunning = true; +/** Returns a list of Kubernetes Deployment names for given cluster. */ +protected abstract List getDeploymentNames(String namespace, String clusterId); -for (int i = 0; i < shutdownTimeout; i++) { -if (jobManagerRunning) { -PodList jmPodList = getJmPodList(namespace, clusterId); +/** Wait until the FLink cluster has completely shut down. */ +protected void waitForClusterShutdown( +String namespace, String clusterId, long shutdownTimeout) { +long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000; +LOG.info("Waiting {} seconds for cluster shutdown...", shutdownTimeout); -if (jmPodList == null || jmPodList.getItems().isEmpty()) { -jobManagerRunning = false; -} -} -if (taskManagerRunning) { -PodList tmPodList = getTmPodList(namespace, clusterId); +for (var deploymentName : getDeploymentNames(namespace, clusterId)) { +long deploymentTimeout = timeoutAt - System.currentTimeMillis(); -if (tmPodList.getItems().isEmpty()) { -taskManagerRunning = false; -} +if (!waitForDeploymentToBeRemoved(namespace, deploymentName, deploymentTimeout)) { +LOG.error( Review Comment: 5 minutes seems reasonable for prod envs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]
gyfora commented on code in PR #777: URL: https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489957348 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws } } -/** Wait until the FLink cluster has completely shut down. */ -@VisibleForTesting -void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) { -LOG.info("Waiting for cluster shutdown..."); - -boolean jobManagerRunning = true; -boolean taskManagerRunning = true; -boolean serviceRunning = true; +/** Returns a list of Kubernetes Deployment names for given cluster. */ +protected abstract List getDeploymentNames(String namespace, String clusterId); -for (int i = 0; i < shutdownTimeout; i++) { -if (jobManagerRunning) { -PodList jmPodList = getJmPodList(namespace, clusterId); +/** Wait until the FLink cluster has completely shut down. */ +protected void waitForClusterShutdown( +String namespace, String clusterId, long shutdownTimeout) { +long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000; +LOG.info("Waiting {} seconds for cluster shutdown...", shutdownTimeout); -if (jmPodList == null || jmPodList.getItems().isEmpty()) { -jobManagerRunning = false; -} -} -if (taskManagerRunning) { -PodList tmPodList = getTmPodList(namespace, clusterId); +for (var deploymentName : getDeploymentNames(namespace, clusterId)) { +long deploymentTimeout = timeoutAt - System.currentTimeMillis(); -if (tmPodList.getItems().isEmpty()) { -taskManagerRunning = false; -} +if (!waitForDeploymentToBeRemoved(namespace, deploymentName, deploymentTimeout)) { +LOG.error( Review Comment: If no trouble for you @mateczagany let's add the default timeout increase here. I am not a big fan of too many PRs it's just more work to review :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34445) Integrate new endpoint with Flink UI metrics section
[ https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mason Chen updated FLINK-34445: --- Component/s: Runtime / Web Frontend > Integrate new endpoint with Flink UI metrics section > > > Key: FLINK-34445 > URL: https://issues.apache.org/jira/browse/FLINK-34445 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Web Frontend >Reporter: Mason Chen >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34445) Integrate new endpoint with Flink UI metrics section
Mason Chen created FLINK-34445: -- Summary: Integrate new endpoint with Flink UI metrics section Key: FLINK-34445 URL: https://issues.apache.org/jira/browse/FLINK-34445 Project: Flink Issue Type: Sub-task Reporter: Mason Chen -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34444) Add new endpoint handler to flink
[ https://issues.apache.org/jira/browse/FLINK-34444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mason Chen updated FLINK-34444:
-------------------------------
    Component/s: Runtime / Metrics
                 Runtime / REST

> Add new endpoint handler to flink
> ---------------------------------
>
>                 Key: FLINK-34444
>                 URL: https://issues.apache.org/jira/browse/FLINK-34444
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Metrics, Runtime / REST
>            Reporter: Mason Chen
>            Priority: Major
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34444) Add new endpoint handler to flink
Mason Chen created FLINK-34444:
-------------------------------

             Summary: Add new endpoint handler to flink
                 Key: FLINK-34444
                 URL: https://issues.apache.org/jira/browse/FLINK-34444
             Project: Flink
          Issue Type: Sub-task
            Reporter: Mason Chen

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]
mateczagany commented on code in PR #777: URL: https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489946709 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws } } -/** Wait until the FLink cluster has completely shut down. */ -@VisibleForTesting -void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) { -LOG.info("Waiting for cluster shutdown..."); - -boolean jobManagerRunning = true; -boolean taskManagerRunning = true; -boolean serviceRunning = true; +/** Returns a list of Kubernetes Deployment names for given cluster. */ +protected abstract List getDeploymentNames(String namespace, String clusterId); -for (int i = 0; i < shutdownTimeout; i++) { -if (jobManagerRunning) { -PodList jmPodList = getJmPodList(namespace, clusterId); +/** Wait until the FLink cluster has completely shut down. */ +protected void waitForClusterShutdown( +String namespace, String clusterId, long shutdownTimeout) { +long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000; +LOG.info("Waiting {} seconds for cluster shutdown...", shutdownTimeout); -if (jmPodList == null || jmPodList.getItems().isEmpty()) { -jobManagerRunning = false; -} -} -if (taskManagerRunning) { -PodList tmPodList = getTmPodList(namespace, clusterId); +for (var deploymentName : getDeploymentNames(namespace, clusterId)) { +long deploymentTimeout = timeoutAt - System.currentTimeMillis(); -if (tmPodList.getItems().isEmpty()) { -taskManagerRunning = false; -} +if (!waitForDeploymentToBeRemoved(namespace, deploymentName, deploymentTimeout)) { +LOG.error( Review Comment: I don't think there could be any downside to setting the default cleanup timeout to e.g. 5 minutes. @gyfora WDYT? Should that be done in another PR or should I add that here, or do you think we should leave the timeout as is? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [typos]The comment for function 'initializeState' of interface 'CheckpointedFunction' has a spelling error. [flink]
flinkbot commented on PR #24313: URL: https://github.com/apache/flink/pull/24313#issuecomment-1944401398 ## CI report: * 487d27788e16263ad76fcb5ce6d5ca4a1a1015d2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [typos]The comment for function 'initializeState' of interface 'CheckpointedFunction' has a spelling error. [flink]
GaoZG2 opened a new pull request, #24313: URL: https://github.com/apache/flink/pull/24313 [typos]The comment for function 'initializeState' of interface 'CheckpointedFunction' has a spelling error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817467#comment-17817467 ]

Jacek Wislicki commented on FLINK-34436:
----------------------------------------

Also added a simpler example (without enumerations, which seem to be the key problem): https://github.com/JacekWislicki/test12

> Avro schema evolution and compatibility issues in Pulsar connector
> -------------------------------------------------------------------
>
>                 Key: FLINK-34436
>                 URL: https://issues.apache.org/jira/browse/FLINK-34436
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.17.2
>            Reporter: Jacek Wislicki
>            Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related to schema evolution and compatibility. Please see the MRE available at https://github.com/JacekWislicki/test11. More details are in the project's README file; here is the summary:
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
> Problems:
> * Exception thrown when schema's fields are added/removed
> * Avro's enum default value is ignored; instead the last known one is applied
> I believe that I observed the same behaviour in Pulsar itself, but for now we are focusing on the connector, hence I was able to document the problems when using it.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34152] Tune heap memory of autoscaled jobs [flink-kubernetes-operator]
mxm commented on code in PR #762:
URL: https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1489841998

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -250,6 +252,39 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold.");

+    public static final ConfigOption MEMORY_TUNING_ENABLED =
+            autoScalerConfig("memory.tuning.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+                    .withDescription(
+                            "If enabled, the initial amount of memory specified for TaskManagers will be reduced according to the observed needs.");
+
+    public static final ConfigOption MEMORY_TUNING_HEAP_TARGET =
+            autoScalerConfig("memory.tuning.heap.target-usage")
+                    .enumType(MemoryTuning.HeapTuningTarget.class)
+                    .defaultValue(MemoryTuning.HeapTuningTarget.AVG)

Review Comment:
   Updated to use MAX as the default. Also added extra overhead which is pretty important for the memory to be able to grow once reduced.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
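A tiny worked sketch of why the extra overhead matters, under an assumed rule of "new heap = observed usage target plus a safety margin, but never below the configured minimum" (the formula, the 20% margin, and all names are illustrative, not the operator's actual tuning logic): without the margin, a job sized exactly to its past maximum has no headroom to grow after being reduced.

```java
/** Illustrative arithmetic only; not the operator's actual tuning formula. */
class HeapTargetSketch {
    static long tunedHeapBytes(long observedMaxUsageBytes, double overheadFraction, long minHeapBytes) {
        long withHeadroom = (long) (observedMaxUsageBytes * (1.0 + overheadFraction));
        return Math.max(withHeadroom, minHeapBytes);
    }

    public static void main(String[] args) {
        long observedMax = 1_500L * 1024 * 1024; // 1.5 GiB observed at peak
        long minHeap = 2_048L * 1024 * 1024;     // e.g. a 2 GiB lower bound like memory.tuning.heap.min
        // 20% headroom is an assumed value for the sketch, not the operator default.
        System.out.println(tunedHeapBytes(observedMax, 0.2, minHeap)); // clamped to the 2 GiB minimum
    }
}
```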
Re: [PR] [WIP] Extend error labeling to void restart loops. [flink]
flinkbot commented on PR #24312: URL: https://github.com/apache/flink/pull/24312#issuecomment-1944295795 ## CI report: * 2451d70d20c20e6c1cc386640b3235652d227d93 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on PR #1:
URL: https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-1944280510

   @hlteoh37 @z3d1k please have a look, I pushed the changes (in a single squashed commit). I implemented all the requested changes except those I explicitly commented not to implement. Happy to discuss.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location [flink]
XComp commented on PR #24244: URL: https://github.com/apache/flink/pull/24244#issuecomment-1944268824 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]
XComp commented on PR #24309: URL: https://github.com/apache/flink/pull/24309#issuecomment-1944265004 I updated the PR but will wait till tomorrow with switching it from draft back to reviewable state. I want to wait for CI to pass before it makes sense to review the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                         "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
                         .build();

+    public static final BuiltInFunctionDefinition ARRAY_SORT =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_SORT")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(new ArrayComparableElementArgumentTypeStrategy()),
+                                    sequence(
+                                            new ArrayComparableElementArgumentTypeStrategy(),
+                                            logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(<array>[, <sort_ascending>[, <nulls_first>]])`, where both sort_ascending and nulls_first parameters are optional, allowing for more flexibility in array sorting behavior. @MartijnVisser @dawidwys

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34403. --- Resolution: Fixed * master ** [2298e53f35121602c56845ac8040439fbd1a9ff4|https://github.com/apache/flink/commit/2298e53f35121602c56845ac8040439fbd1a9ff4] ** [9a316a5bcc47da7f69e76e0c25ed257adc4298ce|https://github.com/apache/flink/commit/9a316a5bcc47da7f69e76e0c25ed257adc4298ce] > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Assignee: Matthias Pohl >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.20.0 > > > After FLINK-33611 merged, the misc test on GHA cannot pass due to out of > memory error, throwing following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization. > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007) > Feb 07 05:43:21 at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) > Feb 07 05:43:21 at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:71) > Feb 07 05:43:21 at > 
org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple(VeryBigPbRowToProtoTest.java:37) > Feb 07 05:43:21 at java.lang.reflect.Method.invoke(Method.java:498) > Feb 07 05:43:21 Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalArgumentException: Self-suppression not permitted > Feb 07 05:43:21 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > Feb 07 05:43:21 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:323) > Feb 07 05:43:21 ... 18 more > Feb 07 05:43:21 Caused by: java.lang.IllegalArgumentException: > Self-suppression not permitted > Feb 07 05:43:21 at > java.lang.Throwable.addSuppressed(Throwable.java:1072) > Feb 07 05:43:21 at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:556) > Feb 07 05:43:21 at > org.apache.flink.util.InstantiationUtil.writeObjectToConfi
Re: [PR] [FLINK-34403][ci] Transforms VeryBigPbRowToProtoTest into an integration test [flink]
XComp merged PR #24311: URL: https://github.com/apache/flink/pull/24311 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1489791182 ## amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink.aws; + +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSSessionCredentials; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.util.BinaryUtils; +import org.apache.commons.lang3.StringUtils; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Map; + +/** Sign a Remote-Write request to Amazon Managed Service for Prometheus (AMP). */ +public class AmazonManagedPrometheusWriteRequestSigner implements PrometheusRequestSigner { + +private final URL remoteWriteUrl; +private final String awsRegion; + +/** + * Constructor. + * + * @param remoteWriteUrl URL of the remote-write endpoint + * @param awsRegion Region of the AMP workspace + */ +public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, String awsRegion) { +Preconditions.checkArgument( +StringUtils.isNotBlank(awsRegion), "Missing or blank AMP workspace region"); +Preconditions.checkNotNull( +StringUtils.isNotBlank(remoteWriteUrl), +"Missing or blank AMP workspace remote-write URL"); +this.awsRegion = awsRegion; +try { +this.remoteWriteUrl = new URL(remoteWriteUrl); +} catch (MalformedURLException e) { +throw new IllegalArgumentException( +"Invalid AMP remote-write URL: " + remoteWriteUrl, e); +} +} + +/** + * Add the additional Http request headers required by Amazon Managed Prometheus: + * 'x-amz-content-sha256', 'Host', 'X-Amz-Date', 'x-amz-security-token' and 'Authorization`. + * + * @param requestHeaders original Http request headers. It must be mutable. For efficiency, any + * new header is added to the map, instead of making a copy. + * @param requestBody request body, already compressed + */ +@Override +public void addSignatureHeaders(Map requestHeaders, byte[] requestBody) { +byte[] contentHash = AWS4SignerBase.hash(requestBody); +String contentHashString = BinaryUtils.toHex(contentHash); +requestHeaders.put( +"x-amz-content-sha256", +contentHashString); // this header must be included before generating the +// Authorization header + +DefaultAWSCredentialsProviderChain credsChain = new DefaultAWSCredentialsProviderChain(); Review Comment: Actually, I reverted the change. Just passing a credentials provider instance obviously doesn't work, not being serializable. 
Making the credentials provider customisable would mean either: 1. Copying the implementation of `org.apache.flink.connector.aws.util.AWSGeneralUtil` from `flink-connector-aws-base` and using it to create the credential provider at runtime, based on configuration, or 2. Including `flink-connector-aws-base` as a dependency. But this would bring in a number of transitive dependencies that are not used. The only AWS SDK dependencies in this connector are in this signer, and only to get the credentials. Also, `flink-connector-aws-base` uses sdk2, while this signer uses sdk1. We discussed with @hlteoh37 that porting to sdk2 is nice to have but not a priority for the first release. Considering that `DefaultAWSCredentialsProviderChain` already covers almost every possible authentication scenario in which a Flink application writes to AMP, I would not consider a configurable credentials provider a priority for the first release, and would rather wait to see whether any actual user needs it. Happy to discuss. -- This is an automated message from the Apache Git Service. To respond to the message,
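As a side note on the serializability point above, one common workaround (purely illustrative, not what the PR does) is to keep the provider out of the serialized object graph and create it lazily at runtime. The class and method names below are hypothetical; only the AWS SDK v1 types are real.

```java
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

import java.io.Serializable;

/** Hypothetical sketch: resolve credentials lazily so nothing non-serializable is shipped. */
public class LazyCredentialsResolver implements Serializable {

    private static final long serialVersionUID = 1L;

    // transient: recreated on the task manager, never part of the serialized job graph
    private transient AWSCredentialsProvider provider;

    public AWSCredentials resolveCredentials() {
        if (provider == null) {
            provider = new DefaultAWSCredentialsProviderChain();
        }
        return provider.getCredentials();
    }
}
```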
Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]
numbnut commented on code in PR #777: URL: https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489724962 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws } } -/** Wait until the FLink cluster has completely shut down. */ -@VisibleForTesting -void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) { -LOG.info("Waiting for cluster shutdown..."); - -boolean jobManagerRunning = true; -boolean taskManagerRunning = true; -boolean serviceRunning = true; +/** Returns a list of Kubernetes Deployment names for given cluster. */ +protected abstract List getDeploymentNames(String namespace, String clusterId); -for (int i = 0; i < shutdownTimeout; i++) { -if (jobManagerRunning) { -PodList jmPodList = getJmPodList(namespace, clusterId); +/** Wait until the FLink cluster has completely shut down. */ +protected void waitForClusterShutdown( +String namespace, String clusterId, long shutdownTimeout) { +long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000; +LOG.info("Waiting {} seconds for cluster shutdown...", shutdownTimeout); -if (jmPodList == null || jmPodList.getItems().isEmpty()) { -jobManagerRunning = false; -} -} -if (taskManagerRunning) { -PodList tmPodList = getTmPodList(namespace, clusterId); +for (var deploymentName : getDeploymentNames(namespace, clusterId)) { +long deploymentTimeout = timeoutAt - System.currentTimeMillis(); -if (tmPodList.getItems().isEmpty()) { -taskManagerRunning = false; -} +if (!waitForDeploymentToBeRemoved(namespace, deploymentName, deploymentTimeout)) { +LOG.error( Review Comment: I fully agree, but don't have a solution. At least the default value for the cleanup timeout could be increased to a higher value. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]
XComp commented on code in PR #24309: URL: https://github.com/apache/flink/pull/24309#discussion_r1489646969 ## flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java: ## @@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class taskClazz) { public void triggerPeriodicScheduledTasks() { for (ScheduledTask scheduledTask : periodicScheduledTasks) { if (!scheduledTask.isCancelled()) { -scheduledTask.execute(); +executeScheduledTask(scheduledTask); } } } +private static void executeScheduledTask(ScheduledTask scheduledTask) { +scheduledTask.execute(); +try { +// try to retrieve result of scheduled task to avoid swallowing any exceptions that +// occurred +scheduledTask.get(); Review Comment: True, I missed that we're not retrieving the result in case of periodic tasks in [ScheduledTask#execute](https://github.com/apache/flink/blob/5405239dec0884dff746129c73955c90f455c465/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java#L76). I changed the behavior: Instead of collecting the error in the future, we're going to throw the error rightaway: > ScheduledTask only serves as a container for a Callable. The error handling should be done by an Executor (e.g. the main thread). Therefore, explicitly handling errors inside #execute is out of scope for ScheduledTask. ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java: ## @@ -594,18 +597,19 @@ private void checkResourceRequirementsWithDelay() { if (requirementsCheckDelay.toMillis() <= 0) { checkResourceRequirements(); } else { -if (requirementsCheckFuture == null || requirementsCheckFuture.isDone()) { -requirementsCheckFuture = new CompletableFuture<>(); -scheduledExecutor.schedule( -() -> -mainThreadExecutor.execute( -() -> { -checkResourceRequirements(); - Preconditions.checkNotNull(requirementsCheckFuture) -.complete(null); -}), -requirementsCheckDelay.toMillis(), -TimeUnit.MILLISECONDS); +if (requirementsCheckFuture.isDone()) { +requirementsCheckFuture = +scheduledExecutor.schedule( +() -> { +if (started) { Review Comment: yeah, I had to revisit the PR. I looked into the synchronization of `started` yesterday and was kind of puzzled why we haven't had to synchronize the field till now. But that's due to the fact that all methods that rely on the `started` field are actually called from within the ResourceManager's main thread. In the end, I forgot to consider this when actually accessing the `SlotManager`'s state from another thread :facepalm: Anyway, I reiterated over the PR and added some assertions to make it clearer that the `FineGrainedSlotManager` should be handled from within the `ResourceManager`'s main thread. Please clarify if I came up with the wrong conclusion and missed something here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
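To make the single-threaded access assumption from the comment above concrete, here is a hedged sketch of what such an assertion can look like. The class is hypothetical; it only assumes that Flink's `ComponentMainThreadExecutor#assertRunningInMainThread` is available to the component.

```java
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;

/** Hypothetical sketch: fail fast if a callback runs outside the component's main thread. */
class MainThreadGuardedComponent {

    private final ComponentMainThreadExecutor mainThreadExecutor;
    private boolean started;

    MainThreadGuardedComponent(ComponentMainThreadExecutor mainThreadExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
    }

    void checkResourceRequirementsWithDelay() {
        // Makes the "only ever touched from the main thread" assumption explicit,
        // so the unsynchronized read of 'started' below is safe.
        mainThreadExecutor.assertRunningInMainThread();
        if (!started) {
            return;
        }
        // ... the actual requirements check would follow here ...
    }
}
```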
Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]
mxm commented on code in PR #765: URL: https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1489593700 ## flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java: ## @@ -109,4 +110,51 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key + "export the password using this environment variable.", code(STATE_STORE_TYPE.key()), code(JDBC.toString())) .build()); + +public static final ConfigOption EVENT_HANDLER_TYPE = +autoscalerStandaloneConfig("event-handler.type") +.enumType(EventHandlerType.class) +.defaultValue(EventHandlerType.LOGGING) +.withDescription("The autoscaler event handler type."); Review Comment: What if users want to configure both logging and storing via JDBC? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]
mateczagany commented on code in PR #777: URL: https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489494523 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws } } -/** Wait until the FLink cluster has completely shut down. */ -@VisibleForTesting -void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) { -LOG.info("Waiting for cluster shutdown..."); - -boolean jobManagerRunning = true; -boolean taskManagerRunning = true; -boolean serviceRunning = true; +/** Returns a list of Kubernetes Deployment names for given cluster. */ +protected abstract List getDeploymentNames(String namespace, String clusterId); -for (int i = 0; i < shutdownTimeout; i++) { -if (jobManagerRunning) { -PodList jmPodList = getJmPodList(namespace, clusterId); +/** Wait until the FLink cluster has completely shut down. */ +protected void waitForClusterShutdown( +String namespace, String clusterId, long shutdownTimeout) { +long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000; +LOG.info("Waiting {} seconds for cluster shutdown...", shutdownTimeout); -if (jmPodList == null || jmPodList.getItems().isEmpty()) { -jobManagerRunning = false; -} -} -if (taskManagerRunning) { -PodList tmPodList = getTmPodList(namespace, clusterId); +for (var deploymentName : getDeploymentNames(namespace, clusterId)) { +long deploymentTimeout = timeoutAt - System.currentTimeMillis(); -if (tmPodList.getItems().isEmpty()) { -taskManagerRunning = false; -} +if (!waitForDeploymentToBeRemoved(namespace, deploymentName, deploymentTimeout)) { +LOG.error( +"Failed to shut down cluster {} (deployment {}) in {} seconds, proceeding...", +clusterId, +deploymentName, +shutdownTimeout); +return; } +} +} -if (serviceRunning) { -Service service = -kubernetesClient -.services() -.inNamespace(namespace) -.withName( - ExternalServiceDecorator.getExternalServiceName(clusterId)) -.get(); -if (service == null) { -serviceRunning = false; -} -} +/** Wait until Deployment is removed, return false if timed out, otherwise return true. */ +@VisibleForTesting +boolean waitForDeploymentToBeRemoved(String namespace, String deploymentName, long timeout) { +ScheduledExecutorService logger = Executors.newSingleThreadScheduledExecutor(); +logger.scheduleWithFixedDelay( +() -> LOG.info("Waiting for Deployment {} to shut down...", deploymentName), +5, +5, +TimeUnit.SECONDS); Review Comment: You are right, I will remove it. I don't think it's worth to keep those logs, since it's easy to figure out what we're waiting for if we have before/after logs as in your second suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34152] Tune heap memory of autoscaled jobs [flink-kubernetes-operator]
gyfora commented on code in PR #762: URL: https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1489489638 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -250,6 +252,39 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold."); +public static final ConfigOption MEMORY_TUNING_ENABLED = +autoScalerConfig("memory.tuning.enabled") +.booleanType() +.defaultValue(false) + .withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled")) +.withDescription( +"If enabled, the initial amount of memory specified for TaskManagers will be reduced according to the observed needs."); + +public static final ConfigOption MEMORY_TUNING_HEAP_TARGET = +autoScalerConfig("memory.tuning.heap.target-usage") Review Comment: This config key sounds similar to target utilisation but works a completely different way, but I don't really have a better suggestion :) ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -250,6 +252,39 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold."); +public static final ConfigOption MEMORY_TUNING_ENABLED = +autoScalerConfig("memory.tuning.enabled") +.booleanType() +.defaultValue(false) + .withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled")) +.withDescription( +"If enabled, the initial amount of memory specified for TaskManagers will be reduced according to the observed needs."); + +public static final ConfigOption MEMORY_TUNING_HEAP_TARGET = +autoScalerConfig("memory.tuning.heap.target-usage") +.enumType(MemoryTuning.HeapTuningTarget.class) +.defaultValue(MemoryTuning.HeapTuningTarget.AVG) Review Comment: If Max is more conservative should that be the default? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]
gyfora commented on code in PR #765: URL: https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1489463353 ## docs/content/docs/custom-resource/autoscaler.md: ## @@ -286,16 +286,20 @@ please download JDBC driver and initialize database and table first. ``` JDBC_DRIVER_JAR=./mysql-connector-java-8.0.30.jar -# export the password of jdbc state store +# export the password of jdbc state store & jdbc event handler export STATE_STORE_JDBC_PWD=123456 +export EVENT_HANDLER_JDBC_PWD=123456 java -cp flink-autoscaler-standalone-{{< version >}}.jar:${JDBC_DRIVER_JAR} \ org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \ --autoscaler.standalone.fetcher.flink-cluster.host localhost \ --autoscaler.standalone.fetcher.flink-cluster.port 8081 \ --autoscaler.standalone.state-store.type jdbc \ --autoscaler.standalone.state-store.jdbc.url jdbc:mysql://localhost:3306/flink_autoscaler \ ---autoscaler.standalone.state-store.jdbc.username root +--autoscaler.standalone.state-store.jdbc.username root \ +--autoscaler.standalone.event-handler.type jdbc \ +--autoscaler.standalone.event-handler.jdbc.url jdbc:mysql://localhost:3306/flink_autoscaler \ +--autoscaler.standalone.event-handler.jdbc.username root Review Comment: @1996fanrui I agree that we should probably merge the configs as they will be used together. Instead of calling it `jdbc-plugin` we could just call the properties: ``` autoscaler.standalone.jdbc.username autoscaler.standalone.jdbc.url ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
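To illustrate the naming suggested above (keys and descriptions are hypothetical; only the `ConfigOptions` builder API is taken from Flink):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Hypothetical sketch of shared JDBC options for both the state store and the event handler. */
public class StandaloneJdbcOptions {

    public static final ConfigOption<String> JDBC_URL =
            ConfigOptions.key("autoscaler.standalone.jdbc.url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "JDBC URL used by both the JDBC state store and the JDBC event handler.");

    public static final ConfigOption<String> JDBC_USERNAME =
            ConfigOptions.key("autoscaler.standalone.jdbc.username")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "JDBC username used by both the JDBC state store and the JDBC event handler.");
}
```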
[jira] [Closed] (FLINK-34439) Move chown operations to COPY commands in Dockerfile
[ https://issues.apache.org/jira/browse/FLINK-34439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-34439. -- Fix Version/s: kubernetes-operator-1.8.0 Resolution: Fixed merged to main 6cab745df9d0a742ab15f341bdf69c8e802b2a39 > Move chown operations to COPY commands in Dockerfile > > > Key: FLINK-34439 > URL: https://issues.apache.org/jira/browse/FLINK-34439 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Mate Czagany >Assignee: Mate Czagany >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > We can reduce the size of the resulting operator container image if we don't run > 'chown' commands in separate RUN commands inside the Dockerfile, but instead > use the '--chown' argument of the COPY command. > Using 'RUN chown...' will copy all the files affected with their whole size > to a new layer, duplicating the previous files from the COPY command. > Example: > {code:java} > $ docker image history ghcr.io/apache/flink-kubernetes-operator:ccb10b8 > ... > 3 months ago RUN /bin/sh -c chown -R flink:flink $FLINK... > 116MB buildkit.dockerfile.v0 > ... {code} > This would mean a 20% reduction in image size. -- This message was sent by Atlassian Jira (v8.20.10#820010)
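A minimal sketch of the change the ticket describes; the jar name, target path and user/group are illustrative, only the `COPY --chown` mechanism itself is the point:

```dockerfile
# Before: the extra RUN duplicates every copied file into a new layer (~116MB here) just to change ownership.
# COPY flink-kubernetes-operator.jar $OPERATOR_LIB/
# RUN chown -R flink:flink $OPERATOR_LIB

# After: ownership is set while copying, so no duplicate layer is produced.
COPY --chown=flink:flink flink-kubernetes-operator.jar $OPERATOR_LIB/
```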
Re: [PR] [FLINK-34439] Move chown operations to COPY commands in Dockerfile [flink-kubernetes-operator]
gyfora merged PR #775: URL: https://github.com/apache/flink-kubernetes-operator/pull/775 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
MartijnVisser commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1489314997 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_SORT = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SORT") +.kind(SCALAR) +.inputTypeStrategy( +or( +sequence(new ArrayComparableElementArgumentTypeStrategy()), +sequence( +new ArrayComparableElementArgumentTypeStrategy(), +logical(LogicalTypeRoot.BOOLEAN Review Comment: This is quite a rabbit hole 😆 1. Databricks has two functions: * sort_array https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html - Sorts ascending, puts NULL at beginning. * array_sort https://docs.databricks.com/en/sql/language-manual/functions/array_sort.html - Sorts ascending, puts NULL at the end. 2. Spark has https://spark.apache.org/docs/latest/api/sql/index.html#sort_array which is indeed not different from the Databricks one. Sorts ascending, puts NULL at beginning 3. Snowflake indeed has https://docs.snowflake.com/en/sql-reference/functions/array_sort - Sorts ascending, puts NULL at ~~the end~~ beginning. 4. Trino, that has `array_sort(x)` - Sorts ascending, puts NULL at the end. 5. StarRocks also has this function https://docs.starrocks.io/docs/2.2/sql-reference/sql-functions/array-functions/array_sort/ - Sorts ascending, puts NULL at beginning 6. Doris has it https://doris.apache.org/docs/1.2/sql-manual/sql-functions/array-functions/array_sort/ - Sorts ascending, puts NULL at beginning 7. Hive uses NULLS FIRST as the default behavior in ascending ordering mode, per https://issues.apache.org/jira/browse/HIVE-12994 - Sorts ascending, puts NULL at beginning. I do think that the Snowflake has the cleanest function signature: there's just one function, and it has two optional arguments to give all the flexibility. I think the default sorting mode should be ascending, and then it boils down to decide on the default location on where to put NULL. Out of the 7 implementations, we have 6 who put NULL at the beginning, and only Databricks puts them at the end. Naturally, I think we should put NULL at the beginning but I'm +0 on the default location of NULL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]
gyfora commented on code in PR #777: URL: https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489430330 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws } } -/** Wait until the FLink cluster has completely shut down. */ -@VisibleForTesting -void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) { -LOG.info("Waiting for cluster shutdown..."); - -boolean jobManagerRunning = true; -boolean taskManagerRunning = true; -boolean serviceRunning = true; +/** Returns a list of Kubernetes Deployment names for given cluster. */ +protected abstract List getDeploymentNames(String namespace, String clusterId); -for (int i = 0; i < shutdownTimeout; i++) { -if (jobManagerRunning) { -PodList jmPodList = getJmPodList(namespace, clusterId); +/** Wait until the FLink cluster has completely shut down. */ +protected void waitForClusterShutdown( +String namespace, String clusterId, long shutdownTimeout) { +long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000; +LOG.info("Waiting {} seconds for cluster shutdown...", shutdownTimeout); -if (jmPodList == null || jmPodList.getItems().isEmpty()) { -jobManagerRunning = false; -} -} -if (taskManagerRunning) { -PodList tmPodList = getTmPodList(namespace, clusterId); +for (var deploymentName : getDeploymentNames(namespace, clusterId)) { +long deploymentTimeout = timeoutAt - System.currentTimeMillis(); -if (tmPodList.getItems().isEmpty()) { -taskManagerRunning = false; -} +if (!waitForDeploymentToBeRemoved(namespace, deploymentName, deploymentTimeout)) { +LOG.error( +"Failed to shut down cluster {} (deployment {}) in {} seconds, proceeding...", +clusterId, +deploymentName, +shutdownTimeout); +return; } +} +} -if (serviceRunning) { -Service service = -kubernetesClient -.services() -.inNamespace(namespace) -.withName( - ExternalServiceDecorator.getExternalServiceName(clusterId)) -.get(); -if (service == null) { -serviceRunning = false; -} -} +/** Wait until Deployment is removed, return false if timed out, otherwise return true. */ +@VisibleForTesting +boolean waitForDeploymentToBeRemoved(String namespace, String deploymentName, long timeout) { +ScheduledExecutorService logger = Executors.newSingleThreadScheduledExecutor(); +logger.scheduleWithFixedDelay( +() -> LOG.info("Waiting for Deployment {} to shut down...", deploymentName), +5, +5, +TimeUnit.SECONDS); Review Comment: This looks a bit strange / heavy to create always a new executor (thread), can we have a shared pool / or hook into the kubernetes client `waitUntilCondition` somehow? Or alternatively we could just remove the periodic logs and log before and after -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
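A rough sketch of the `waitUntilCondition` alternative mentioned above, assuming the fabric8 client API (`waitUntilCondition(Predicate, long, TimeUnit)` throwing `KubernetesClientTimeoutException` on timeout). This is illustrative only, not the code in the PR.

```java
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: let the Kubernetes client do the waiting instead of a logging executor. */
class DeploymentShutdownWaiter {

    /** Returns true if the Deployment disappeared within the timeout, false otherwise. */
    static boolean waitForDeploymentToBeRemoved(
            KubernetesClient client, String namespace, String deploymentName, long timeoutMillis) {
        try {
            client.apps()
                    .deployments()
                    .inNamespace(namespace)
                    .withName(deploymentName)
                    .waitUntilCondition(Objects::isNull, timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (KubernetesClientTimeoutException e) {
            return false;
        }
    }
}
```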
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489408092 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java: ## @@ -43,49 +44,61 @@ public static void computeLoadMetrics( JobVertexID jobVertexID, Map flinkMetrics, Map scalingMetrics, +IOMetrics ioMetrics, Configuration conf) { -double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID); -scalingMetrics.put(ScalingMetric.LOAD, busyTimeMsPerSecond / 1000); +scalingMetrics.put( +ScalingMetric.LOAD, +getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID) / 1000.); +scalingMetrics.put(ScalingMetric.ACCUMULATED_BUSY_TIME, ioMetrics.getAccumulatedBusyTime()); +} + +private static double getBusyTimeMsPerSecond( +Map flinkMetrics, +Configuration conf, +JobVertexID jobVertexId) { +var busyTimeAggregator = conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR); +var busyTimeMsPerSecond = + busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC)); +if (!Double.isFinite(busyTimeMsPerSecond)) { +if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) { +// We only want to log this once +LOG.warn( +"No busyTimeMsPerSecond metric available for {}. No scaling will be performed for this vertex.", +jobVertexId); +} +return Double.NaN; +} +return Math.max(0, busyTimeMsPerSecond); } public static void computeDataRateMetrics( JobVertexID jobVertexID, Map flinkMetrics, Map scalingMetrics, JobTopology topology, -double lagGrowthRate, Configuration conf, Supplier observedTprAvg) { var isSource = topology.isSource(jobVertexID); +var ioMetrics = topology.get(jobVertexID).getIoMetrics(); double numRecordsInPerSecond = -getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource); +getNumRecordsIn(flinkMetrics, ioMetrics, jobVertexID, isSource, true); Review Comment: will do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489403136 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ## @@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan( for (JsonNode node : nodes) { var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputList = new HashSet(); +var ioMetrics = metrics.get(vertexId); +var finished = finishedVertices.contains(vertexId); vertexInfo.add( new VertexInfo( vertexId, inputList, +null, node.get("parallelism").asInt(), maxParallelismMap.get(vertexId), -finished.contains(vertexId))); +maxParallelismMap.get(vertexId), Review Comment: I will add an explicit constructor to clean this up instead of relying on Lombok allArgsConstructor -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489401020 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -356,13 +421,13 @@ public static double getAverage( ? collectedMetrics.getVertexMetrics().get(jobVertexId) : collectedMetrics.getGlobalMetrics(); double num = metrics.getOrDefault(metric, Double.NaN); -if (Double.isNaN(num)) { -continue; -} if (Double.isInfinite(num)) { anyInfinite = true; continue; } +if (Double.isNaN(num)) { +continue; +} Review Comment: I will revert this, I made some changes that I later removed, not necessary -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489399611 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -264,16 +299,14 @@ private void computeTargetDataRate( if (topology.isSource(vertex)) { double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds(); -if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) { +double lagRate = getRate(LAG, vertex, metricsHistory); +double sourceDataRate = Math.max(0, inputRate + lagRate); Review Comment: Make sense! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
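In case it helps readers of the diff above, the source-rate reasoning boils down to the following trivial, illustrative helper (names are made up):

```java
/** Hypothetical sketch: a source's true incoming rate is what it reads plus how fast its lag grows. */
class SourceDataRateSketch {
    static double sourceDataRate(double numRecordsInPerSecond, double lagGrowthRatePerSecond) {
        // Positive lag growth means the source is falling behind, so the real input
        // rate is higher than the observed read rate; never report a negative rate.
        return Math.max(0, numRecordsInPerSecond + lagGrowthRatePerSecond);
    }
}
```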
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489399318 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -341,6 +375,37 @@ public static double getAverage( return getAverage(metric, jobVertexId, metricsHistory, 1); } +public static double getRate( +ScalingMetric metric, +@Nullable JobVertexID jobVertexId, +SortedMap metricsHistory) { + +Instant firstTs = null; +double first = Double.NaN; + +Instant lastTs = null; +double last = Double.NaN; + +for (var entry : metricsHistory.entrySet()) { +double value = entry.getValue().getVertexMetrics().get(jobVertexId).get(metric); +if (!Double.isNaN(value)) { +if (Double.isNaN(first)) { +first = value; +firstTs = entry.getKey(); +} else { +last = value; +lastTs = entry.getKey(); +} +} +} +if (Double.isNaN(last)) { +return Double.NaN; +} + +double diff = last - first; +return diff == 0 ? 0 : diff / Duration.between(firstTs, lastTs).toSeconds(); Review Comment: I will do this; I still had to change the logic so that we compute the per-second rate using the millisecond diff, otherwise we can accidentally divide by zero -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
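A small illustration of the divide-by-zero concern mentioned in the reply above (illustrative only; the real implementation lives in `ScalingMetricEvaluator`):

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch: derive a per-second rate from two samples without truncating to whole seconds. */
class RatePerSecondSketch {
    static double ratePerSecond(double first, Instant firstTs, double last, Instant lastTs) {
        double diff = last - first;
        if (diff == 0) {
            return 0;
        }
        // Duration#toSeconds would return 0 for sub-second windows and cause a division by zero;
        // using the millisecond difference keeps the computation well-defined.
        double elapsedSeconds = Duration.between(firstTs, lastTs).toMillis() / 1000.0;
        return elapsedSeconds == 0 ? 0 : diff / elapsedSeconds;
    }
}
```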
Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]
zentol commented on code in PR #24309: URL: https://github.com/apache/flink/pull/24309#discussion_r1489188467 ## flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java: ## @@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class taskClazz) { public void triggerPeriodicScheduledTasks() { for (ScheduledTask scheduledTask : periodicScheduledTasks) { if (!scheduledTask.isCancelled()) { -scheduledTask.execute(); +executeScheduledTask(scheduledTask); } } } +private static void executeScheduledTask(ScheduledTask scheduledTask) { +scheduledTask.execute(); +try { +// try to retrieve result of scheduled task to avoid swallowing any exceptions that +// occurred +scheduledTask.get(); Review Comment: This doesn't work for periodic tasks since the result future in the ScheduledTask never gets completed. I'm not sure if this change is correct in the first place. Every non-periodic task already throws exceptions when trigger is called, and you can't wait for non-periodic tasks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
MartijnVisser commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1489343962 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_SORT = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SORT") +.kind(SCALAR) +.inputTypeStrategy( +or( +sequence(new ArrayComparableElementArgumentTypeStrategy()), +sequence( +new ArrayComparableElementArgumentTypeStrategy(), +logical(LogicalTypeRoot.BOOLEAN Review Comment: Sounds good to me, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
dawidwys commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1489331336 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_SORT = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SORT") +.kind(SCALAR) +.inputTypeStrategy( +or( +sequence(new ArrayComparableElementArgumentTypeStrategy()), +sequence( +new ArrayComparableElementArgumentTypeStrategy(), +logical(LogicalTypeRoot.BOOLEAN Review Comment: @MartijnVisser Actually Snowflake does: > nulls_first Default: FALSE if the ARRAY is sorted in ascending order; TRUE if the ARRAY is sorted in descending order. I'd suggest we: 1. adapt Snowflake's signature where we get `ARRAY, BOOLEAN, BOOLEAN`, output is `ARRAY` which is `NULLABLE` if any of the input is nullable 2. We use defaults: ascending and `nulls_first` for ASC, `nulls_last` for DESC 3. If any of the input is null the output is also null How does this sound? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
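If the proposal above is adopted, the defaults would behave roughly as sketched below; the expected results in the comments are inferred from the proposal, not from merged code, and the function may not exist in the Flink version you run.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ArraySortDefaultsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Ascending by default, NULLs first for ASC: expected [NULL, 1, 3]
        tEnv.sqlQuery("SELECT ARRAY_SORT(ARRAY[3, CAST(NULL AS INT), 1])").execute().print();

        // Descending with NULLs last as the DESC default: expected [3, 1, NULL]
        tEnv.sqlQuery("SELECT ARRAY_SORT(ARRAY[3, CAST(NULL AS INT), 1], FALSE)").execute().print();

        // NULL input yields NULL output under the proposal.
        tEnv.sqlQuery("SELECT ARRAY_SORT(CAST(NULL AS ARRAY<INT>))").execute().print();
    }
}
```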
Re: [PR] [FLINK-34441] Add guide to submit flink SQL scripts via the operator (using flink-sql-runner-example) [flink-kubernetes-operator]
mxm commented on code in PR #776: URL: https://github.com/apache/flink-kubernetes-operator/pull/776#discussion_r1489321926 ## docs/content/docs/custom-resource/overview.md: ## @@ -216,6 +216,38 @@ Alternatively, if you use helm to install flink-kubernetes-operator, it allows y - Last-state upgradeMode is currently not supported for FlinkSessionJobs +## Flink SQL Jobs Review Comment: I think we should avoid copying anything over from https://github.com/apache/flink-kubernetes-operator/tree/main/examples/. Instead, we can extract the above linked examples section to a dedicated page and add any extra information we find useful. ## docs/content/docs/custom-resource/overview.md: ## @@ -216,6 +216,38 @@ Alternatively, if you use helm to install flink-kubernetes-operator, it allows y - Last-state upgradeMode is currently not supported for FlinkSessionJobs +## Flink SQL Jobs Review Comment: Taking another look, this might not be the best place to add this information because it's an overview page and the steps are quite detailed. There is already a section mentioning the examples: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#built-in-examples How about creating a dedicated Examples page? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
MartijnVisser commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1489314997 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_SORT = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SORT") +.kind(SCALAR) +.inputTypeStrategy( +or( +sequence(new ArrayComparableElementArgumentTypeStrategy()), +sequence( +new ArrayComparableElementArgumentTypeStrategy(), +logical(LogicalTypeRoot.BOOLEAN Review Comment: This is quite a rabbit hole 😆 1. Databricks has two functions: * sort_array https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html - Sorts ascending, puts NULL at beginning. * array_sort https://docs.databricks.com/en/sql/language-manual/functions/array_sort.html - Sorts ascending, puts NULL at the end. 2. Spark has https://spark.apache.org/docs/latest/api/sql/index.html#sort_array which is indeed not different from the Databricks one. Sorts ascending, puts NULL at beginning 3. Snowflake indeed has https://docs.snowflake.com/en/sql-reference/functions/array_sort - Sorts ascending, puts NULL at the end. 4. Trino, that has `array_sort(x)` - Sorts ascending, puts NULL at the end. 5. StarRocks also has this function https://docs.starrocks.io/docs/2.2/sql-reference/sql-functions/array-functions/array_sort/ - Sorts ascending, puts NULL at beginning 6. Doris has it https://doris.apache.org/docs/1.2/sql-manual/sql-functions/array-functions/array_sort/ - Sorts ascending, puts NULL at beginning 7. Hive uses NULLS FIRST as the default behavior in ascending ordering mode, per https://issues.apache.org/jira/browse/HIVE-12994 - Sorts ascending, puts NULL at beginning. I do think that the Snowflake has the cleanest function signature: there's just one function, and it has two optional arguments to give all the flexibility. I think the default sorting mode should be ascending, and then it boils down to decide on the default location on where to put NULL. Out of the 7 implementations, we have 5 who put NULL at the beginning, and 2 who put them at the end. Naturally, I think we should put NULL at the beginning but I'm +0 on the default location of NULL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273 ] Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:12 AM: - Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375 Reopening the issue. was (Author: mapohl): Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 Reopening the issue. > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Assignee: Matthias Pohl >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.20.0 > > > After FLINK-33611 merged, the misc test on GHA cannot pass due to out of > memory error, throwing following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization. 
> Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007) > Feb 07 05:43:21 at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) > Feb 07 05:43:21 at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495) > Feb 07 05:43:21
[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273 ] Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:12 AM: - Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 Reopening the issue. was (Author: mapohl): Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375 Reopening the issue. > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Assignee: Matthias Pohl >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.20.0 > > > After FLINK-33611 merged, the misc test on GHA cannot pass due to out of > memory error, throwing following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization. 
> Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007) > Feb 07 05:43:21 at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) > Feb 07 05:43:21 at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495) > Feb 07 05:43:21
[jira] [Comment Edited] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes
[ https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817298#comment-17817298 ] Matthias Pohl edited comment on FLINK-34336 at 2/14/24 11:11 AM: - * https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193 * https://github.com/apache/flink/actions/runs/7895502334/job/21548208160#step:10:11190 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=15356 was (Author: mapohl): * https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193 * https://github.com/apache/flink/actions/runs/7895502334/job/21548208160#step:10:11190 > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang sometimes > - > > Key: FLINK-34336 > URL: https://issues.apache.org/jira/browse/FLINK-34336 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.19.0, 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang in > waitForRunningTasks(restClusterClient, jobID, parallelism2); > h2. Reason: > The job has 2 tasks(vertices), after calling updateJobResourceRequirements. > The source parallelism isn't changed (It's parallelism), and the > FlatMapper+Sink is changed from parallelism to parallelism2. > So we expect the task number should be parallelism + parallelism2 instead of > parallelism2. > > h2. Why it can be passed for now? > Flink 1.19 supports the scaling cooldown, and the cooldown time is 30s by > default. It means, flink job will rescale job 30 seconds after > updateJobResourceRequirements is called. > > So the running tasks are old parallelism when we call > waitForRunningTasks(restClusterClient, jobID, parallelism2); > IIUC, it cannot be guaranteed, and it's unexpected. > > h2. How to reproduce this bug? > [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6] > * Disable the cooldown > * Sleep for a while before waitForRunningTasks > If so, the job running in new parallelism, so `waitForRunningTasks` will hang > forever. -- This message was sent by Atlassian Jira (v8.20.10#820010)
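For illustration only, a minimal Java sketch of the corrected expectation described in the issue above; the helper name waitForRunningTasks and the variable names are taken from the issue text and are assumptions here, not verified against the actual test code.

```java
// Sketch: after updateJobResourceRequirements the source keeps `parallelism` subtasks
// while the FlatMapper+Sink vertex scales to `parallelism2`, so the test should wait
// for the sum of both rather than for `parallelism2` alone (assumed helper signature).
waitForRunningTasks(restClusterClient, jobID, parallelism + parallelism2);
```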
[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273 ] Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:11 AM: - Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375 Reopening the issue. was (Author: mapohl): Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375 Reopening the issue. > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Assignee: Matthias Pohl >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.20.0 > > > After FLINK-33611 merged, the misc test on GHA cannot pass due to out of > memory error, throwing following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization. 
> Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007) > Feb 07 05:43:21 at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) > Feb 07 05:43:21 at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495) > Feb 07 05:43:21
[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273 ] Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:10 AM: - Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375 Reopening the issue. was (Author: mapohl): Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 Reopening the issue. > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Assignee: Matthias Pohl >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.20.0 > > > After FLINK-33611 merged, the misc test on GHA cannot pass due to out of > memory error, throwing following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization. 
> Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007) > Feb 07 05:43:21 at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) > Feb 07 05:43:21 at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66) > Feb
[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273 ] Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:10 AM: - Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068 Reopening the issue. was (Author: mapohl): Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 Reopening the issue. > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Assignee: Matthias Pohl >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.20.0 > > > After FLINK-33611 merged, the misc test on GHA cannot pass due to out of > memory error, throwing following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization. 
> Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007) > Feb 07 05:43:21 at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) > Feb 07 05:43:21 at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHe
[jira] [Commented] (FLINK-34273) git fetch fails
[ https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817334#comment-17817334 ] Matthias Pohl commented on FLINK-34273: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57521&view=logs&j=60960eae-6f09-579e-371e-29814bdd1adc&t=1fe608a4-e773-5ca0-5336-1c37a61b9f8d > git fetch fails > --- > > Key: FLINK-34273 > URL: https://issues.apache.org/jira/browse/FLINK-34273 > Project: Flink > Issue Type: Bug > Components: Build System / CI, Test Infrastructure >Affects Versions: 1.19.0, 1.18.1 >Reporter: Matthias Pohl >Priority: Major > Labels: test-stability > > We've seen multiple {{git fetch}} failures. I assume this to be an > infrastructure issue. This Jira issue is for documentation purposes. > {code:java} > error: RPC failed; curl 18 transfer closed with outstanding read data > remaining > error: 5211 bytes of body are still expected > fetch-pack: unexpected disconnect while reading sideband packet > fatal: early EOF > fatal: fetch-pack: invalid index-pack output {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=5d6dc3d3-393d-5111-3a40-c6a5a36202e6&l=667 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-22765) ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable
[ https://issues.apache.org/jira/browse/FLINK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-22765: -- Priority: Critical (was: Major) > ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable > > > Key: FLINK-22765 > URL: https://issues.apache.org/jira/browse/FLINK-22765 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.14.0, 1.13.5, 1.15.0, 1.17.2, 1.19.0, 1.20.0 >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Critical > Labels: pull-request-available, stale-assigned, test-stability > Fix For: 1.14.0, 1.16.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18292&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=a99e99c7-21cd-5a1f-7274-585e62b72f56 > {code} > May 25 00:56:38 java.lang.AssertionError: > May 25 00:56:38 > May 25 00:56:38 Expected: is "" > May 25 00:56:38 but: was "The system is out of resources.\nConsult the > following stack trace for details." > May 25 00:56:38 at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > May 25 00:56:38 at org.junit.Assert.assertThat(Assert.java:956) > May 25 00:56:38 at org.junit.Assert.assertThat(Assert.java:923) > May 25 00:56:38 at > org.apache.flink.runtime.util.ExceptionUtilsITCase.run(ExceptionUtilsITCase.java:94) > May 25 00:56:38 at > org.apache.flink.runtime.util.ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError(ExceptionUtilsITCase.java:70) > May 25 00:56:38 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > May 25 00:56:38 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > May 25 00:56:38 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > May 25 00:56:38 at java.lang.reflect.Method.invoke(Method.java:498) > May 25 00:56:38 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > May 25 00:56:38 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > May 25 00:56:38 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > May 25 00:56:38 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > May 25 00:56:38 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > May 25 00:56:38 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > May 25 00:56:38 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > May 25 00:56:38 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > May 25 00:56:38 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > May 25 00:56:38 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > May 25 00:56:38 at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > May 25 00:56:38 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > May 25 00:56:38 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > May 25 00:56:38 at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > May 25 00:56:38 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > May 25 00:56:38 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > May 25 00:56:38 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > May 25 00:56:38 at > org.junit.runners.ParentRunner.run(ParentRunner.java:363) > May 25 00:56:38 at > 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) > May 25 00:56:38 at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) > May 25 00:56:38 at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) > May 25 00:56:38 at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) > May 25 00:56:38 at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) > May 25 00:56:38 at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) > May 25 00:56:38 at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) > May 25 00:56:38 at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) > May 25 00:56:38 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-22765) ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable
[ https://issues.apache.org/jira/browse/FLINK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817333#comment-17817333 ] Matthias Pohl commented on FLINK-22765: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57521&view=logs&j=a657ddbf-d986-5381-9649-342d9c92e7fb&t=dc085d4a-05c8-580e-06ab-21f5624dab16&l=8997 > ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable > > > Key: FLINK-22765 > URL: https://issues.apache.org/jira/browse/FLINK-22765 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.14.0, 1.13.5, 1.15.0, 1.17.2, 1.19.0, 1.20.0 >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Critical > Labels: pull-request-available, stale-assigned, test-stability > Fix For: 1.14.0, 1.16.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18292&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=a99e99c7-21cd-5a1f-7274-585e62b72f56 > {code} > May 25 00:56:38 java.lang.AssertionError: > May 25 00:56:38 > May 25 00:56:38 Expected: is "" > May 25 00:56:38 but: was "The system is out of resources.\nConsult the > following stack trace for details." > May 25 00:56:38 at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > May 25 00:56:38 at org.junit.Assert.assertThat(Assert.java:956) > May 25 00:56:38 at org.junit.Assert.assertThat(Assert.java:923) > May 25 00:56:38 at > org.apache.flink.runtime.util.ExceptionUtilsITCase.run(ExceptionUtilsITCase.java:94) > May 25 00:56:38 at > org.apache.flink.runtime.util.ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError(ExceptionUtilsITCase.java:70) > May 25 00:56:38 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > May 25 00:56:38 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > May 25 00:56:38 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > May 25 00:56:38 at java.lang.reflect.Method.invoke(Method.java:498) > May 25 00:56:38 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > May 25 00:56:38 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > May 25 00:56:38 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > May 25 00:56:38 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > May 25 00:56:38 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > May 25 00:56:38 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > May 25 00:56:38 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > May 25 00:56:38 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > May 25 00:56:38 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > May 25 00:56:38 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > May 25 00:56:38 at > org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > May 25 00:56:38 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > May 25 00:56:38 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > May 25 00:56:38 at > org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > May 25 00:56:38 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > May 25 00:56:38 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > May 25 
00:56:38 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > May 25 00:56:38 at > org.junit.runners.ParentRunner.run(ParentRunner.java:363) > May 25 00:56:38 at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) > May 25 00:56:38 at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) > May 25 00:56:38 at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) > May 25 00:56:38 at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) > May 25 00:56:38 at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) > May 25 00:56:38 at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) > May 25 00:56:38 at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) > May 25 00:56:38 at > org.apache.maven.surefir
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
dawidwys commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1487819172 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_SORT = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SORT") +.kind(SCALAR) +.inputTypeStrategy( +or( +sequence(new ArrayComparableElementArgumentTypeStrategy()), +sequence( +new ArrayComparableElementArgumentTypeStrategy(), +logical(LogicalTypeRoot.BOOLEAN Review Comment: > Ok, thank you, actually, I did this function last summer and follow the jira tickect's https://issues.apache.org/jira/browse/FLINK-26948 description. I understand that. If at some point we find out we either made mistake or did not put enough effort into something it's better to fix that sooner rather than later when we need to live with the consequences. I admit I have not thoroughly checked the semantics before which I should've. It's better to do something well rather than fast in my opinion. I see, so traditional RDBMS do not really support that function. It's also worth checking what does: * Snowflake: https://docs.snowflake.com/en/sql-reference/functions/array_sort (`null` when any argument is `null`), null handling separately * Spark: https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html: from the docs it does not say what's the behaviour on `null` `ascendingOrder`, nulls first on asc, nulls last on desc * Presto: https://prestodb.io/docs/current/functions/array.html: has two separate functions for `ASC/DESC` To me Snowflake's behaviour is the cleanest out there. WDYT? @MartijnVisser -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
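To make the discussed semantics concrete, a hypothetical Java/SQL sketch of the Snowflake-style rule mentioned above ("return NULL when any argument is NULL"); ARRAY_SORT is the function proposed in this PR, and the commented results are assumptions about the proposed behaviour, not agreed Flink semantics.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ArraySortSemanticsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Plain sort: ascending by default.
        tEnv.executeSql("SELECT ARRAY_SORT(ARRAY[3, 1, 2])").print();                        // expected: [1, 2, 3]

        // Snowflake-style rule under discussion: any NULL argument makes the result NULL.
        tEnv.executeSql("SELECT ARRAY_SORT(ARRAY[3, 1, 2], CAST(NULL AS BOOLEAN))").print(); // expected: NULL
    }
}
```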
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1489284689 ## amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AWS4SignerBase.java: ## @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink.aws; + +import com.amazonaws.util.BinaryUtils; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; + +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLEncoder; +import java.security.MessageDigest; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SimpleTimeZone; +import java.util.SortedMap; +import java.util.TreeMap; + +/** Common methods and properties for all AWS4 signer variants. */ +public abstract class AWS4SignerBase { + +/** SHA256 hash of an empty request body. */ +public static final String EMPTY_BODY_SHA256 = +"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + +public static final String UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD"; + +public static final String SCHEME = "AWS4"; +public static final String ALGORITHM = "HMAC-SHA256"; +public static final String TERMINATOR = "aws4_request"; + +/** format strings for the date/time and date stamps required during signing. */ +public static final String ISO_8601_BASIC_FORMAT = "MMdd'T'HHmmss'Z'"; + +public static final String DATE_STRING_FORMAT = "MMdd"; + +protected URL endpointUrl; +protected String httpMethod; +protected String serviceName; +protected String regionName; + +protected final SimpleDateFormat dateTimeFormat; +protected final SimpleDateFormat dateStampFormat; + +/** + * Create a new AWS V4 signer. + * + * @param endpointUrl The service endpoint, including the path to any resource. + * @param httpMethod The HTTP verb for the request, e.g. GET. + * @param serviceName The signing name of the service, e.g. 's3'. + * @param regionName The system name of the AWS region associated with the endpoint, e.g. + * us-east-1. + */ +public AWS4SignerBase( +URL endpointUrl, String httpMethod, String serviceName, String regionName) { +this.endpointUrl = endpointUrl; +this.httpMethod = httpMethod; +this.serviceName = serviceName; +this.regionName = regionName; + +dateTimeFormat = new SimpleDateFormat(ISO_8601_BASIC_FORMAT); +dateTimeFormat.setTimeZone(new SimpleTimeZone(0, "UTC")); +dateStampFormat = new SimpleDateFormat(DATE_STRING_FORMAT); +dateStampFormat.setTimeZone(new SimpleTimeZone(0, "UTC")); +} + +/** + * Returns the canonical collection of header names that will be included in the signature. 
For + * AWS4, all header names must be included in the process in sorted canonicalized order. + */ +protected static String getCanonicalizeHeaderNames(Map headers) { +List sortedHeaders = new ArrayList(); +sortedHeaders.addAll(headers.keySet()); +Collections.sort(sortedHeaders, String.CASE_INSENSITIVE_ORDER); + +StringBuilder buffer = new StringBuilder(); +for (String header : sortedHeaders) { +if (buffer.length() > 0) { +buffer.append(";"); +} +buffer.append(header.toLowerCase()); +} + +return buffer.toString(); +} + +/** + * Computes the canonical headers with values for the request. For AWS4, all headers must be + * included in the signing process. + */ +protected static String getCanonicalizedHeaderString(Map headers) { +if (headers == null || headers.isEmpty()) { +return ""; +} + +// step1: sort the headers by case-insensitive order +List sortedHeaders = new ArrayList(); +sortedHeaders.addAll(headers.keySet()); +Collections.sort(sortedHeaders, String.CASE_INSENSITIVE_ORDER); + +// step2: form the
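A standalone sketch of the header-name canonicalization performed by getCanonicalizeHeaderNames in the snippet above; the sample header values are invented and only the names matter for the result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CanonicalHeaderNamesSketch {
    public static void main(String[] args) {
        // Invented sample headers for illustration.
        Map<String, String> headers = new HashMap<>();
        headers.put("Host", "aps-workspaces.eu-west-1.amazonaws.com");
        headers.put("X-Amz-Date", "20240214T111200Z");
        headers.put("x-amz-content-sha256", "UNSIGNED-PAYLOAD");

        // Same algorithm as the code under review: case-insensitive sort,
        // lower-cased names joined with ';'.
        List<String> names = new ArrayList<>(headers.keySet());
        names.sort(String.CASE_INSENSITIVE_ORDER);
        StringBuilder canonical = new StringBuilder();
        for (String name : names) {
            if (canonical.length() > 0) {
                canonical.append(";");
            }
            canonical.append(name.toLowerCase());
        }
        System.out.println(canonical); // host;x-amz-content-sha256;x-amz-date
    }
}
```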
[jira] [Closed] (FLINK-34432) Re-enable forkReuse for flink-table-planner
[ https://issues.apache.org/jira/browse/FLINK-34432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-34432. -- Fix Version/s: 1.20.0 Resolution: Fixed Fixed in apache/flink:master 403694e7b9c213386f3ed9cff21ce2664030ebc2 > Re-enable forkReuse for flink-table-planner > --- > > Key: FLINK-34432 > URL: https://issues.apache.org/jira/browse/FLINK-34432 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Client, Test Infrastructure, Tests >Affects Versions: 1.19.0, 1.18.2, 1.20.0 >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > With FLINK-18356 resolved, we should re-enable forkReuse for > flink-table-planner to speed up the tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1489268899 ## amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink.aws; + +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSSessionCredentials; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.util.BinaryUtils; +import org.apache.commons.lang3.StringUtils; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Map; + +/** Sign a Remote-Write request to Amazon Managed Service for Prometheus (AMP). */ +public class AmazonManagedPrometheusWriteRequestSigner implements PrometheusRequestSigner { + +private final URL remoteWriteUrl; +private final String awsRegion; + +/** + * Constructor. + * + * @param remoteWriteUrl URL of the remote-write endpoint + * @param awsRegion Region of the AMP workspace + */ +public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, String awsRegion) { +Preconditions.checkArgument( +StringUtils.isNotBlank(awsRegion), "Missing or blank AMP workspace region"); +Preconditions.checkNotNull( +StringUtils.isNotBlank(remoteWriteUrl), +"Missing or blank AMP workspace remote-write URL"); +this.awsRegion = awsRegion; +try { +this.remoteWriteUrl = new URL(remoteWriteUrl); +} catch (MalformedURLException e) { +throw new IllegalArgumentException( +"Invalid AMP remote-write URL: " + remoteWriteUrl, e); +} +} + +/** + * Add the additional Http request headers required by Amazon Managed Prometheus: + * 'x-amz-content-sha256', 'Host', 'X-Amz-Date', 'x-amz-security-token' and 'Authorization`. + * + * @param requestHeaders original Http request headers. It must be mutable. For efficiency, any + * new header is added to the map, instead of making a copy. + * @param requestBody request body, already compressed + */ +@Override +public void addSignatureHeaders(Map requestHeaders, byte[] requestBody) { +byte[] contentHash = AWS4SignerBase.hash(requestBody); +String contentHashString = BinaryUtils.toHex(contentHash); +requestHeaders.put( +"x-amz-content-sha256", +contentHashString); // this header must be included before generating the +// Authorization header + +DefaultAWSCredentialsProviderChain credsChain = new DefaultAWSCredentialsProviderChain(); Review Comment: Added the possibility to instantiate a signer with a different credentials provider. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
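For context, a purely hypothetical usage sketch of what "instantiate a signer with a different credentials provider" could look like; the constructor signature, URL, and region below are placeholders, and the actual API added in the PR may differ.

```java
// Hypothetical constructor accepting an explicit credentials provider (AWS SDK v1 types,
// matching the imports shown in the snippet above); values are placeholders.
PrometheusRequestSigner signer =
        new AmazonManagedPrometheusWriteRequestSigner(
                "https://aps-workspaces.eu-west-1.amazonaws.com/workspaces/ws-0000/api/v1/remote_write",
                "eu-west-1",
                new DefaultAWSCredentialsProviderChain()); // any AWSCredentialsProvider could be passed here
```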
Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]
zentol commented on code in PR #24292: URL: https://github.com/apache/flink/pull/24292#discussion_r1489262074 ## flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java: ## @@ -45,11 +45,13 @@ public MainThreadValidatorUtil(RpcEndpoint endpoint) { public void enterMainThread() { assert (endpoint.currentMainThread.compareAndSet(null, Thread.currentThread())) : "The RpcEndpoint has concurrent access from " + endpoint.currentMainThread.get(); +endpoint.beforeInvocation(); Review Comment: Yes, because the main thread executable covers less; it doesnt capture anything sent directly against the rpc service's executor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]
zentol commented on code in PR #24292: URL: https://github.com/apache/flink/pull/24292#discussion_r1489259793 ## flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.misc; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.MdcUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.apache.logging.log4j.test.appender.ListAppender; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkState; +import static org.junit.Assert.assertTrue; + +/** + * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the most important cases. + */ +public class JobIDLoggingITCase extends TestLogger { +private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class); + +@ClassRule +public static MiniClusterWithClientResource miniClusterResource = +new MiniClusterWithClientResource( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) +.build()); + +@Test +public void testJobIDLogging() throws Exception { +LoggerContext ctx = LoggerContext.getContext(false); Review Comment: You can use multiple extensions to capture multiple loggers, see `YARNSessionCapacitySchedulerITCase`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]
zentol commented on code in PR #24292: URL: https://github.com/apache/flink/pull/24292#discussion_r1489259793 ## flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.misc; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.MdcUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.apache.logging.log4j.test.appender.ListAppender; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkState; +import static org.junit.Assert.assertTrue; + +/** + * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the most important cases. + */ +public class JobIDLoggingITCase extends TestLogger { +private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class); + +@ClassRule +public static MiniClusterWithClientResource miniClusterResource = +new MiniClusterWithClientResource( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) +.build()); + +@Test +public void testJobIDLogging() throws Exception { +LoggerContext ctx = LoggerContext.getContext(false); Review Comment: You can use multiple extensions to capture multiple loggers.. ## flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.misc; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.stre
Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]
zentol commented on code in PR #24292: URL: https://github.com/apache/flink/pull/24292#discussion_r1489259263 ## flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java: ## @@ -34,4 +34,16 @@ public interface RpcGateway { * @return Fully qualified hostname under which the associated rpc endpoint is reachable */ String getHostname(); + +/** + * Perform optional steps before handling any messages or performing service actions (e.g. + * start/stop). + */ +default void beforeInvocation() {} Review Comment: yes, (if this is handled in the PekkoRpcActor). Although maybe not runnables but just a Map? Not sure if we _really_ want to open us up to running arbitrary things before/after just yet 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]
zentol commented on code in PR #24292: URL: https://github.com/apache/flink/pull/24292#discussion_r1489257016 ## flink-core/src/main/java/org/apache/flink/util/JobIdLoggingExecutor.java: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +class JobIdLoggingExecutor implements Executor { +protected final JobID jobID; Review Comment: cluster id, job name, task id, task name, ... There's a lot of stuff we could put in the MDC that'd be useful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
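As a point of reference, a minimal, self-contained sketch of the MDC-wrapping executor idea being reviewed here; this is not the Flink implementation, just an illustration of how job-scoped context (job ID, and potentially cluster ID, task name, etc.) can be attached around delegated tasks.

```java
import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.Executor;

/** Illustrative only: runs every task with a fixed MDC context, restoring the previous one afterwards. */
class ContextSettingExecutor implements Executor {
    private final Executor delegate;
    private final Map<String, String> context; // e.g. {"flink-job-id": "..."} plus any other fields

    ContextSettingExecutor(Executor delegate, Map<String, String> context) {
        this.delegate = delegate;
        this.context = context;
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(() -> {
            Map<String, String> previous = MDC.getCopyOfContextMap();
            MDC.setContextMap(context);
            try {
                command.run();
            } finally {
                // Restore whatever was there before to avoid leaking context across tasks.
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        });
    }
}
```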
[jira] [Updated] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table
[ https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vallari Rastogi updated FLINK-32596: Attachment: image-2024-02-14-16-06-13-126.png > The partition key will be wrong when use Flink dialect to create Hive table > --- > > Key: FLINK-32596 > URL: https://issues.apache.org/jira/browse/FLINK-32596 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: luoyuxia >Assignee: Vallari Rastogi >Priority: Major > Attachments: image-2024-02-14-16-06-13-126.png > > > Can be reproduced by the following SQL: > > {code:java} > tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > tableEnv.executeSql( > "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by > (`date`)" > + " with ('connector' = 'hive', > 'sink.partition-commit.delay'='1 s', > 'sink.partition-commit.policy.kind'='metastore,success-file')"); > CatalogTable catalogTable = > (CatalogTable) > hiveCatalog.getTable(ObjectPath.fromString("default.t1")); > // the following assertion will fail > assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]
zentol commented on code in PR #24292: URL: https://github.com/apache/flink/pull/24292#discussion_r1489255700 ## docs/content.zh/docs/deployment/advanced/logging.md: ## @@ -40,6 +40,21 @@ Flink 中的日志记录是使用 [SLF4J](http://www.slf4j.org/) 日志接口实 +### Structured logging + +Flink adds the following fields to [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature): +- Job ID +- key: `flink-job-id` Review Comment: With something like this? ``` appender.console.json.eventTemplateAdditionalField[0].type = EventTemplateAdditionalField appender.console.json.eventTemplateAdditionalField[0].key = flink.job.id appender.console.json.eventTemplateAdditionalField[0].value = {"$resolver": "mdc", "key": "flink-job-id"} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-34364) Fix release utils mount point to match the release doc and scripts
[ https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Etienne Chauchot resolved FLINK-34364. -- Resolution: Fixed fixed release utils integration > Fix release utils mount point to match the release doc and scripts > -- > > Key: FLINK-34364 > URL: https://issues.apache.org/jira/browse/FLINK-34364 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent, Release System >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > Labels: pull-request-available > > parent_pom branch refers to an incorrect mount point tools/*release*/shared > instead of tools/*releasing*/shared for the release_utils. > _tools/releasing_/shared is the one used in the release scripts and in the > release docs -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34364) Fix release utils mount point to match the release doc and scripts
[ https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817321#comment-17817321 ] Etienne Chauchot commented on FLINK-34364: -- parent_pom: c806d46ef06c8e46d28a6a8b5db3f5104cfe53bc > Fix release utils mount point to match the release doc and scripts > -- > > Key: FLINK-34364 > URL: https://issues.apache.org/jira/browse/FLINK-34364 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent, Release System >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > Labels: pull-request-available > > parent_pom branch refers to an incorrect mount point tools/*release*/shared > instead of tools/*releasing*/shared for the release_utils. > _tools/releasing_/shared is the one used in the release scripts and in the > release docs -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34364] Replace incorrect release_utils mount point tools/release/shared by tools/releasing/shared to match the release doc and scripts [flink-connector-shared-utils]
echauchot merged PR #36: URL: https://github.com/apache/flink-connector-shared-utils/pull/36 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore
[ https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817300#comment-17817300 ] Matthias Pohl commented on FLINK-28440: --- https://github.com/apache/flink/actions/runs/7895502334/job/21548198516#step:10:7557 > EventTimeWindowCheckpointingITCase failed with restore > -- > > Key: FLINK-28440 > URL: https://issues.apache.org/jira/browse/FLINK-28440 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / State Backends >Affects Versions: 1.16.0, 1.17.0, 1.18.0, 1.19.0 >Reporter: Huang Xingbo >Assignee: Yanfei Lei >Priority: Critical > Labels: auto-deprioritized-critical, pull-request-available, > stale-assigned, test-stability > Fix For: 1.19.0 > > Attachments: image-2023-02-01-00-51-54-506.png, > image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, > image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, > image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, > image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png > > > {code:java} > Caused by: java.lang.Exception: Exception while creating > StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165) > ... 
11 more > Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: > /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced > (No such file or directory) > at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) > at > org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87) > at > org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96) > at > org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75) > at > org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92) > at > org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > ... 13 more > Caused by: java.io.FileNotFoundException: > /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-03
[jira] [Commented] (FLINK-34418) Disk space issues for Docker-ized GitHub Action jobs
[ https://issues.apache.org/jira/browse/FLINK-34418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817301#comment-17817301 ] Matthias Pohl commented on FLINK-34418: --- https://github.com/apache/flink/actions/runs/7895502334 > Disk space issues for Docker-ized GitHub Action jobs > > > Key: FLINK-34418 > URL: https://issues.apache.org/jira/browse/FLINK-34418 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: github-actions, pull-request-available, test-stability > > [https://github.com/apache/flink/actions/runs/7838691874/job/21390739806#step:10:27746] > {code:java} > [...] > Feb 09 03:00:13 Caused by: java.io.IOException: No space left on device > 27608Feb 09 03:00:13 at java.io.FileOutputStream.writeBytes(Native Method) > 27609Feb 09 03:00:13 at > java.io.FileOutputStream.write(FileOutputStream.java:326) > 27610Feb 09 03:00:13 at > org.apache.logging.log4j.core.appender.OutputStreamManager.writeToDestination(OutputStreamManager.java:250) > 27611Feb 09 03:00:13 ... 39 more > [...] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273 ] Matthias Pohl edited comment on FLINK-34403 at 2/14/24 9:59 AM: Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089 Reopening the issue. was (Author: mapohl): Args, all the time I didn't notice that they are two separate tests (with very similar names): * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 * https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862 Reopening the issue. > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Assignee: Matthias Pohl >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.20.0 > > > After FLINK-33611 merged, the misc test on GHA cannot pass due to out of > memory error, throwing following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization. 
> Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007) > Feb 07 05:43:21 at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) > Feb 07 05:43:21 at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:71) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple(VeryBigPbRowToProtoTest.java:37) > Feb 07 05:43:21 at java.lang.reflect.Method.invoke(Method.java:498) > Feb 07 05:43:21 Caused by: java.util.concur
[jira] [Comment Edited] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes
[ https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817298#comment-17817298 ] Matthias Pohl edited comment on FLINK-34336 at 2/14/24 9:59 AM: * https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193 * https://github.com/apache/flink/actions/runs/7895502334/job/21548208160#step:10:11190 was (Author: mapohl): https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193 > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang sometimes > - > > Key: FLINK-34336 > URL: https://issues.apache.org/jira/browse/FLINK-34336 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.19.0, 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang in > waitForRunningTasks({color:#9876aa}restClusterClient{color}{color:#cc7832}, > {color}jobID{color:#cc7832}, {color}parallelism2){color:#cc7832};{color} > h2. Reason: > The job has 2 tasks(vertices), after calling updateJobResourceRequirements. > The source parallelism isn't changed (It's parallelism) , and the > FlatMapper+Sink is changed from parallelism to parallelism2. > So we expect the task number should be parallelism + parallelism2 instead of > parallelism2. > > h2. Why it can be passed for now? > Flink 1.19 supports the scaling cooldown, and the cooldown time is 30s by > default. It means, flink job will rescale job 30 seconds after > updateJobResourceRequirements is called. > > So the running tasks are old parallelism when we call > waitForRunningTasks({color:#9876aa}restClusterClient{color}{color:#cc7832}, > {color}jobID{color:#cc7832}, {color}parallelism2){color:#cc7832};. {color} > IIUC, it cannot be guaranteed, and it's unexpected. > > h2. How to reproduce this bug? > [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6] > * Disable the cooldown > * Sleep for a while before waitForRunningTasks > If so, the job running in new parallelism, so `waitForRunningTasks` will hang > forever. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes
[ https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-34336: -- Labels: pull-request-available test-stability (was: pull-request-available) > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang sometimes > - > > Key: FLINK-34336 > URL: https://issues.apache.org/jira/browse/FLINK-34336 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.19.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang in > waitForRunningTasks({color:#9876aa}restClusterClient{color}{color:#cc7832}, > {color}jobID{color:#cc7832}, {color}parallelism2){color:#cc7832};{color} > h2. Reason: > The job has 2 tasks(vertices), after calling updateJobResourceRequirements. > The source parallelism isn't changed (It's parallelism) , and the > FlatMapper+Sink is changed from parallelism to parallelism2. > So we expect the task number should be parallelism + parallelism2 instead of > parallelism2. > > h2. Why it can be passed for now? > Flink 1.19 supports the scaling cooldown, and the cooldown time is 30s by > default. It means, flink job will rescale job 30 seconds after > updateJobResourceRequirements is called. > > So the running tasks are old parallelism when we call > waitForRunningTasks({color:#9876aa}restClusterClient{color}{color:#cc7832}, > {color}jobID{color:#cc7832}, {color}parallelism2){color:#cc7832};. {color} > IIUC, it cannot be guaranteed, and it's unexpected. > > h2. How to reproduce this bug? > [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6] > * Disable the cooldown > * Sleep for a while before waitForRunningTasks > If so, the job running in new parallelism, so `waitForRunningTasks` will hang > forever. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes
[ https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-34336: -- Affects Version/s: 1.20.0 > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang sometimes > - > > Key: FLINK-34336 > URL: https://issues.apache.org/jira/browse/FLINK-34336 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.19.0, 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang in > waitForRunningTasks({color:#9876aa}restClusterClient{color}{color:#cc7832}, > {color}jobID{color:#cc7832}, {color}parallelism2){color:#cc7832};{color} > h2. Reason: > The job has 2 tasks(vertices), after calling updateJobResourceRequirements. > The source parallelism isn't changed (It's parallelism) , and the > FlatMapper+Sink is changed from parallelism to parallelism2. > So we expect the task number should be parallelism + parallelism2 instead of > parallelism2. > > h2. Why it can be passed for now? > Flink 1.19 supports the scaling cooldown, and the cooldown time is 30s by > default. It means, flink job will rescale job 30 seconds after > updateJobResourceRequirements is called. > > So the running tasks are old parallelism when we call > waitForRunningTasks({color:#9876aa}restClusterClient{color}{color:#cc7832}, > {color}jobID{color:#cc7832}, {color}parallelism2){color:#cc7832};. {color} > IIUC, it cannot be guaranteed, and it's unexpected. > > h2. How to reproduce this bug? > [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6] > * Disable the cooldown > * Sleep for a while before waitForRunningTasks > If so, the job running in new parallelism, so `waitForRunningTasks` will hang > forever. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes
[ https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817298#comment-17817298 ] Matthias Pohl commented on FLINK-34336: --- https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193 > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang sometimes > - > > Key: FLINK-34336 > URL: https://issues.apache.org/jira/browse/FLINK-34336 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.19.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState > may hang in > waitForRunningTasks({color:#9876aa}restClusterClient{color}{color:#cc7832}, > {color}jobID{color:#cc7832}, {color}parallelism2){color:#cc7832};{color} > h2. Reason: > The job has 2 tasks(vertices), after calling updateJobResourceRequirements. > The source parallelism isn't changed (It's parallelism) , and the > FlatMapper+Sink is changed from parallelism to parallelism2. > So we expect the task number should be parallelism + parallelism2 instead of > parallelism2. > > h2. Why it can be passed for now? > Flink 1.19 supports the scaling cooldown, and the cooldown time is 30s by > default. It means, flink job will rescale job 30 seconds after > updateJobResourceRequirements is called. > > So the running tasks are old parallelism when we call > waitForRunningTasks({color:#9876aa}restClusterClient{color}{color:#cc7832}, > {color}jobID{color:#cc7832}, {color}parallelism2){color:#cc7832};. {color} > IIUC, it cannot be guaranteed, and it's unexpected. > > h2. How to reproduce this bug? > [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6] > * Disable the cooldown > * Sleep for a while before waitForRunningTasks > If so, the job running in new parallelism, so `waitForRunningTasks` will hang > forever. -- This message was sent by Atlassian Jira (v8.20.10#820010)
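[Editor's note] To make the counting argument in the issue description concrete, here is a tiny stand-alone sketch of the expected task count (not code from the repository; the parallelism values are made up):

{code:java}
public class ExpectedRunningTasks {
    public static void main(String[] args) {
        int parallelism = 2;   // source vertex: unchanged by the rescale
        int parallelism2 = 4;  // FlatMapper+Sink vertex: parallelism after
                               // updateJobResourceRequirements takes effect

        // Once the job has actually rescaled, tasks of both vertices are running,
        // so waiting for only `parallelism2` running tasks can hang forever.
        int expectedRunningTasks = parallelism + parallelism2;
        System.out.println("expected running tasks: " + expectedRunningTasks); // 6, not 4
    }
}
{code}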
[jira] [Commented] (FLINK-34418) Disk space issues for Docker-ized GitHub Action jobs
[ https://issues.apache.org/jira/browse/FLINK-34418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817295#comment-17817295 ] Matthias Pohl commented on FLINK-34418: --- https://github.com/apache/flink/actions/runs/7895502322/job/21548178211 > Disk space issues for Docker-ized GitHub Action jobs > > > Key: FLINK-34418 > URL: https://issues.apache.org/jira/browse/FLINK-34418 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: github-actions, pull-request-available, test-stability > > [https://github.com/apache/flink/actions/runs/7838691874/job/21390739806#step:10:27746] > {code:java} > [...] > Feb 09 03:00:13 Caused by: java.io.IOException: No space left on device > 27608Feb 09 03:00:13 at java.io.FileOutputStream.writeBytes(Native Method) > 27609Feb 09 03:00:13 at > java.io.FileOutputStream.write(FileOutputStream.java:326) > 27610Feb 09 03:00:13 at > org.apache.logging.log4j.core.appender.OutputStreamManager.writeToDestination(OutputStreamManager.java:250) > 27611Feb 09 03:00:13 ... 39 more > [...] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34418) Disk space issues for Docker-ized GitHub Action jobs
[ https://issues.apache.org/jira/browse/FLINK-34418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817292#comment-17817292 ] Matthias Pohl commented on FLINK-34418: --- https://github.com/apache/flink/actions/runs/7895502206/job/21548178104 > Disk space issues for Docker-ized GitHub Action jobs > > > Key: FLINK-34418 > URL: https://issues.apache.org/jira/browse/FLINK-34418 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: github-actions, pull-request-available, test-stability > > [https://github.com/apache/flink/actions/runs/7838691874/job/21390739806#step:10:27746] > {code:java} > [...] > Feb 09 03:00:13 Caused by: java.io.IOException: No space left on device > 27608Feb 09 03:00:13 at java.io.FileOutputStream.writeBytes(Native Method) > 27609Feb 09 03:00:13 at > java.io.FileOutputStream.write(FileOutputStream.java:326) > 27610Feb 09 03:00:13 at > org.apache.logging.log4j.core.appender.OutputStreamManager.writeToDestination(OutputStreamManager.java:250) > 27611Feb 09 03:00:13 ... 39 more > [...] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34443) YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication failed when deploying job cluster
[ https://issues.apache.org/jira/browse/FLINK-34443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817293#comment-17817293 ] Matthias Pohl commented on FLINK-34443: --- Maybe related to FLINK-34418 > YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication failed > when deploying job cluster > --- > > Key: FLINK-34443 > URL: https://issues.apache.org/jira/browse/FLINK-34443 > Project: Flink > Issue Type: Bug > Components: Build System / CI, Runtime / Coordination, Test > Infrastructure >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > https://github.com/apache/flink/actions/runs/7895502206/job/21548246199#step:10:28804 > {code} > Error: 03:04:05 03:04:05.066 [ERROR] Tests run: 2, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 68.10 s <<< FAILURE! -- in > org.apache.flink.yarn.YARNFileReplicationITCase > Error: 03:04:05 03:04:05.067 [ERROR] > org.apache.flink.yarn.YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication > -- Time elapsed: 1.982 s <<< ERROR! > Feb 14 03:04:05 > org.apache.flink.client.deployment.ClusterDeploymentException: Could not > deploy Yarn job cluster. > Feb 14 03:04:05 at > org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:566) > Feb 14 03:04:05 at > org.apache.flink.yarn.YARNFileReplicationITCase.deployPerJob(YARNFileReplicationITCase.java:109) > Feb 14 03:04:05 at > org.apache.flink.yarn.YARNFileReplicationITCase.lambda$testPerJobModeWithCustomizedFileReplication$0(YARNFileReplicationITCase.java:73) > Feb 14 03:04:05 at > org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303) > Feb 14 03:04:05 at > org.apache.flink.yarn.YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication(YARNFileReplicationITCase.java:73) > Feb 14 03:04:05 at java.lang.reflect.Method.invoke(Method.java:498) > Feb 14 03:04:05 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Feb 14 03:04:05 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Feb 14 03:04:05 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Feb 14 03:04:05 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Feb 14 03:04:05 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > Feb 14 03:04:05 Caused by: > org.apache.hadoop.ipc.RemoteException(java.io.IOException): File > /user/root/.flink/application_1707879779446_0002/log4j-api-2.17.1.jar could > only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) > running and 2 node(s) are excluded in this operation. 
> Feb 14 03:04:05 at > org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2260) > Feb 14 03:04:05 at > org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294) > Feb 14 03:04:05 at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2813) > Feb 14 03:04:05 at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:908) > Feb 14 03:04:05 at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:577) > Feb 14 03:04:05 at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > Feb 14 03:04:05 at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:549) > Feb 14 03:04:05 at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:518) > Feb 14 03:04:05 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086) > Feb 14 03:04:05 at > org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029) > Feb 14 03:04:05 at > org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957) > Feb 14 03:04:05 at java.security.AccessController.doPrivileged(Native > Method) > Feb 14 03:04:05 at javax.security.auth.Subject.doAs(Subject.java:422) > Feb 14 03:04:05 at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762) > Feb 14 03:04:05 at > org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957) > Feb 14 03:04:05 > Feb 14 03:04:05 at > org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1579) > Feb 14 03:04:05 at org.apache.hadoop.ipc.Client.call(Client.java:1525)
[jira] [Created] (FLINK-34443) YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication failed when deploying job cluster
Matthias Pohl created FLINK-34443: - Summary: YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication failed when deploying job cluster Key: FLINK-34443 URL: https://issues.apache.org/jira/browse/FLINK-34443 Project: Flink Issue Type: Bug Components: Build System / CI, Runtime / Coordination, Test Infrastructure Affects Versions: 1.19.0, 1.20.0 Reporter: Matthias Pohl https://github.com/apache/flink/actions/runs/7895502206/job/21548246199#step:10:28804 {code} Error: 03:04:05 03:04:05.066 [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 68.10 s <<< FAILURE! -- in org.apache.flink.yarn.YARNFileReplicationITCase Error: 03:04:05 03:04:05.067 [ERROR] org.apache.flink.yarn.YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication -- Time elapsed: 1.982 s <<< ERROR! Feb 14 03:04:05 org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster. Feb 14 03:04:05 at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:566) Feb 14 03:04:05 at org.apache.flink.yarn.YARNFileReplicationITCase.deployPerJob(YARNFileReplicationITCase.java:109) Feb 14 03:04:05 at org.apache.flink.yarn.YARNFileReplicationITCase.lambda$testPerJobModeWithCustomizedFileReplication$0(YARNFileReplicationITCase.java:73) Feb 14 03:04:05 at org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303) Feb 14 03:04:05 at org.apache.flink.yarn.YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication(YARNFileReplicationITCase.java:73) Feb 14 03:04:05 at java.lang.reflect.Method.invoke(Method.java:498) Feb 14 03:04:05 at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) Feb 14 03:04:05 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) Feb 14 03:04:05 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) Feb 14 03:04:05 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) Feb 14 03:04:05 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) Feb 14 03:04:05 Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/root/.flink/application_1707879779446_0002/log4j-api-2.17.1.jar could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation. 
Feb 14 03:04:05 at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2260) Feb 14 03:04:05 at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294) Feb 14 03:04:05 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2813) Feb 14 03:04:05 at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:908) Feb 14 03:04:05 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:577) Feb 14 03:04:05 at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) Feb 14 03:04:05 at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:549) Feb 14 03:04:05 at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:518) Feb 14 03:04:05 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086) Feb 14 03:04:05 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029) Feb 14 03:04:05 at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957) Feb 14 03:04:05 at java.security.AccessController.doPrivileged(Native Method) Feb 14 03:04:05 at javax.security.auth.Subject.doAs(Subject.java:422) Feb 14 03:04:05 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762) Feb 14 03:04:05 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957) Feb 14 03:04:05 Feb 14 03:04:05 at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1579) Feb 14 03:04:05 at org.apache.hadoop.ipc.Client.call(Client.java:1525) Feb 14 03:04:05 at org.apache.hadoop.ipc.Client.call(Client.java:1422) Feb 14 03:04:05 at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231) Feb 14 03:04:05 at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118) Feb 14 03:04:05 at com.sun.proxy.$Proxy113.addBlock(Unknown Source) Feb 14 03:04:05 at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.ad
[jira] [Assigned] (FLINK-34442) Support optimizations for pre-partitioned [external] data sources
[ https://issues.apache.org/jira/browse/FLINK-34442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge reassigned FLINK-34442: --- Assignee: Jeyhun Karimov > Support optimizations for pre-partitioned [external] data sources > - > > Key: FLINK-34442 > URL: https://issues.apache.org/jira/browse/FLINK-34442 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Jeyhun Karimov >Assignee: Jeyhun Karimov >Priority: Major > > There are some use-cases in which data sources are pre-partitioned: > - Kafka broker is already partitioned w.r.t. some key[s] > - There are multiple [Flink] jobs that materialize their outputs and read > them as input subsequently > One of the main benefits is that we might avoid unnecessary shuffling. > There is already an experimental feature in DataStream to support a subset of > these [1]. > We should support this for Flink Table/SQL as well. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
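[Editor's note] The experimental DataStream feature referenced in [1] is, to my understanding, reinterpreting a pre-partitioned stream as keyed via DataStreamUtils.reinterpretAsKeyedStream. A minimal sketch follows; the source, key format, and job name are made up, and the reinterpretation is only safe if the data really is partitioned the way Flink's key-group assignment expects:

{code:java}
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrePartitionedSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for a source whose splits are already partitioned by the leading key.
        DataStream<String> events = env.fromElements("user-1|click", "user-2|view");

        // Reinterpret the stream as keyed without inserting a shuffle.
        KeyedStream<String, String> keyed =
                DataStreamUtils.reinterpretAsKeyedStream(
                        events,
                        new KeySelector<String, String>() {
                            @Override
                            public String getKey(String value) {
                                return value.split("\\|")[0];
                            }
                        });

        keyed.print();
        env.execute("pre-partitioned sketch");
    }
}
{code}

The ticket asks for an equivalent optimization at the Table/SQL layer, where the planner (rather than the user) would elide the exchange for sources declared as pre-partitioned.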
Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]
zentol commented on code in PR #24309: URL: https://github.com/apache/flink/pull/24309#discussion_r1489188467 ## flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java: ## @@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class taskClazz) { public void triggerPeriodicScheduledTasks() { for (ScheduledTask scheduledTask : periodicScheduledTasks) { if (!scheduledTask.isCancelled()) { -scheduledTask.execute(); +executeScheduledTask(scheduledTask); } } } +private static void executeScheduledTask(ScheduledTask scheduledTask) { +scheduledTask.execute(); +try { +// try to retrieve result of scheduled task to avoid swallowing any exceptions that +// occurred +scheduledTask.get(); Review Comment: This doesn't work for periodic tasks since the result future in the ScheduledTask never gets completed. I'm not sure if this change is correct in the first place. Every non-periodic task already throws exceptions when trigger is called. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
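[Editor's note] The hazard pointed out in the review comment generalizes: if a periodic task's result future never completes, a blocking get() on it waits forever. A stand-alone JDK sketch of a non-blocking alternative (not the Flink test utility; names are made up):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class NonBlockingFailureCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Stand-in for a periodic task's result future, which never completes normally.
        CompletableFuture<Void> periodicResult = new CompletableFuture<>();

        // Only surface an error if the future already completed exceptionally;
        // otherwise leave it alone instead of blocking on it.
        if (periodicResult.isCompletedExceptionally()) {
            periodicResult.get(); // rethrows the failure wrapped in ExecutionException
        } else {
            System.out.println("periodic task still running, nothing to report");
        }
    }
}
```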