[jira] [Commented] (FLINK-22765) ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817602#comment-17817602
 ] 

Matthias Pohl commented on FLINK-22765:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57533&view=logs&j=a657ddbf-d986-5381-9649-342d9c92e7fb&t=dc085d4a-05c8-580e-06ab-21f5624dab16&l=8998
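
For context, the quoted failure below expects the spawned JVM's error output to be empty, while the JVM printed "The system is out of resources." A hedged sketch of the kind of metaspace-OOM check the test exercises (illustrative only; Flink's actual ExceptionUtils implementation may differ):

{code:java}
// Heuristic: a metaspace OOM surfaces as an OutOfMemoryError whose message
// mentions "Metaspace".
static boolean isMetaspaceOutOfMemoryError(Throwable t) {
    return t instanceof OutOfMemoryError
            && t.getMessage() != null
            && t.getMessage().contains("Metaspace");
}
{code}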

> ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable
> 
>
> Key: FLINK-22765
> URL: https://issues.apache.org/jira/browse/FLINK-22765
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.5, 1.15.0, 1.17.2, 1.19.0, 1.20.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, stale-assigned, test-stability
> Fix For: 1.14.0, 1.16.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18292&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=a99e99c7-21cd-5a1f-7274-585e62b72f56
> {code}
> May 25 00:56:38 java.lang.AssertionError: 
> May 25 00:56:38 
> May 25 00:56:38 Expected: is ""
> May 25 00:56:38      but: was "The system is out of resources.\nConsult the following stack trace for details."
> May 25 00:56:38   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> May 25 00:56:38   at org.junit.Assert.assertThat(Assert.java:956)
> May 25 00:56:38   at org.junit.Assert.assertThat(Assert.java:923)
> May 25 00:56:38   at org.apache.flink.runtime.util.ExceptionUtilsITCase.run(ExceptionUtilsITCase.java:94)
> May 25 00:56:38   at org.apache.flink.runtime.util.ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError(ExceptionUtilsITCase.java:70)
> May 25 00:56:38   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> May 25 00:56:38   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> May 25 00:56:38   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> May 25 00:56:38   at java.lang.reflect.Method.invoke(Method.java:498)
> May 25 00:56:38   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> May 25 00:56:38   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> May 25 00:56:38   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> May 25 00:56:38   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> May 25 00:56:38   at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> May 25 00:56:38   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> May 25 00:56:38   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> May 25 00:56:38   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> May 25 00:56:38   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> May 25 00:56:38   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> May 25 00:56:38   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> May 25 00:56:38   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> May 25 00:56:38   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> May 25 00:56:38   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> May 25 00:56:38   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> May 25 00:56:38   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> May 25 00:56:38   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> May 25 00:56:38   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> May 25 00:56:38   at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> May 25 00:56:38   at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> May 25 00:56:38   at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> May 25 00:56:38   at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> May 25 00:56:38   at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> May 25 00:56:38   at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> May 25 00:56:38   at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> May 25 00:56:38   at org.apache.maven.surefir

Re: [PR] [FLINK-34441] Add guide to submit flink SQL scripts via the operator (using flink-sql-runner-example) [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


prakash-42 commented on code in PR #776:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/776#discussion_r1490538791


##
docs/content/docs/custom-resource/overview.md:
##
@@ -216,6 +216,38 @@ Alternatively, if you use helm to install 
flink-kubernetes-operator, it allows y
 
 - Last-state upgradeMode is currently not supported for FlinkSessionJobs
 
+## Flink SQL Jobs

Review Comment:
   Thanks for the comments @mxm ! I agree that we should have a dedicated page 
for examples instead of adding more to the overview page. However, I have a 
few follow-up concerns/questions:
   
   1. Even if I don't copy from the examples' readme, I'd still need to write 
content similar to what's already in the example's readme. I can try to keep 
it short while linking to the readme page for more details.
   
   2. If I create a dedicated Examples page, then I think it shouldn't just 
have the Flink SQL example, but other examples as well. I will try to write a 
few more, but I'm not sure I have enough experience/knowledge to cover all 
the examples. Can we make this a living doc where more examples are added as 
the need arises?
   
   3. One concern I have about linking to README pages is that I'm likely going 
to link to the examples in the main branch, and that code may change with 
later operator releases. Could that make the examples in the operator point 
to an incompatible readme? (I see that the existing links also point to the 
main branch, so maybe that's okay.)
   
   






Re: [PR] [typos]The comment for function 'initializeState' of interface 'CheckpointedFunction' has a spelling error. [flink]

2024-02-14 Thread via GitHub


GaoZG2 closed pull request #24313: [typos]The comment for function 
'initializeState' of interface 'CheckpointedFunction' has a spelling error.
URL: https://github.com/apache/flink/pull/24313





Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1490305805


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -171,7 +171,7 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 stateStore.storeScalingTracking(ctx, scalingTracking);
 }
 
-if (collectedMetrics.getMetricHistory().isEmpty()) {
+if (collectedMetrics.getMetricHistory().size() < 2) {

Review Comment:
   Would you mind adding a simple comment to explain why we call the scaling 
logic when metric history size >= 2?
   
   It may be helpful for new developers to understand this logic.
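
   A sketch of the kind of comment being requested; the rationale in it is a plausible assumption, not taken from the PR:

   ```java
   // Hypothetical rationale: metrics over the window are computed from deltas
   // between consecutive observations, so a single data point in the metric
   // history is not enough to evaluate the scaling logic.
   if (collectedMetrics.getMetricHistory().size() < 2) {
       // Not enough data yet; skip scaling on this run.
       return;
   }
   ```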
   






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1490305805


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -171,7 +171,7 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
 stateStore.storeScalingTracking(ctx, scalingTracking);
 }
 
-if (collectedMetrics.getMetricHistory().isEmpty()) {
+if (collectedMetrics.getMetricHistory().size() < 2) {

Review Comment:
   Would you mind adding a simple comment to explain why we call the scaling 
logic when metric history size >= 2?
   
   






Re: [PR] [FLINK-33960][Scheduler] Fix the bug that Adaptive Scheduler doesn't respect the lowerBound when one flink job has more than 1 tasks [flink]

2024-02-14 Thread via GitHub


1996fanrui commented on PR #24288:
URL: https://github.com/apache/flink/pull/24288#issuecomment-1945274578

   @flinkbot run azure





Re: [PR] [FLINK-34152] Tune heap memory of autoscaled jobs [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui commented on code in PR #762:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1490294787


##
flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java:
##
@@ -189,6 +191,28 @@ public void removeParallelismOverrides(Context jobContext) 
{
 jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
PARALLELISM_OVERRIDES);
 }
 
+@Override
+public void storeConfigOverrides(Context jobContext, Configuration 
configOverrides) {
+jdbcStateStore.putSerializedState(
+getSerializeKey(jobContext),
+CONFIG_OVERRIDES,
+serializeParallelismOverrides(configOverrides.toMap()));

Review Comment:
   ```suggestion
   serializeConfigOverrides(configOverrides.toMap()));
   ```
   
   It may be a bug.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/MemoryTuningUtils.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
+import 
org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
+import 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemory;
+import 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/** Tunes the TaskManager memory. */
+public class MemoryTuningUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MemoryTuningUtils.class);
+public static final ProcessMemoryUtils<TaskExecutorFlinkMemory> FLINK_MEMORY_UTILS =
+new ProcessMemoryUtils<>(getMemoryOptions(), new TaskExecutorFlinkMemoryUtils());
+
+private static final Configuration EMPTY_CONFIG = new Configuration();
+
+/**
+ * Emits a Configuration which contains overrides for the current 
configuration. We are not
+ * modifying the config directly, but we are emitting a new configuration 
which contains any
+ * overrides. This config is persisted separately and applied by the 
autoscaler. That way we can
+ * clear any applied overrides if auto-tuning is disabled.
+ */
+public static Configuration tuneTaskManagerHeapMemory(
+JobAutoScalerContext context,
+EvaluatedMetrics evaluatedMetrics,
+AutoScalerEventHandler eventHandler) {
+
+// Please note that this config is the original configuration created 
from the user spec.
+// It does not contain any already applied overrides.
+var config = new UnmodifiableConfiguration(context.getConfiguration());
+
+// Gather original memory configuration from the user spec
+CommonProcessMemorySpec<TaskExecutorFlinkMemory> memSpecs;
+try {
+memSpecs = FLINK_MEMORY_UTILS.memoryProcessSpecFromConfig(config);
+} catch (IllegalConfigurationException e) {
+LOG.warn("Current memory configuration is not valid. Aborting 
memory tuning.");
+return EMPTY_CONFIG;
+}
+
+var maxHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize();
+LOG.info("Current configured heap size: {}", maxHeapSize);
+
+MemorySize avgHeapSize = get

[jira] [Commented] (FLINK-34395) Release Testing Instructions: Verify FLINK-32514 Support using larger checkpointing interval when source is processing backlog

2024-02-14 Thread Yunfeng Zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817575#comment-17817575
 ] 

Yunfeng Zhou commented on FLINK-34395:
--

Hi [~lincoln.86xy] [~jingge] Sorry for the late reply. This feature does not 
need cross-team testing. Could you please help close this ticket? It seems that 
I don't have the permission to close it.

> Release Testing Instructions: Verify FLINK-32514 Support using larger 
> checkpointing interval when source is processing backlog
> --
>
> Key: FLINK-34395
> URL: https://issues.apache.org/jira/browse/FLINK-34395
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Yunfeng Zhou
>Priority: Blocker
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>






Re: [PR] [FLINK-34152] Tune heap memory of autoscaled jobs [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui commented on code in PR #762:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1490292087


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -250,6 +251,22 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Max allowed percentage of heap usage during 
scaling operations. Autoscaling will be paused if the heap usage exceeds this 
threshold.");
 
+public static final ConfigOption<Boolean> MEMORY_TUNING_ENABLED =
+autoScalerConfig("memory.tuning.enabled")
+.booleanType()
+.defaultValue(false)
+
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+.withDescription(
+"If enabled, the initial amount of memory 
specified for TaskManagers will be reduced according to the observed needs.");
+
+public static final ConfigOption<MemorySize> MEMORY_TUNING_MIN_HEAP =
+autoScalerConfig("memory.tuning.heap.min")
+.memoryType()
+.defaultValue(MemorySize.ofMebiBytes(2048L))
+
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.heap.min"))
+.withDescription(
+"The minimum amount of TaskManager memory, if 
memory tuning is enabled.");

Review Comment:
   Hi @mxm , a gentle reminder: your change might have missed this comment.






Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui commented on code in PR #765:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1490288389


##
docs/content/docs/custom-resource/autoscaler.md:
##
@@ -286,16 +286,20 @@ please download JDBC driver and initialize database and 
table first.
 
 ```
 JDBC_DRIVER_JAR=./mysql-connector-java-8.0.30.jar
-# export the password of jdbc state store
+# export the password of jdbc state store & jdbc event handler
 export STATE_STORE_JDBC_PWD=123456
+export EVENT_HANDLER_JDBC_PWD=123456
 
 java -cp flink-autoscaler-standalone-{{< version >}}.jar:${JDBC_DRIVER_JAR} \
 org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
 --autoscaler.standalone.fetcher.flink-cluster.host localhost \
 --autoscaler.standalone.fetcher.flink-cluster.port 8081 \
 --autoscaler.standalone.state-store.type jdbc \
 --autoscaler.standalone.state-store.jdbc.url 
jdbc:mysql://localhost:3306/flink_autoscaler \
---autoscaler.standalone.state-store.jdbc.username root
+--autoscaler.standalone.state-store.jdbc.username root \
+--autoscaler.standalone.event-handler.type jdbc \
+--autoscaler.standalone.event-handler.jdbc.url 
jdbc:mysql://localhost:3306/flink_autoscaler \
+--autoscaler.standalone.event-handler.jdbc.username root

Review Comment:
   Thanks for the feedback! I have updated.






Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui commented on code in PR #765:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1490275258


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##
@@ -109,4 +110,51 @@ private static ConfigOptions.OptionBuilder 
autoscalerStandaloneConfig(String key
 + "export the password 
using this environment variable.",
 code(STATE_STORE_TYPE.key()), 
code(JDBC.toString()))
 .build());
+
+public static final ConfigOption<EventHandlerType> EVENT_HANDLER_TYPE =
+autoscalerStandaloneConfig("event-handler.type")
+.enumType(EventHandlerType.class)
+.defaultValue(EventHandlerType.LOGGING)
+.withDescription("The autoscaler event handler type.");

Review Comment:
   We can configure them separately.
   
   ```
   --autoscaler.standalone.state-store.type jdbc \
   --autoscaler.standalone.event-handler.type jdbc \
   ```
   
   After users choose JDBC for them, we can reuse all jdbc related options, 
such as:
   
   ```
   autoscaler.standalone.jdbc.username
   autoscaler.standalone.jdbc.url
   ```
   
   Here is a complete example (taken from the doc):
   
   ```
   JDBC_DRIVER_JAR=./mysql-connector-java-8.0.30.jar
   # export the password of jdbc state store & jdbc event handler
   export JDBC_PWD=123456
   
   java -cp flink-autoscaler-standalone-{{< version >}}.jar:${JDBC_DRIVER_JAR} \
   org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
   --autoscaler.standalone.fetcher.flink-cluster.host localhost \
   --autoscaler.standalone.fetcher.flink-cluster.port 8081 \
   --autoscaler.standalone.state-store.type jdbc \
   --autoscaler.standalone.event-handler.type jdbc \
   --autoscaler.standalone.jdbc.url jdbc:mysql://localhost:3306/flink_autoscaler \
   --autoscaler.standalone.jdbc.username root
   ```






Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui commented on code in PR #765:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1490275258


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##
@@ -109,4 +110,51 @@ private static ConfigOptions.OptionBuilder 
autoscalerStandaloneConfig(String key
 + "export the password 
using this environment variable.",
 code(STATE_STORE_TYPE.key()), 
code(JDBC.toString()))
 .build());
+
+public static final ConfigOption<EventHandlerType> EVENT_HANDLER_TYPE =
+autoscalerStandaloneConfig("event-handler.type")
+.enumType(EventHandlerType.class)
+.defaultValue(EventHandlerType.LOGGING)
+.withDescription("The autoscaler event handler type.");

Review Comment:
   We can configure them separately.
   
   ```
   --autoscaler.standalone.state-store.type jdbc \
   --autoscaler.standalone.event-handler.type jdbc \
   ```
   
   After users choose JDBC for them, we can reuse all jdbc related options, 
such as:
   
   ```
   autoscaler.standalone.jdbc.username
   autoscaler.standalone.jdbc.url
   ```
   
   Here is an example:
   
   ```
   JDBC_DRIVER_JAR=./mysql-connector-java-8.0.30.jar
   # export the password of jdbc state store & jdbc event handler
   export JDBC_PWD=123456
   
   java -cp flink-autoscaler-standalone-{{< version >}}.jar:${JDBC_DRIVER_JAR} \
   org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
   --autoscaler.standalone.fetcher.flink-cluster.host localhost \
   --autoscaler.standalone.fetcher.flink-cluster.port 8081 \
   --autoscaler.standalone.state-store.type jdbc \
   --autoscaler.standalone.event-handler.type jdbc \
   --autoscaler.standalone.jdbc.url jdbc:mysql://localhost:3306/flink_autoscaler \
   --autoscaler.standalone.jdbc.username root
   ```
   






Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui commented on code in PR #765:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1490275258


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##
@@ -109,4 +110,51 @@ private static ConfigOptions.OptionBuilder 
autoscalerStandaloneConfig(String key
 + "export the password 
using this environment variable.",
 code(STATE_STORE_TYPE.key()), 
code(JDBC.toString()))
 .build());
+
+public static final ConfigOption<EventHandlerType> EVENT_HANDLER_TYPE =
+autoscalerStandaloneConfig("event-handler.type")
+.enumType(EventHandlerType.class)
+.defaultValue(EventHandlerType.LOGGING)
+.withDescription("The autoscaler event handler type.");

Review Comment:
   We can configure them separately.
   
   ```
   --autoscaler.standalone.state-store.type jdbc \
   --autoscaler.standalone.event-handler.type jdbc \
   ```
   
   After users choose JDBC for them, we can reuse all jdbc related options, 
such as:
   
   ```
   autoscaler.standalone.jdbc.username
   autoscaler.standalone.jdbc.url
   ```
   






Re: [PR] [FLINK-34389][autoscaler] JdbcAutoscalerStateStore explicitly writes update_time [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui merged PR #771:
URL: https://github.com/apache/flink-kubernetes-operator/pull/771





Re: [PR] [FLINK-34248] Implement restore tests for changelog normalize node [flink]

2024-02-14 Thread via GitHub


bvarghese1 commented on PR #24203:
URL: https://github.com/apache/flink/pull/24203#issuecomment-1945216459

   > Looks good, but could we make the input/output easier to digest?
   > 
   > 1. Do we need to have a two column key?
   > 2. Can we have a non-numeric key? At least for me it's easier to track 
changes for something more distinct.
   
   1. Simplified to 1 primary key
   2. Switched to non-numeric key





Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(<array>[, <sort_ascending>[, 
<nulls_first>]])`, where both sort_ascending and nulls_first parameters are 
optional, allowing for more flexibility in array sorting behavior.
   
   The description is like this:
   
   `The function sorts an array, defaulting to ascending order with NULLs at the 
start when only the array is input. Specifying ascending_order as true orders 
the array in ascending order with NULLs first, and setting it to false orders 
it in descending order with NULLs last. Independently, null_first as true moves 
NULLs to the beginning, and as false to the end, irrespective of the sorting 
order. The function returns null if any input is null.`
   
   
   @MartijnVisser @dawidwys 
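
   To make the null handling concrete, a minimal Java sketch of the described semantics using `java.util.Comparator` (illustrative only, not the Flink runtime implementation):

   ```java
   import java.util.Arrays;
   import java.util.Comparator;

   class ArraySortSemanticsDemo {
       // Mimics the described ARRAY_SORT(<array>, <sort_ascending>, <nulls_first>) behavior.
       static Integer[] arraySort(Integer[] array, boolean ascending, boolean nullsFirst) {
           if (array == null) {
               return null; // NULL input yields NULL output
           }
           Comparator<Integer> base =
                   ascending ? Comparator.naturalOrder() : Comparator.reverseOrder();
           Comparator<Integer> cmp =
                   nullsFirst ? Comparator.nullsFirst(base) : Comparator.nullsLast(base);
           Integer[] sorted = array.clone();
           Arrays.sort(sorted, cmp);
           return sorted;
       }

       public static void main(String[] args) {
           Integer[] in = {3, null, 1};
           System.out.println(Arrays.toString(arraySort(in, true, true)));   // [null, 1, 3]
           System.out.println(Arrays.toString(arraySort(in, false, false))); // [3, 1, null]
       }
   }
   ```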






Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(<array>[, <sort_ascending>[, 
<nulls_first>]])`, where both sort_ascending and nulls_first parameters are 
optional, allowing for more flexibility in array sorting behavior.
   
   The description is like this:
   
   `Returns the array in sorted order. When only the input array is provided, 
ascending_order and null_first default to true, sorting the array in ascending 
order with NULLs at the start. Setting ascending_order to true sorts the array 
in ascending order, placing NULLs first. Setting it to false sorts the array in 
descending order, with NULLs last. null_first true places NULLs at the 
beginning, and false at the end, regardless of sort order. If any input is 
null, the function returns null.`
   
   
   @MartijnVisser @dawidwys 






Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(<array>[, <sort_ascending>[, 
<nulls_first>]])`, where both sort_ascending and nulls_first parameters are 
optional, allowing for more flexibility in array sorting behavior.
   
   The description is like this:
   
   `Returns the array in sorted order. When only the input array is provided, 
ascending_order and null_first default to true, sorting the array in ascending 
order with NULLs at the start. True for ascending_order results in an ascending 
sort with NULLs first, while false leads to a descending sort with NULLs last. 
Regardless of ascending_order, null_first true places NULLs at the beginning, 
and false at the end. If any input is null, the function returns null.`
   
   
   @MartijnVisser @dawidwys 






Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(<array>[, <sort_ascending>[, 
<nulls_first>]])`, where both sort_ascending and nulls_first parameters are 
optional, allowing for more flexibility in array sorting behavior.
   
   The description is like this:
   
   `The function sorts an array with ascending_order and null_first as optional 
inputs. By default, with only the input array, both are true, resulting in an 
ascending sort with NULLs at the beginning. Specifying ascending_order without 
null_first results in sorting where true for ascending_order places NULLs 
first, and false places NULLs last. With all three inputs, null_first true 
places NULLs at the start, and false at the end, regardless of 
ascending_order's value. If any input is null, the function returns null.`
   
   
   @MartijnVisser @dawidwys 






[jira] [Commented] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table

2024-02-14 Thread Vallari Rastogi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817533#comment-17817533
 ] 

Vallari Rastogi commented on FLINK-32596:
-

[~luoyuxia] 

Hive Metastore expects the partitioned columns to come last when inserting 
data. [hiveql - Hive partition column - Stack 
Overflow|https://stackoverflow.com/questions/60510174/hive-partition-column]

So what Flink does is use the last 'n' columns as partition columns: 
[https://github.com/apache/flink/blob/403694e7b9c213386f3ed9cff21ce2664030ebc2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java#L515]

SELECT and INSERT queries follow the same logic of expecting the partition 
columns at the end.

As a test, I made a change here: 
[https://github.com/apache/flink/commit/df47ceaba82a3d4f3392c1b53bb52f34d520cc3d]

Results:

!image-2024-02-15-03-05-22-541.png|width=600,height=106!

!image-2024-02-15-03-06-28-175.png|width=468,height=267!

The partition columns will always come last due to HMS. For example, we can use 
an insert statement like: 

_INSERT INTO testHive2 PARTITION (ts='22:16:46', active='TRUE') SELECT 1, 46, 
'false';_

_SELECT query output:_

_!image-2024-02-15-03-08-50-029.png|width=525,height=328!_
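
For anyone hitting this, a minimal sketch of the safe pattern implied above, reusing the issue's table definition and assuming a Hive catalog is registered as the current catalog: declare the partition column(s) last in the Flink-dialect DDL so the schema matches how HMS stores it:

{code:java}
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
// Declaring the partition column (`date`) as the LAST column keeps the
// Flink-dialect schema consistent with HMS, which always moves partition
// columns to the end.
tableEnv.executeSql(
        "create table t1(`geo_altitude` FLOAT, `date` string) partitioned by (`date`)"
                + " with ('connector' = 'hive', 'sink.partition-commit.delay'='1 s',"
                + " 'sink.partition-commit.policy.kind'='metastore,success-file')");
{code}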

> The partition key will be wrong when use Flink dialect to create Hive table
> ---
>
> Key: FLINK-32596
> URL: https://issues.apache.org/jira/browse/FLINK-32596
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: luoyuxia
>Assignee: Vallari Rastogi
>Priority: Major
> Attachments: image-2024-02-14-16-06-13-126.png, 
> image-2024-02-15-03-05-22-541.png, image-2024-02-15-03-06-28-175.png, 
> image-2024-02-15-03-08-50-029.png
>
>
> Can be reproduced by the following SQL:
>  
> {code:java}
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> tableEnv.executeSql(
> "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by 
> (`date`)"
> + " with ('connector' = 'hive', 
> 'sink.partition-commit.delay'='1 s',  
> 'sink.partition-commit.policy.kind'='metastore,success-file')");
> CatalogTable catalogTable =
> (CatalogTable) 
> hiveCatalog.getTable(ObjectPath.fromString("default.t1"));
> // the following assertion will fail
> assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code}
>  
>  





[jira] [Updated] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table

2024-02-14 Thread Vallari Rastogi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vallari Rastogi updated FLINK-32596:

Attachment: image-2024-02-15-03-08-50-029.png

> The partition key will be wrong when use Flink dialect to create Hive table
> ---
>
> Key: FLINK-32596
> URL: https://issues.apache.org/jira/browse/FLINK-32596
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: luoyuxia
>Assignee: Vallari Rastogi
>Priority: Major
> Attachments: image-2024-02-14-16-06-13-126.png, 
> image-2024-02-15-03-05-22-541.png, image-2024-02-15-03-06-28-175.png, 
> image-2024-02-15-03-08-50-029.png
>
>
> Can be reproduced by the following SQL:
>  
> {code:java}
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> tableEnv.executeSql(
> "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by 
> (`date`)"
> + " with ('connector' = 'hive', 
> 'sink.partition-commit.delay'='1 s',  
> 'sink.partition-commit.policy.kind'='metastore,success-file')");
> CatalogTable catalogTable =
> (CatalogTable) 
> hiveCatalog.getTable(ObjectPath.fromString("default.t1"));
> // the following assertion will fail
> assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code}
>  
>  





[jira] [Updated] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table

2024-02-14 Thread Vallari Rastogi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vallari Rastogi updated FLINK-32596:

Attachment: image-2024-02-15-03-06-28-175.png

> The partition key will be wrong when use Flink dialect to create Hive table
> ---
>
> Key: FLINK-32596
> URL: https://issues.apache.org/jira/browse/FLINK-32596
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: luoyuxia
>Assignee: Vallari Rastogi
>Priority: Major
> Attachments: image-2024-02-14-16-06-13-126.png, 
> image-2024-02-15-03-05-22-541.png, image-2024-02-15-03-06-28-175.png
>
>
> Can be reproduced by the following SQL:
>  
> {code:java}
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> tableEnv.executeSql(
> "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by 
> (`date`)"
> + " with ('connector' = 'hive', 
> 'sink.partition-commit.delay'='1 s',  
> 'sink.partition-commit.policy.kind'='metastore,success-file')");
> CatalogTable catalogTable =
> (CatalogTable) 
> hiveCatalog.getTable(ObjectPath.fromString("default.t1"));
> // the following assertion will fail
> assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code}
>  
>  





[jira] [Updated] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table

2024-02-14 Thread Vallari Rastogi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vallari Rastogi updated FLINK-32596:

Attachment: image-2024-02-15-03-05-22-541.png

> The partition key will be wrong when use Flink dialect to create Hive table
> ---
>
> Key: FLINK-32596
> URL: https://issues.apache.org/jira/browse/FLINK-32596
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: luoyuxia
>Assignee: Vallari Rastogi
>Priority: Major
> Attachments: image-2024-02-14-16-06-13-126.png, 
> image-2024-02-15-03-05-22-541.png
>
>
> Can be reproduced by the following SQL:
>  
> {code:java}
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> tableEnv.executeSql(
> "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by 
> (`date`)"
> + " with ('connector' = 'hive', 
> 'sink.partition-commit.delay'='1 s',  
> 'sink.partition-commit.policy.kind'='metastore,success-file')");
> CatalogTable catalogTable =
> (CatalogTable) 
> hiveCatalog.getTable(ObjectPath.fromString("default.t1"));
> // the following assertion will fail
> assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code}
>  
>  





Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #777:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489957782


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, 
Configuration conf) throws
 }
 }
 
-/** Wait until the Flink cluster has completely shut down. */
-@VisibleForTesting
-void waitForClusterShutdown(String namespace, String clusterId, long 
shutdownTimeout) {
-LOG.info("Waiting for cluster shutdown...");
-
-boolean jobManagerRunning = true;
-boolean taskManagerRunning = true;
-boolean serviceRunning = true;
+/** Returns a list of Kubernetes Deployment names for the given cluster. */
+protected abstract List<String> getDeploymentNames(String namespace, String clusterId);
 
-for (int i = 0; i < shutdownTimeout; i++) {
-if (jobManagerRunning) {
-PodList jmPodList = getJmPodList(namespace, clusterId);
+/** Wait until the Flink cluster has completely shut down. */
+protected void waitForClusterShutdown(
+String namespace, String clusterId, long shutdownTimeout) {
+long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
+LOG.info("Waiting {} seconds for cluster shutdown...", 
shutdownTimeout);
 
-if (jmPodList == null || jmPodList.getItems().isEmpty()) {
-jobManagerRunning = false;
-}
-}
-if (taskManagerRunning) {
-PodList tmPodList = getTmPodList(namespace, clusterId);
+for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
+long deploymentTimeout = timeoutAt - System.currentTimeMillis();
 
-if (tmPodList.getItems().isEmpty()) {
-taskManagerRunning = false;
-}
+if (!waitForDeploymentToBeRemoved(namespace, deploymentName, 
deploymentTimeout)) {
+LOG.error(

Review Comment:
   5 minutes seems reasonable for prod envs
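
   For intuition, a minimal polling sketch of what a `waitForDeploymentToBeRemoved` helper could look like, assuming the fabric8 `KubernetesClient`; this is illustrative, not the PR's actual implementation:

   ```java
   import io.fabric8.kubernetes.client.KubernetesClient;

   // Polls until the named Deployment disappears or the timeout elapses.
   // Returns true if the Deployment was removed in time, false otherwise.
   static boolean waitForDeploymentToBeRemoved(
           KubernetesClient client, String namespace, String name, long timeoutMillis)
           throws InterruptedException {
       long deadline = System.currentTimeMillis() + timeoutMillis;
       while (System.currentTimeMillis() < deadline) {
           if (client.apps().deployments().inNamespace(namespace).withName(name).get() == null) {
               return true; // Deployment is gone
           }
           Thread.sleep(1_000L); // poll once per second to avoid hammering the API server
       }
       return false; // still present after the timeout
   }
   ```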






Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #777:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489957348


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, 
Configuration conf) throws
 }
 }
 
-/** Wait until the Flink cluster has completely shut down. */
-@VisibleForTesting
-void waitForClusterShutdown(String namespace, String clusterId, long 
shutdownTimeout) {
-LOG.info("Waiting for cluster shutdown...");
-
-boolean jobManagerRunning = true;
-boolean taskManagerRunning = true;
-boolean serviceRunning = true;
+/** Returns a list of Kubernetes Deployment names for the given cluster. */
+protected abstract List<String> getDeploymentNames(String namespace, String clusterId);
 
-for (int i = 0; i < shutdownTimeout; i++) {
-if (jobManagerRunning) {
-PodList jmPodList = getJmPodList(namespace, clusterId);
+/** Wait until the Flink cluster has completely shut down. */
+protected void waitForClusterShutdown(
+String namespace, String clusterId, long shutdownTimeout) {
+long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
+LOG.info("Waiting {} seconds for cluster shutdown...", 
shutdownTimeout);
 
-if (jmPodList == null || jmPodList.getItems().isEmpty()) {
-jobManagerRunning = false;
-}
-}
-if (taskManagerRunning) {
-PodList tmPodList = getTmPodList(namespace, clusterId);
+for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
+long deploymentTimeout = timeoutAt - System.currentTimeMillis();
 
-if (tmPodList.getItems().isEmpty()) {
-taskManagerRunning = false;
-}
+if (!waitForDeploymentToBeRemoved(namespace, deploymentName, 
deploymentTimeout)) {
+LOG.error(

Review Comment:
   If it's no trouble for you @mateczagany, let's add the default timeout increase 
here. I am not a big fan of too many PRs; it's just more work to review :) 






[jira] [Updated] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-02-14 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34445:
---
Component/s: Runtime / Web Frontend

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Priority: Major
>






[jira] [Created] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-02-14 Thread Mason Chen (Jira)
Mason Chen created FLINK-34445:
--

 Summary: Integrate new endpoint with Flink UI metrics section
 Key: FLINK-34445
 URL: https://issues.apache.org/jira/browse/FLINK-34445
 Project: Flink
  Issue Type: Sub-task
Reporter: Mason Chen








[jira] [Updated] (FLINK-34444) Add new endpoint handler to flink

2024-02-14 Thread Mason Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mason Chen updated FLINK-34444:
---
Component/s: Runtime / Metrics
 Runtime / REST

> Add new endpoint handler to flink
> -
>
> Key: FLINK-34444
> URL: https://issues.apache.org/jira/browse/FLINK-34444
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / REST
>Reporter: Mason Chen
>Priority: Major
>






[jira] [Created] (FLINK-34444) Add new endpoint handler to flink

2024-02-14 Thread Mason Chen (Jira)
Mason Chen created FLINK-34444:
--

 Summary: Add new endpoint handler to flink
 Key: FLINK-34444
 URL: https://issues.apache.org/jira/browse/FLINK-34444
 Project: Flink
  Issue Type: Sub-task
Reporter: Mason Chen








Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


mateczagany commented on code in PR #777:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489946709


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, 
Configuration conf) throws
 }
 }
 
-/** Wait until the Flink cluster has completely shut down. */
-@VisibleForTesting
-void waitForClusterShutdown(String namespace, String clusterId, long 
shutdownTimeout) {
-LOG.info("Waiting for cluster shutdown...");
-
-boolean jobManagerRunning = true;
-boolean taskManagerRunning = true;
-boolean serviceRunning = true;
+/** Returns a list of Kubernetes Deployment names for the given cluster. */
+protected abstract List<String> getDeploymentNames(String namespace, String clusterId);
 
-for (int i = 0; i < shutdownTimeout; i++) {
-if (jobManagerRunning) {
-PodList jmPodList = getJmPodList(namespace, clusterId);
+/** Wait until the Flink cluster has completely shut down. */
+protected void waitForClusterShutdown(
+String namespace, String clusterId, long shutdownTimeout) {
+long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
+LOG.info("Waiting {} seconds for cluster shutdown...", 
shutdownTimeout);
 
-if (jmPodList == null || jmPodList.getItems().isEmpty()) {
-jobManagerRunning = false;
-}
-}
-if (taskManagerRunning) {
-PodList tmPodList = getTmPodList(namespace, clusterId);
+for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
+long deploymentTimeout = timeoutAt - System.currentTimeMillis();
 
-if (tmPodList.getItems().isEmpty()) {
-taskManagerRunning = false;
-}
+if (!waitForDeploymentToBeRemoved(namespace, deploymentName, 
deploymentTimeout)) {
+LOG.error(

Review Comment:
   I don't think there could be any downside to setting the default cleanup 
timeout to e.g. 5 minutes. @gyfora WDYT? Should that be done in another PR or 
should I add that here, or do you think we should leave the timeout as is?






Re: [PR] [typos]The comment for function 'initializeState' of interface 'CheckpointedFunction' has a spelling error. [flink]

2024-02-14 Thread via GitHub


flinkbot commented on PR #24313:
URL: https://github.com/apache/flink/pull/24313#issuecomment-1944401398

   
   ## CI report:
   
   * 487d27788e16263ad76fcb5ce6d5ca4a1a1015d2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [typos]The comment for function 'initializeState' of interface 'CheckpointedFunction' has a spelling error. [flink]

2024-02-14 Thread via GitHub


GaoZG2 opened a new pull request, #24313:
URL: https://github.com/apache/flink/pull/24313

   [typos]The comment for function 'initializeState' of interface 
'CheckpointedFunction' has a spelling error.





[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector

2024-02-14 Thread Jacek Wislicki (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817467#comment-17817467
 ] 

Jacek Wislicki commented on FLINK-34436:


Also added a simpler example (without the enumerations, which seem to be the 
key problem): https://github.com/JacekWislicki/test12
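
For reference, this is what an Avro enum default looks like (supported since Avro 1.9); the schema below is illustrative, not taken from the reporter's projects. Per the Avro spec, a reader that encounters an unknown symbol should fall back to the declared default rather than the last known value:

{code:java}
import org.apache.avro.Schema;

public class EnumDefaultExample {
    public static void main(String[] args) {
        // "default" names the symbol a reader should use when the writer's
        // symbol is not present in the reader's schema.
        Schema schema =
                new Schema.Parser()
                        .parse(
                                "{\"type\": \"enum\", \"name\": \"Color\","
                                        + " \"symbols\": [\"UNKNOWN\", \"RED\", \"GREEN\"],"
                                        + " \"default\": \"UNKNOWN\"}");
        System.out.println(schema.getEnumDefault()); // prints: UNKNOWN
    }
}
{code}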

> Avro schema evolution and compatibility issues in Pulsar connector
> --
>
> Key: FLINK-34436
> URL: https://issues.apache.org/jira/browse/FLINK-34436
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.2
>Reporter: Jacek Wislicki
>Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related 
> to schema evolution and compatibility. Please see the MRE available at 
> https://github.com/JacekWislicki/test11. More details are in the project's 
> README file, here is the summary:
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
> Problems:
> * Exception thrown when schema's fields are added/removed
> * Avro's enum default value is ignored; instead, the last known value is applied
> I believe I observed the same behaviour in Pulsar itself; still, for now we 
> are focusing on the connector, hence I was able to document the problems 
> when using it.





Re: [PR] [FLINK-34152] Tune heap memory of autoscaled jobs [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


mxm commented on code in PR #762:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1489841998


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -250,6 +252,39 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Max allowed percentage of heap usage during 
scaling operations. Autoscaling will be paused if the heap usage exceeds this 
threshold.");
 
+public static final ConfigOption MEMORY_TUNING_ENABLED =
+autoScalerConfig("memory.tuning.enabled")
+.booleanType()
+.defaultValue(false)
+
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+.withDescription(
+"If enabled, the initial amount of memory 
specified for TaskManagers will be reduced according to the observed needs.");
+
+public static final ConfigOption<MemoryTuning.HeapTuningTarget> MEMORY_TUNING_HEAP_TARGET =
+autoScalerConfig("memory.tuning.heap.target-usage")
+.enumType(MemoryTuning.HeapTuningTarget.class)
+.defaultValue(MemoryTuning.HeapTuningTarget.AVG)

Review Comment:
   Updated to use MAX as the default. Also added extra overhead, which is pretty 
important for the memory to be able to grow again once reduced.
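
   As a back-of-the-envelope illustration of why that headroom matters (the names and factor below are hypothetical, not actual options):

   ```java
   // Derive the tuned heap target from the observed maximum plus headroom, so
   // usage can still grow after the TaskManager memory has been reduced.
   // Example: maxObservedHeapBytes = 4 GiB with overheadFactor = 0.2 yields ~4.8 GiB.
   static long tunedHeapTargetBytes(long maxObservedHeapBytes, double overheadFactor) {
       return (long) (maxObservedHeapBytes * (1.0 + overheadFactor));
   }
   ```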



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] Extend error labeling to avoid restart loops. [flink]

2024-02-14 Thread via GitHub


flinkbot commented on PR #24312:
URL: https://github.com/apache/flink/pull/24312#issuecomment-1944295795

   
   ## CI report:
   
   * 2451d70d20c20e6c1cc386640b3235652d227d93 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-02-14 Thread via GitHub


nicusX commented on PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-1944280510

   @hlteoh37 @z3d1k please have a look, I pushed the changes (in a single 
squashed commit).
   I implemented all the requested changes except those I explicitly commented 
not to implement.
   Happy to discuss


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location [flink]

2024-02-14 Thread via GitHub


XComp commented on PR #24244:
URL: https://github.com/apache/flink/pull/24244#issuecomment-1944268824

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-14 Thread via GitHub


XComp commented on PR #24309:
URL: https://github.com/apache/flink/pull/24309#issuecomment-1944265004

   I updated the PR but will wait till tomorrow before switching it from draft 
back to the reviewable state. I want to wait for CI to pass first, since only 
then does it make sense to review the change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now 
`ARRAY_SORT(<array>[, <sort_ascending>[, <nulls_first>]])`, where both the 
sort_ascending and nulls_first parameters are optional, allowing for more 
flexibility in array sorting behavior.
   @MartijnVisser @dawidwys 
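   For illustration, a hedged usage sketch of this proposed signature through 
the Table API. The literals and the expected NULL placement are assumptions 
based on the defaults discussed in this thread:
   ```
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class ArraySortExample {
       public static void main(String[] args) {
           TableEnvironment env =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());
           // One-argument form: ascending order with the default NULL placement.
           env.sqlQuery("SELECT ARRAY_SORT(ARRAY[3, 1, CAST(NULL AS INT), 2])")
                   .execute()
                   .print();
           // Three-argument form: descending (sort_ascending = FALSE),
           // NULLs last (nulls_first = FALSE).
           env.sqlQuery("SELECT ARRAY_SORT(ARRAY[3, 1, CAST(NULL AS INT), 2], FALSE, FALSE)")
                   .execute()
                   .print();
       }
   }
   ```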



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM

2024-02-14 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-34403.
---
Resolution: Fixed

* master
** 
[2298e53f35121602c56845ac8040439fbd1a9ff4|https://github.com/apache/flink/commit/2298e53f35121602c56845ac8040439fbd1a9ff4]
** 
[9a316a5bcc47da7f69e76e0c25ed257adc4298ce|https://github.com/apache/flink/commit/9a316a5bcc47da7f69e76e0c25ed257adc4298ce]

> VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
> -
>
> Key: FLINK-34403
> URL: https://issues.apache.org/jira/browse/FLINK-34403
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.20.0
>Reporter: Benchao Li
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> After FLINK-33611 was merged, the misc test on GHA cannot pass due to an out 
> of memory error, throwing the following exceptions:
> {code:java}
> Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest
> Error: 05:43:21 05:43:21.773 [ERROR] 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time 
> elapsed: 40.97 s <<< ERROR!
> Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in 
> serialization.
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:71)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple(VeryBigPbRowToProtoTest.java:37)
> Feb 07 05:43:21   at java.lang.reflect.Method.invoke(Method.java:498)
> Feb 07 05:43:21 Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalArgumentException: Self-suppression not permitted
> Feb 07 05:43:21   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 07 05:43:21   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:323)
> Feb 07 05:43:21   ... 18 more
> Feb 07 05:43:21 Caused by: java.lang.IllegalArgumentException: 
> Self-suppression not permitted
> Feb 07 05:43:21   at 
> java.lang.Throwable.addSuppressed(Throwable.java:1072)
> Feb 07 05:43:21   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:556)
> Feb 07 05:43:21   at 
> org.apache.flink.util.InstantiationUtil.writeObjectToConfi

Re: [PR] [FLINK-34403][ci] Transforms VeryBigPbRowToProtoTest into an integration test [flink]

2024-02-14 Thread via GitHub


XComp merged PR #24311:
URL: https://github.com/apache/flink/pull/24311


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-02-14 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1489791182


##
amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java:
##
@@ -0,0 +1,98 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSSessionCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.util.BinaryUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+
+/** Sign a Remote-Write request to Amazon Managed Service for Prometheus 
(AMP). */
+public class AmazonManagedPrometheusWriteRequestSigner implements 
PrometheusRequestSigner {
+
+private final URL remoteWriteUrl;
+private final String awsRegion;
+
+/**
+ * Constructor.
+ *
+ * @param remoteWriteUrl URL of the remote-write endpoint
+ * @param awsRegion Region of the AMP workspace
+ */
+public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, 
String awsRegion) {
+Preconditions.checkArgument(
+StringUtils.isNotBlank(awsRegion), "Missing or blank AMP 
workspace region");
+Preconditions.checkNotNull(
+StringUtils.isNotBlank(remoteWriteUrl),
+"Missing or blank AMP workspace remote-write URL");
+this.awsRegion = awsRegion;
+try {
+this.remoteWriteUrl = new URL(remoteWriteUrl);
+} catch (MalformedURLException e) {
+throw new IllegalArgumentException(
+"Invalid AMP remote-write URL: " + remoteWriteUrl, e);
+}
+}
+
+/**
+ * Add the additional Http request headers required by Amazon Managed 
Prometheus:
+ * 'x-amz-content-sha256', 'Host', 'X-Amz-Date', 'x-amz-security-token' 
and 'Authorization`.
+ *
+ * @param requestHeaders original Http request headers. It must be 
mutable. For efficiency, any
+ * new header is added to the map, instead of making a copy.
+ * @param requestBody request body, already compressed
+ */
+@Override
+public void addSignatureHeaders(Map<String, String> requestHeaders, byte[] 
requestBody) {
+byte[] contentHash = AWS4SignerBase.hash(requestBody);
+String contentHashString = BinaryUtils.toHex(contentHash);
+requestHeaders.put(
+"x-amz-content-sha256",
+contentHashString); // this header must be included before 
generating the
+// Authorization header
+
+DefaultAWSCredentialsProviderChain credsChain = new 
DefaultAWSCredentialsProviderChain();

Review Comment:
   Actually, I reverted the change. Just passing in a credentials provider 
instance obviously doesn't work, since it is not serializable.
   
   Making the credentials provider customisable would mean either:
   1. Copying the implementation of 
`org.apache.flink.connector.aws.util.AWSGeneralUtil` from 
`flink-connector-aws-base` and using it to create the credential provider at 
runtime, based on configuration, or
   2. Including `flink-connector-aws-base` as a dependency. But this would 
bring in a number of transitive dependencies that are not used. The only AWS 
SDK dependencies in this connector are in this signer, and only to get the 
credentials. Also, `flink-connector-aws-base` uses sdk2, while this signer uses 
sdk1. We discussed with @hlteoh37 that porting to sdk2 is nice to have but not 
a priority for the first release.
   
   Considering `DefaultAWSCredentialsProviderChain` already covers almost 
every possible authentication scenario in which a Flink application would be 
writing to AMP, I would not consider a configurable credentials provider a 
priority for the first release, and would rather wait and see whether any 
actual user needs it.
   
   Happy to discuss.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


numbnut commented on code in PR #777:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489724962


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, 
Configuration conf) throws
 }
 }
 
-/** Wait until the FLink cluster has completely shut down. */
-@VisibleForTesting
-void waitForClusterShutdown(String namespace, String clusterId, long 
shutdownTimeout) {
-LOG.info("Waiting for cluster shutdown...");
-
-boolean jobManagerRunning = true;
-boolean taskManagerRunning = true;
-boolean serviceRunning = true;
+/** Returns a list of Kubernetes Deployment names for given cluster. */
+protected abstract List<String> getDeploymentNames(String namespace, 
String clusterId);
 
-for (int i = 0; i < shutdownTimeout; i++) {
-if (jobManagerRunning) {
-PodList jmPodList = getJmPodList(namespace, clusterId);
+/** Wait until the FLink cluster has completely shut down. */
+protected void waitForClusterShutdown(
+String namespace, String clusterId, long shutdownTimeout) {
+long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
+LOG.info("Waiting {} seconds for cluster shutdown...", 
shutdownTimeout);
 
-if (jmPodList == null || jmPodList.getItems().isEmpty()) {
-jobManagerRunning = false;
-}
-}
-if (taskManagerRunning) {
-PodList tmPodList = getTmPodList(namespace, clusterId);
+for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
+long deploymentTimeout = timeoutAt - System.currentTimeMillis();
 
-if (tmPodList.getItems().isEmpty()) {
-taskManagerRunning = false;
-}
+if (!waitForDeploymentToBeRemoved(namespace, deploymentName, 
deploymentTimeout)) {
+LOG.error(

Review Comment:
   I fully agree, but I don't have a solution. At the least, the default value 
for the cleanup timeout could be increased. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-14 Thread via GitHub


XComp commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1489646969


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java:
##
@@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class<?> 
taskClazz) {
 public void triggerPeriodicScheduledTasks() {
 for (ScheduledTask scheduledTask : periodicScheduledTasks) {
 if (!scheduledTask.isCancelled()) {
-scheduledTask.execute();
+executeScheduledTask(scheduledTask);
 }
 }
 }
 
+private static void executeScheduledTask(ScheduledTask scheduledTask) {
+scheduledTask.execute();
+try {
+// try to retrieve result of scheduled task to avoid swallowing 
any exceptions that
+// occurred
+scheduledTask.get();

Review Comment:
   True, I missed that we're not retrieving the result in case of periodic 
tasks in 
[ScheduledTask#execute](https://github.com/apache/flink/blob/5405239dec0884dff746129c73955c90f455c465/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java#L76).
   
   I changed the behavior: instead of collecting the error in the future, we're 
going to throw the error right away:
   > ScheduledTask only serves as a container for a Callable. The error 
handling should be done by an Executor (e.g. the main thread). Therefore, 
explicitly handling errors inside #execute is out of scope for ScheduledTask.



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -594,18 +597,19 @@ private void checkResourceRequirementsWithDelay() {
 if (requirementsCheckDelay.toMillis() <= 0) {
 checkResourceRequirements();
 } else {
-if (requirementsCheckFuture == null || 
requirementsCheckFuture.isDone()) {
-requirementsCheckFuture = new CompletableFuture<>();
-scheduledExecutor.schedule(
-() ->
-mainThreadExecutor.execute(
-() -> {
-checkResourceRequirements();
-
Preconditions.checkNotNull(requirementsCheckFuture)
-.complete(null);
-}),
-requirementsCheckDelay.toMillis(),
-TimeUnit.MILLISECONDS);
+if (requirementsCheckFuture.isDone()) {
+requirementsCheckFuture =
+scheduledExecutor.schedule(
+() -> {
+if (started) {

Review Comment:
   Yeah, I had to revisit the PR. I looked into the synchronization of 
`started` yesterday and was kind of puzzled as to why we hadn't had to 
synchronize the field until now. But that's because all methods that rely on 
the `started` field are actually called from within the ResourceManager's main 
thread. In the end, I forgot to consider this when actually accessing the 
`SlotManager`'s state from another thread :facepalm: 
   
   Anyway, I reiterated over the PR and added some assertions to make it 
clearer that the `FineGrainedSlotManager` should be handled from within the 
`ResourceManager`'s main thread. Please clarify if I came up with the wrong 
conclusion and missed something here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


mxm commented on code in PR #765:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1489593700


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##
@@ -109,4 +110,51 @@ private static ConfigOptions.OptionBuilder 
autoscalerStandaloneConfig(String key
 + "export the password 
using this environment variable.",
 code(STATE_STORE_TYPE.key()), 
code(JDBC.toString()))
 .build());
+
+public static final ConfigOption<EventHandlerType> EVENT_HANDLER_TYPE =
+autoscalerStandaloneConfig("event-handler.type")
+.enumType(EventHandlerType.class)
+.defaultValue(EventHandlerType.LOGGING)
+.withDescription("The autoscaler event handler type.");

Review Comment:
   What if users want to configure both logging and storing via JDBC?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


mateczagany commented on code in PR #777:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489494523


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, 
Configuration conf) throws
 }
 }
 
-/** Wait until the FLink cluster has completely shut down. */
-@VisibleForTesting
-void waitForClusterShutdown(String namespace, String clusterId, long 
shutdownTimeout) {
-LOG.info("Waiting for cluster shutdown...");
-
-boolean jobManagerRunning = true;
-boolean taskManagerRunning = true;
-boolean serviceRunning = true;
+/** Returns a list of Kubernetes Deployment names for given cluster. */
+protected abstract List<String> getDeploymentNames(String namespace, 
String clusterId);
 
-for (int i = 0; i < shutdownTimeout; i++) {
-if (jobManagerRunning) {
-PodList jmPodList = getJmPodList(namespace, clusterId);
+/** Wait until the FLink cluster has completely shut down. */
+protected void waitForClusterShutdown(
+String namespace, String clusterId, long shutdownTimeout) {
+long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
+LOG.info("Waiting {} seconds for cluster shutdown...", 
shutdownTimeout);
 
-if (jmPodList == null || jmPodList.getItems().isEmpty()) {
-jobManagerRunning = false;
-}
-}
-if (taskManagerRunning) {
-PodList tmPodList = getTmPodList(namespace, clusterId);
+for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
+long deploymentTimeout = timeoutAt - System.currentTimeMillis();
 
-if (tmPodList.getItems().isEmpty()) {
-taskManagerRunning = false;
-}
+if (!waitForDeploymentToBeRemoved(namespace, deploymentName, 
deploymentTimeout)) {
+LOG.error(
+"Failed to shut down cluster {} (deployment {}) in {} 
seconds, proceeding...",
+clusterId,
+deploymentName,
+shutdownTimeout);
+return;
 }
+}
+}
 
-if (serviceRunning) {
-Service service =
-kubernetesClient
-.services()
-.inNamespace(namespace)
-.withName(
-
ExternalServiceDecorator.getExternalServiceName(clusterId))
-.get();
-if (service == null) {
-serviceRunning = false;
-}
-}
+/** Wait until Deployment is removed, return false if timed out, otherwise 
return true. */
+@VisibleForTesting
+boolean waitForDeploymentToBeRemoved(String namespace, String 
deploymentName, long timeout) {
+ScheduledExecutorService logger = 
Executors.newSingleThreadScheduledExecutor();
+logger.scheduleWithFixedDelay(
+() -> LOG.info("Waiting for Deployment {} to shut down...", 
deploymentName),
+5,
+5,
+TimeUnit.SECONDS);

Review Comment:
   You are right, I will remove it. I don't think it's worth keeping those 
logs, since it's easy to figure out what we're waiting for if we have 
before/after logs as in your second suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34152] Tune heap memory of autoscaled jobs [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #762:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/762#discussion_r1489489638


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -250,6 +252,39 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Max allowed percentage of heap usage during 
scaling operations. Autoscaling will be paused if the heap usage exceeds this 
threshold.");
 
+public static final ConfigOption<Boolean> MEMORY_TUNING_ENABLED =
+autoScalerConfig("memory.tuning.enabled")
+.booleanType()
+.defaultValue(false)
+
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+.withDescription(
+"If enabled, the initial amount of memory 
specified for TaskManagers will be reduced according to the observed needs.");
+
+public static final ConfigOption<MemoryTuning.HeapTuningTarget> 
MEMORY_TUNING_HEAP_TARGET =
+autoScalerConfig("memory.tuning.heap.target-usage")

Review Comment:
   This config key sounds similar to target utilisation but works in a 
completely different way; I don't really have a better suggestion, though :) 



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -250,6 +252,39 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Max allowed percentage of heap usage during 
scaling operations. Autoscaling will be paused if the heap usage exceeds this 
threshold.");
 
+public static final ConfigOption<Boolean> MEMORY_TUNING_ENABLED =
+autoScalerConfig("memory.tuning.enabled")
+.booleanType()
+.defaultValue(false)
+
.withFallbackKeys(oldOperatorConfigKey("memory.tuning.enabled"))
+.withDescription(
+"If enabled, the initial amount of memory 
specified for TaskManagers will be reduced according to the observed needs.");
+
+public static final ConfigOption<MemoryTuning.HeapTuningTarget> 
MEMORY_TUNING_HEAP_TARGET =
+autoScalerConfig("memory.tuning.heap.target-usage")
+.enumType(MemoryTuning.HeapTuningTarget.class)
+.defaultValue(MemoryTuning.HeapTuningTarget.AVG)

Review Comment:
   If MAX is more conservative, should that be the default? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33636][autoscaler] Support the JDBCAutoScalerEventHandler [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #765:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/765#discussion_r1489463353


##
docs/content/docs/custom-resource/autoscaler.md:
##
@@ -286,16 +286,20 @@ please download JDBC driver and initialize database and 
table first.
 
 ```
 JDBC_DRIVER_JAR=./mysql-connector-java-8.0.30.jar
-# export the password of jdbc state store
+# export the password of jdbc state store & jdbc event handler
 export STATE_STORE_JDBC_PWD=123456
+export EVENT_HANDLER_JDBC_PWD=123456
 
 java -cp flink-autoscaler-standalone-{{< version >}}.jar:${JDBC_DRIVER_JAR} \
 org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
 --autoscaler.standalone.fetcher.flink-cluster.host localhost \
 --autoscaler.standalone.fetcher.flink-cluster.port 8081 \
 --autoscaler.standalone.state-store.type jdbc \
 --autoscaler.standalone.state-store.jdbc.url 
jdbc:mysql://localhost:3306/flink_autoscaler \
---autoscaler.standalone.state-store.jdbc.username root
+--autoscaler.standalone.state-store.jdbc.username root \
+--autoscaler.standalone.event-handler.type jdbc \
+--autoscaler.standalone.event-handler.jdbc.url 
jdbc:mysql://localhost:3306/flink_autoscaler \
+--autoscaler.standalone.event-handler.jdbc.username root

Review Comment:
   @1996fanrui I agree that we should probably merge the configs as they will 
be used together. Instead of calling it `jdbc-plugin` we could just call the 
properties:
   ```
   autoscaler.standalone.jdbc.username
   autoscaler.standalone.jdbc.url
   ...
   
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-34439) Move chown operations to COPY commands in Dockerfile

2024-02-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-34439.
--
Fix Version/s: kubernetes-operator-1.8.0
   Resolution: Fixed

merged to main 6cab745df9d0a742ab15f341bdf69c8e802b2a39

> Move chown operations to COPY commands in Dockerfile
> 
>
> Key: FLINK-34439
> URL: https://issues.apache.org/jira/browse/FLINK-34439
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Assignee: Mate Czagany
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> We can lower the size of the resulting operator container image if we don't 
> run 'chown' commands in separate RUN commands inside the Dockerfile, but 
> instead use the '--chown' argument of the COPY command.
> Using 'RUN chown...' will copy all the files affected with their whole size 
> to a new layer, duplicating the previous files from the COPY command.
> Example:
> {code:java}
> $ docker image history ghcr.io/apache/flink-kubernetes-operator:ccb10b8
> ...
>      3 months ago  RUN /bin/sh -c chown -R flink:flink $FLINK...  
> 116MB       buildkit.dockerfile.v0
> ... {code}
> This would mean a 20% reduction in image size.
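
For illustration (not part of the ticket), the change boils down to a 
Dockerfile fragment like the following; the file name is hypothetical:
{code}
# Before: a separate RUN chown duplicates the copied files into a new layer
COPY flink-kubernetes-operator.jar $FLINK_HOME/
RUN chown -R flink:flink $FLINK_HOME

# After: ownership is set while copying, so no extra layer is created
COPY --chown=flink:flink flink-kubernetes-operator.jar $FLINK_HOME/
{code}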



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34439] Move chown operations to COPY commands in Dockerfile [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora merged PR #775:
URL: https://github.com/apache/flink-kubernetes-operator/pull/775


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


MartijnVisser commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489314997


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   This is quite a rabbit hole 😆 
   
   1. Databricks has two functions:
   
   * sort_array 
https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html - 
Sorts ascending, puts NULL at beginning.
   *  array_sort 
https://docs.databricks.com/en/sql/language-manual/functions/array_sort.html - 
Sorts ascending, puts NULL at the end. 
   
   2. Spark has 
https://spark.apache.org/docs/latest/api/sql/index.html#sort_array which is 
indeed not different from the Databricks one. Sorts ascending, puts NULL at 
beginning
   
   3. Snowflake indeed has 
https://docs.snowflake.com/en/sql-reference/functions/array_sort - Sorts 
ascending, puts NULL at ~~the end~~ beginning.
   
   4. Trino, that has `array_sort(x)` - Sorts ascending, puts NULL at the end. 
   
   5. StarRocks also has this function 
https://docs.starrocks.io/docs/2.2/sql-reference/sql-functions/array-functions/array_sort/
 - Sorts ascending, puts NULL at beginning
   
   6. Doris has it 
https://doris.apache.org/docs/1.2/sql-manual/sql-functions/array-functions/array_sort/
 - Sorts ascending, puts NULL at beginning
   
   7. Hive uses NULLS FIRST as the default behavior in ascending ordering mode, 
per https://issues.apache.org/jira/browse/HIVE-12994 - Sorts ascending, puts 
NULL at beginning.
   
   I do think that the Snowflake has the cleanest function signature: there's 
just one function, and it has two optional arguments to give all the 
flexibility. 
   
   I think the default sorting mode should be ascending, and then it boils down 
to decide on the default location on where to put NULL. Out of the 7 
implementations, we have 6 who put NULL at the beginning, and only Databricks 
puts them at the end. Naturally, I think we should put NULL at the beginning 
but I'm +0 on the default location of NULL. 
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #777:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/777#discussion_r1489430330


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -898,58 +901,54 @@ public JobDetailsInfo getJobDetailsInfo(JobID jobID, 
Configuration conf) throws
 }
 }
 
-/** Wait until the FLink cluster has completely shut down. */
-@VisibleForTesting
-void waitForClusterShutdown(String namespace, String clusterId, long 
shutdownTimeout) {
-LOG.info("Waiting for cluster shutdown...");
-
-boolean jobManagerRunning = true;
-boolean taskManagerRunning = true;
-boolean serviceRunning = true;
+/** Returns a list of Kubernetes Deployment names for given cluster. */
+protected abstract List<String> getDeploymentNames(String namespace, 
String clusterId);
 
-for (int i = 0; i < shutdownTimeout; i++) {
-if (jobManagerRunning) {
-PodList jmPodList = getJmPodList(namespace, clusterId);
+/** Wait until the FLink cluster has completely shut down. */
+protected void waitForClusterShutdown(
+String namespace, String clusterId, long shutdownTimeout) {
+long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
+LOG.info("Waiting {} seconds for cluster shutdown...", 
shutdownTimeout);
 
-if (jmPodList == null || jmPodList.getItems().isEmpty()) {
-jobManagerRunning = false;
-}
-}
-if (taskManagerRunning) {
-PodList tmPodList = getTmPodList(namespace, clusterId);
+for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
+long deploymentTimeout = timeoutAt - System.currentTimeMillis();
 
-if (tmPodList.getItems().isEmpty()) {
-taskManagerRunning = false;
-}
+if (!waitForDeploymentToBeRemoved(namespace, deploymentName, 
deploymentTimeout)) {
+LOG.error(
+"Failed to shut down cluster {} (deployment {}) in {} 
seconds, proceeding...",
+clusterId,
+deploymentName,
+shutdownTimeout);
+return;
 }
+}
+}
 
-if (serviceRunning) {
-Service service =
-kubernetesClient
-.services()
-.inNamespace(namespace)
-.withName(
-
ExternalServiceDecorator.getExternalServiceName(clusterId))
-.get();
-if (service == null) {
-serviceRunning = false;
-}
-}
+/** Wait until Deployment is removed, return false if timed out, otherwise 
return true. */
+@VisibleForTesting
+boolean waitForDeploymentToBeRemoved(String namespace, String 
deploymentName, long timeout) {
+ScheduledExecutorService logger = 
Executors.newSingleThreadScheduledExecutor();
+logger.scheduleWithFixedDelay(
+() -> LOG.info("Waiting for Deployment {} to shut down...", 
deploymentName),
+5,
+5,
+TimeUnit.SECONDS);

Review Comment:
   This looks a bit strange / heavy, always creating a new executor (thread). 
Can we have a shared pool, or hook into the Kubernetes client's 
`waitUntilCondition` somehow? Or alternatively we could just remove the 
periodic logs and log before and after.
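   A minimal sketch of the `waitUntilCondition` alternative, assuming the 
fabric8 client and a surrounding class that provides `kubernetesClient` and an 
SLF4J `LOG`; the method name and timeout handling are illustrative:
   ```
   boolean waitForDeploymentToBeRemoved(String namespace, String deploymentName, long timeoutMillis) {
       LOG.info("Waiting for Deployment {} to shut down...", deploymentName);
       try {
           kubernetesClient
                   .apps()
                   .deployments()
                   .inNamespace(namespace)
                   .withName(deploymentName)
                   // Returns once the Deployment no longer exists, or throws on timeout.
                   .waitUntilCondition(
                           java.util.Objects::isNull,
                           timeoutMillis,
                           java.util.concurrent.TimeUnit.MILLISECONDS);
           LOG.info("Deployment {} shut down.", deploymentName);
           return true;
       } catch (io.fabric8.kubernetes.client.KubernetesClientTimeoutException e) {
           return false;
       }
   }
   ```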



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489408092


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -43,49 +44,61 @@ public static void computeLoadMetrics(
 JobVertexID jobVertexID,
 Map<FlinkMetric, AggregatedMetric> flinkMetrics,
 Map<ScalingMetric, Double> scalingMetrics,
+IOMetrics ioMetrics,
 Configuration conf) {
 
-double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, 
conf, jobVertexID);
-scalingMetrics.put(ScalingMetric.LOAD, busyTimeMsPerSecond / 1000);
+scalingMetrics.put(
+ScalingMetric.LOAD,
+getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID) / 
1000.);
+scalingMetrics.put(ScalingMetric.ACCUMULATED_BUSY_TIME, 
ioMetrics.getAccumulatedBusyTime());
+}
+
+private static double getBusyTimeMsPerSecond(
+Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+Configuration conf,
+JobVertexID jobVertexId) {
+var busyTimeAggregator = 
conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
+var busyTimeMsPerSecond =
+
busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
+if (!Double.isFinite(busyTimeMsPerSecond)) {
+if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) {
+// We only want to log this once
+LOG.warn(
+"No busyTimeMsPerSecond metric available for {}. No 
scaling will be performed for this vertex.",
+jobVertexId);
+}
+return Double.NaN;
+}
+return Math.max(0, busyTimeMsPerSecond);
 }
 
 public static void computeDataRateMetrics(
 JobVertexID jobVertexID,
 Map<FlinkMetric, AggregatedMetric> flinkMetrics,
 Map<ScalingMetric, Double> scalingMetrics,
 JobTopology topology,
-double lagGrowthRate,
 Configuration conf,
 Supplier<Double> observedTprAvg) {
 
 var isSource = topology.isSource(jobVertexID);
+var ioMetrics = topology.get(jobVertexID).getIoMetrics();
 
 double numRecordsInPerSecond =
-getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource);
+getNumRecordsIn(flinkMetrics, ioMetrics, jobVertexID, 
isSource, true);

Review Comment:
   will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489403136


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##
@@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan(
 for (JsonNode node : nodes) {
 var vertexId = JobVertexID.fromHexString(node.get("id").asText());
 var inputList = new HashSet();
+var ioMetrics = metrics.get(vertexId);
+var finished = finishedVertices.contains(vertexId);
 vertexInfo.add(
 new VertexInfo(
 vertexId,
 inputList,
+null,
 node.get("parallelism").asInt(),
 maxParallelismMap.get(vertexId),
-finished.contains(vertexId)));
+maxParallelismMap.get(vertexId),

Review Comment:
   I will add an explicit constructor to clean this up instead of relying on 
Lombok's `@AllArgsConstructor`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489401020


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -356,13 +421,13 @@ public static double getAverage(
 ? 
collectedMetrics.getVertexMetrics().get(jobVertexId)
 : collectedMetrics.getGlobalMetrics();
 double num = metrics.getOrDefault(metric, Double.NaN);
-if (Double.isNaN(num)) {
-continue;
-}
 if (Double.isInfinite(num)) {
 anyInfinite = true;
 continue;
 }
+if (Double.isNaN(num)) {
+continue;
+}

Review Comment:
   I will revert this; I made some changes that I later removed, so it's not necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489399611


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -264,16 +299,14 @@ private void computeTargetDataRate(
 if (topology.isSource(vertex)) {
 double catchUpTargetSec = 
conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
 
-if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) {
+double lagRate = getRate(LAG, vertex, metricsHistory);
+double sourceDataRate = Math.max(0, inputRate + lagRate);

Review Comment:
   Makes sense!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489399318


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -341,6 +375,37 @@ public static double getAverage(
 return getAverage(metric, jobVertexId, metricsHistory, 1);
 }
 
+public static double getRate(
+ScalingMetric metric,
+@Nullable JobVertexID jobVertexId,
+SortedMap<Instant, CollectedMetrics> metricsHistory) {
+
+Instant firstTs = null;
+double first = Double.NaN;
+
+Instant lastTs = null;
+double last = Double.NaN;
+
+for (var entry : metricsHistory.entrySet()) {
+double value = 
entry.getValue().getVertexMetrics().get(jobVertexId).get(metric);
+if (!Double.isNaN(value)) {
+if (Double.isNaN(first)) {
+first = value;
+firstTs = entry.getKey();
+} else {
+last = value;
+lastTs = entry.getKey();
+}
+}
+}
+if (Double.isNaN(last)) {
+return Double.NaN;
+}
+
+double diff = last - first;
+return diff == 0 ? 0 : diff / Duration.between(firstTs, 
lastTs).toSeconds();

Review Comment:
   I will do this. I still had to change the logic so that we compute the 
per-second rate using the millisecond diff, otherwise we can accidentally 
divide by zero (see the sketch below).
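   A minimal sketch of that guard, assuming `java.time.Instant` timestamps as 
in the quoted method:
   ```
   import java.time.Duration;
   import java.time.Instant;

   public class RateExample {
       // Derive the per-second rate from the millisecond difference so that
       // sub-second gaps do not truncate to zero seconds and divide by zero.
       static double perSecondRate(double first, Instant firstTs, double last, Instant lastTs) {
           double diff = last - first;
           long millis = Duration.between(firstTs, lastTs).toMillis();
           return diff == 0 ? 0 : diff * 1000. / millis;
       }
   }
   ```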
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-14 Thread via GitHub


zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1489188467


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java:
##
@@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class<?> 
taskClazz) {
 public void triggerPeriodicScheduledTasks() {
 for (ScheduledTask scheduledTask : periodicScheduledTasks) {
 if (!scheduledTask.isCancelled()) {
-scheduledTask.execute();
+executeScheduledTask(scheduledTask);
 }
 }
 }
 
+private static void executeScheduledTask(ScheduledTask scheduledTask) {
+scheduledTask.execute();
+try {
+// try to retrieve result of scheduled task to avoid swallowing 
any exceptions that
+// occurred
+scheduledTask.get();

Review Comment:
   This doesn't work for periodic tasks since the result future in the 
ScheduledTask never gets completed.
   
   I'm not sure if this change is correct in the first place. Every 
non-periodic task already throws exceptions when trigger is called, and you 
can't wait for non-periodic tasks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


MartijnVisser commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489343962


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Sounds good to me, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489331336


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   @MartijnVisser Actually Snowflake does:
   > nulls_first
Default: FALSE if the ARRAY is sorted in ascending order; TRUE if the ARRAY 
is sorted in descending order.
   
   I'd suggest we:

   1. adapt Snowflake's signature where we get `ARRAY, BOOLEAN, 
BOOLEAN`, output is `ARRAY` which is `NULLABLE` if any of the input 
is nullable
   2. We use defaults: ascending and `nulls_first` for ASC, `nulls_last` for 
DESC
   3. If any of the input is null the output is also null
   
   How does this sound?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34441] Add guide to submit flink SQL scripts via the operator (using flink-sql-runner-example) [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


mxm commented on code in PR #776:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/776#discussion_r1489321926


##
docs/content/docs/custom-resource/overview.md:
##
@@ -216,6 +216,38 @@ Alternatively, if you use helm to install 
flink-kubernetes-operator, it allows y
 
 - Last-state upgradeMode is currently not supported for FlinkSessionJobs
 
+## Flink SQL Jobs

Review Comment:
   I think we should avoid copying anything over from 
https://github.com/apache/flink-kubernetes-operator/tree/main/examples/. 
Instead, we can extract the above linked examples section to a dedicated page 
and add any extra information we find useful.



##
docs/content/docs/custom-resource/overview.md:
##
@@ -216,6 +216,38 @@ Alternatively, if you use helm to install 
flink-kubernetes-operator, it allows y
 
 - Last-state upgradeMode is currently not supported for FlinkSessionJobs
 
+## Flink SQL Jobs

Review Comment:
   Taking another look, this might not be the best place to add this 
information because it's an overview page and the steps are quite detailed. 
There is already a section mentioning the examples: 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#built-in-examples
 How about creating a dedicated Examples page?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


MartijnVisser commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489314997


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   This is quite a rabbit hole 😆 
   
   1. Databricks has two functions:
   
   * sort_array 
https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html - 
Sorts ascending, puts NULL at beginning.
   *  array_sort 
https://docs.databricks.com/en/sql/language-manual/functions/array_sort.html - 
Sorts ascending, puts NULL at the end. 
   
   2. Spark has 
https://spark.apache.org/docs/latest/api/sql/index.html#sort_array which is 
indeed not different from the Databricks one. Sorts ascending, puts NULL at 
beginning
   
   3. Snowflake indeed has 
https://docs.snowflake.com/en/sql-reference/functions/array_sort - Sorts 
ascending, puts NULL at the end. 
   
   4. Trino, that has `array_sort(x)` - Sorts ascending, puts NULL at the end. 
   
   5. StarRocks also has this function 
https://docs.starrocks.io/docs/2.2/sql-reference/sql-functions/array-functions/array_sort/
 - Sorts ascending, puts NULL at beginning
   
   6. Doris has it 
https://doris.apache.org/docs/1.2/sql-manual/sql-functions/array-functions/array_sort/
 - Sorts ascending, puts NULL at beginning
   
   7. Hive uses NULLS FIRST as the default behavior in ascending ordering mode, 
per https://issues.apache.org/jira/browse/HIVE-12994 - Sorts ascending, puts 
NULL at beginning.
   
   I do think that the Snowflake has the cleanest function signature: there's 
just one function, and it has two optional arguments to give all the 
flexibility. 
   
   I think the default sorting mode should be ascending, and then it boils down 
to decide on the default location on where to put NULL. Out of the 7 
implementations, we have 5 who put NULL at the beginning, and 2 who put them at 
the end. Naturally, I think we should put NULL at the beginning but I'm +0 on 
the default location of NULL. 
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273
 ] 

Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:12 AM:
-

Argh, all this time I didn't notice that they are two separate tests (with very 
similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375

Reopening the issue.


was (Author: mapohl):
Argh, all this time I didn't notice that these are two separate tests (with very similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068

Reopening the issue.

> VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
> -
>
> Key: FLINK-34403
> URL: https://issues.apache.org/jira/browse/FLINK-34403
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.20.0
>Reporter: Benchao Li
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> After FLINK-33611 was merged, the misc test on GHA cannot pass due to an out of 
> memory error, throwing the following exceptions:
> {code:java}
> Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest
> Error: 05:43:21 05:43:21.773 [ERROR] 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time 
> elapsed: 40.97 s <<< ERROR!
> Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in 
> serialization.
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21  

[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273
 ] 

Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:12 AM:
-

Argh, all this time I didn't notice that these are two separate tests (with very similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068

Reopening the issue.


was (Author: mapohl):
Argh, all this time I didn't notice that these are two separate tests (with very similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375

Reopening the issue.

> VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
> -
>
> Key: FLINK-34403
> URL: https://issues.apache.org/jira/browse/FLINK-34403
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.20.0
>Reporter: Benchao Li
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> After FLINK-33611 was merged, the misc test on GHA cannot pass due to an out of 
> memory error, throwing the following exceptions:
> {code:java}
> Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest
> Error: 05:43:21 05:43:21.773 [ERROR] 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time 
> elapsed: 40.97 s <<< ERROR!
> Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in 
> serialization.
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21  

[jira] [Comment Edited] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817298#comment-17817298
 ] 

Matthias Pohl edited comment on FLINK-34336 at 2/14/24 11:11 AM:
-

* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548208160#step:10:11190
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=15356


was (Author: mapohl):
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548208160#step:10:11190

> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang sometimes
> -
>
> Key: FLINK-34336
> URL: https://issues.apache.org/jira/browse/FLINK-34336
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang in waitForRunningTasks(restClusterClient, jobID, parallelism2);
> h2. Reason:
> The job has 2 tasks (vertices) after calling updateJobResourceRequirements. 
> The source parallelism isn't changed (it stays at parallelism), and the 
> FlatMapper+Sink is changed from parallelism to parallelism2.
> So we expect the task number to be parallelism + parallelism2 instead of 
> parallelism2, as sketched below.
>  
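> For example, the expectation could be written as (a sketch using the names 
> from this description, not the actual test code):
> {code:java}
> // expect tasks from both the unchanged source (parallelism) and the
> // rescaled FlatMapper+Sink (parallelism2):
> waitForRunningTasks(restClusterClient, jobID, parallelism + parallelism2);
> {code}
> 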
> h2. Why does it pass for now?
> Flink 1.19 supports scaling cooldown, and the cooldown time is 30s by 
> default. This means the Flink job will rescale 30 seconds after 
> updateJobResourceRequirements is called.
>  
> So the tasks are still running with the old parallelism when we call 
> waitForRunningTasks(restClusterClient, jobID, parallelism2);
> IIUC, this cannot be guaranteed, and it's unexpected.
>  
> h2. How to reproduce this bug?
> [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6]
>  * Disable the cooldown
>  * Sleep for a while before waitForRunningTasks
> If so, the job runs with the new parallelism, so `waitForRunningTasks` will 
> hang forever.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273
 ] 

Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:11 AM:
-

Argh, all this time I didn't notice that these are two separate tests (with very similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375

Reopening the issue.


was (Author: mapohl):
Argh, all this time I didn't notice that these are two separate tests (with very similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375

Reopening the issue.

> VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
> -
>
> Key: FLINK-34403
> URL: https://issues.apache.org/jira/browse/FLINK-34403
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.20.0
>Reporter: Benchao Li
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> After FLINK-33611 was merged, the misc test on GHA cannot pass due to an out of 
> memory error, throwing the following exceptions:
> {code:java}
> Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest
> Error: 05:43:21 05:43:21.773 [ERROR] 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time 
> elapsed: 40.97 s <<< ERROR!
> Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in 
> serialization.
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21  

[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273
 ] 

Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:10 AM:
-

Argh, all this time I didn't notice that these are two separate tests (with very similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=298e20ef-7951-5965-0e79-ea664ddc435e&t=d4c90338-c843-57b0-3232-10ae74f00347&l=23375

Reopening the issue.


was (Author: mapohl):
Argh, all this time I didn't notice that these are two separate tests (with very similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068

Reopening the issue.

> VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
> -
>
> Key: FLINK-34403
> URL: https://issues.apache.org/jira/browse/FLINK-34403
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.20.0
>Reporter: Benchao Li
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> After FLINK-33611 was merged, the misc test on GHA cannot pass due to an out of 
> memory error, throwing the following exceptions:
> {code:java}
> Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest
> Error: 05:43:21 05:43:21.773 [ERROR] 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time 
> elapsed: 40.97 s <<< ERROR!
> Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in 
> serialization.
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66)
> Feb 

[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273
 ] 

Matthias Pohl edited comment on FLINK-34403 at 2/14/24 11:10 AM:
-

Argh, all this time I didn't notice that these are two separate tests (with very similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57518&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=23068

Reopening the issue.


was (Author: mapohl):
Argh, all this time I didn't notice that these are two separate tests (with very similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089

Reopening the issue.

> VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
> -
>
> Key: FLINK-34403
> URL: https://issues.apache.org/jira/browse/FLINK-34403
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.20.0
>Reporter: Benchao Li
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> After FLINK-33611 was merged, the misc test on GHA cannot pass due to an out of 
> memory error, throwing the following exceptions:
> {code:java}
> Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest
> Error: 05:43:21 05:43:21.773 [ERROR] 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time 
> elapsed: 40.97 s <<< ERROR!
> Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in 
> serialization.
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHe

[jira] [Commented] (FLINK-34273) git fetch fails

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817334#comment-17817334
 ] 

Matthias Pohl commented on FLINK-34273:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57521&view=logs&j=60960eae-6f09-579e-371e-29814bdd1adc&t=1fe608a4-e773-5ca0-5336-1c37a61b9f8d

> git fetch fails
> ---
>
> Key: FLINK-34273
> URL: https://issues.apache.org/jira/browse/FLINK-34273
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an 
> infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data 
> remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=5d6dc3d3-393d-5111-3a40-c6a5a36202e6&l=667



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-22765) ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable

2024-02-14 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-22765:
--
Priority: Critical  (was: Major)

> ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable
> 
>
> Key: FLINK-22765
> URL: https://issues.apache.org/jira/browse/FLINK-22765
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.5, 1.15.0, 1.17.2, 1.19.0, 1.20.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, stale-assigned, test-stability
> Fix For: 1.14.0, 1.16.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18292&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=a99e99c7-21cd-5a1f-7274-585e62b72f56
> {code}
> May 25 00:56:38 java.lang.AssertionError: 
> May 25 00:56:38 
> May 25 00:56:38 Expected: is ""
> May 25 00:56:38  but: was "The system is out of resources.\nConsult the 
> following stack trace for details."
> May 25 00:56:38   at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> May 25 00:56:38   at org.junit.Assert.assertThat(Assert.java:956)
> May 25 00:56:38   at org.junit.Assert.assertThat(Assert.java:923)
> May 25 00:56:38   at 
> org.apache.flink.runtime.util.ExceptionUtilsITCase.run(ExceptionUtilsITCase.java:94)
> May 25 00:56:38   at 
> org.apache.flink.runtime.util.ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError(ExceptionUtilsITCase.java:70)
> May 25 00:56:38   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> May 25 00:56:38   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> May 25 00:56:38   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> May 25 00:56:38   at java.lang.reflect.Method.invoke(Method.java:498)
> May 25 00:56:38   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> May 25 00:56:38   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> May 25 00:56:38   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> May 25 00:56:38   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> May 25 00:56:38   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> May 25 00:56:38   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> May 25 00:56:38   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> May 25 00:56:38   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> May 25 00:56:38   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> May 25 00:56:38   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> May 25 00:56:38   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> May 25 00:56:38   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> May 25 00:56:38   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> May 25 00:56:38   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> May 25 00:56:38   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> May 25 00:56:38   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> May 25 00:56:38   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> May 25 00:56:38   at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> May 25 00:56:38   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> May 25 00:56:38 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-22765) ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817333#comment-17817333
 ] 

Matthias Pohl commented on FLINK-22765:
---

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57521&view=logs&j=a657ddbf-d986-5381-9649-342d9c92e7fb&t=dc085d4a-05c8-580e-06ab-21f5624dab16&l=8997

> ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError is unstable
> 
>
> Key: FLINK-22765
> URL: https://issues.apache.org/jira/browse/FLINK-22765
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.14.0, 1.13.5, 1.15.0, 1.17.2, 1.19.0, 1.20.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: pull-request-available, stale-assigned, test-stability
> Fix For: 1.14.0, 1.16.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18292&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=a99e99c7-21cd-5a1f-7274-585e62b72f56
> {code}
> May 25 00:56:38 java.lang.AssertionError: 
> May 25 00:56:38 
> May 25 00:56:38 Expected: is ""
> May 25 00:56:38  but: was "The system is out of resources.\nConsult the 
> following stack trace for details."
> May 25 00:56:38   at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> May 25 00:56:38   at org.junit.Assert.assertThat(Assert.java:956)
> May 25 00:56:38   at org.junit.Assert.assertThat(Assert.java:923)
> May 25 00:56:38   at 
> org.apache.flink.runtime.util.ExceptionUtilsITCase.run(ExceptionUtilsITCase.java:94)
> May 25 00:56:38   at 
> org.apache.flink.runtime.util.ExceptionUtilsITCase.testIsMetaspaceOutOfMemoryError(ExceptionUtilsITCase.java:70)
> May 25 00:56:38   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> May 25 00:56:38   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> May 25 00:56:38   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> May 25 00:56:38   at java.lang.reflect.Method.invoke(Method.java:498)
> May 25 00:56:38   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> May 25 00:56:38   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> May 25 00:56:38   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> May 25 00:56:38   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> May 25 00:56:38   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> May 25 00:56:38   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> May 25 00:56:38   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> May 25 00:56:38   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> May 25 00:56:38   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> May 25 00:56:38   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> May 25 00:56:38   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> May 25 00:56:38   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> May 25 00:56:38   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> May 25 00:56:38   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> May 25 00:56:38   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> May 25 00:56:38   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> May 25 00:56:38   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> May 25 00:56:38   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> May 25 00:56:38   at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> May 25 00:56:38   at 
> org.apache.maven.surefir

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1487819172


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   > Ok, thank you. Actually, I did this function last summer and followed the Jira ticket's https://issues.apache.org/jira/browse/FLINK-26948 description.
   
   I understand that. If at some point we find out we either made a mistake or did not put enough effort into something, it's better to fix that sooner rather than later, before we have to live with the consequences. I admit I had not thoroughly checked the semantics before, which I should have.
   
   It's better to do something well rather than fast in my opinion.
   
   I see, so traditional RDBMS do not really support that function. It's also worth checking what other systems do:
   * Snowflake: https://docs.snowflake.com/en/sql-reference/functions/array_sort (returns `null` when any argument is `null`); null ordering is handled by a separate argument
   * Spark: https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html: the docs do not say what the behaviour is for a `null` `ascendingOrder`; nulls go first on asc, last on desc
   * Presto: https://prestodb.io/docs/current/functions/array.html: has two separate functions for `ASC/DESC`
   
   To me Snowflake's behaviour is the cleanest out there. WDYT? @MartijnVisser 
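   
   To make that concrete, here is a minimal sketch of the Snowflake-style signature array_sort(array [, sort_ascending [, nulls_first]]) in plain Java (illustrative only, not a proposed Flink implementation):
   
   {code:java}
   import java.util.Arrays;
   import java.util.Comparator;
   
   final class ArraySortSketch {
       // Returns a sorted copy; a null input array yields null (Snowflake behaviour).
       static <T extends Comparable<T>> T[] arraySort(T[] array, boolean ascending, boolean nullsFirst) {
           if (array == null) {
               return null;
           }
           Comparator<T> base = ascending ? Comparator.naturalOrder() : Comparator.reverseOrder();
           Comparator<T> cmp = nullsFirst ? Comparator.nullsFirst(base) : Comparator.nullsLast(base);
           T[] copy = array.clone();
           Arrays.sort(copy, cmp);
           return copy;
       }
   }
   {code}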



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-02-14 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1489284689


##
amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AWS4SignerBase.java:
##
@@ -0,0 +1,290 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import com.amazonaws.util.BinaryUtils;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.security.MessageDigest;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SimpleTimeZone;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Common methods and properties for all AWS4 signer variants. */
+public abstract class AWS4SignerBase {
+
+/** SHA256 hash of an empty request body. */
+public static final String EMPTY_BODY_SHA256 =
+"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
+
+public static final String UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD";
+
+public static final String SCHEME = "AWS4";
+public static final String ALGORITHM = "HMAC-SHA256";
+public static final String TERMINATOR = "aws4_request";
+
+/** Format strings for the date/time and date stamps required during signing. */
+public static final String ISO_8601_BASIC_FORMAT = "yyyyMMdd'T'HHmmss'Z'";
+
+public static final String DATE_STRING_FORMAT = "yyyyMMdd";
+
+protected URL endpointUrl;
+protected String httpMethod;
+protected String serviceName;
+protected String regionName;
+
+protected final SimpleDateFormat dateTimeFormat;
+protected final SimpleDateFormat dateStampFormat;
+
+/**
+ * Create a new AWS V4 signer.
+ *
+ * @param endpointUrl The service endpoint, including the path to any 
resource.
+ * @param httpMethod The HTTP verb for the request, e.g. GET.
+ * @param serviceName The signing name of the service, e.g. 's3'.
+ * @param regionName The system name of the AWS region associated with the 
endpoint, e.g.
+ * us-east-1.
+ */
+public AWS4SignerBase(
+URL endpointUrl, String httpMethod, String serviceName, String regionName) {
+this.endpointUrl = endpointUrl;
+this.httpMethod = httpMethod;
+this.serviceName = serviceName;
+this.regionName = regionName;
+
+dateTimeFormat = new SimpleDateFormat(ISO_8601_BASIC_FORMAT);
+dateTimeFormat.setTimeZone(new SimpleTimeZone(0, "UTC"));
+dateStampFormat = new SimpleDateFormat(DATE_STRING_FORMAT);
+dateStampFormat.setTimeZone(new SimpleTimeZone(0, "UTC"));
+}
+
+/**
+ * Returns the canonical collection of header names that will be included 
in the signature. For
+ * AWS4, all header names must be included in the process in sorted 
canonicalized order.
+ */
+protected static String getCanonicalizeHeaderNames(Map<String, String> headers) {
+List<String> sortedHeaders = new ArrayList<>();
+sortedHeaders.addAll(headers.keySet());
+Collections.sort(sortedHeaders, String.CASE_INSENSITIVE_ORDER);
+
+StringBuilder buffer = new StringBuilder();
+for (String header : sortedHeaders) {
+if (buffer.length() > 0) {
+buffer.append(";");
+}
+buffer.append(header.toLowerCase());
+}
+
+return buffer.toString();
+}
+
+/**
+ * Computes the canonical headers with values for the request. For AWS4, 
all headers must be
+ * included in the signing process.
+ */
+protected static String getCanonicalizedHeaderString(Map<String, String> headers) {
+if (headers == null || headers.isEmpty()) {
+return "";
+}
+
+// step1: sort the headers by case-insensitive order
+List<String> sortedHeaders = new ArrayList<>();
+sortedHeaders.addAll(headers.keySet());
+Collections.sort(sortedHeaders, String.CASE_INSENSITIVE_ORDER);
+
+// step2: form the

[jira] [Closed] (FLINK-34432) Re-enable forkReuse for flink-table-planner

2024-02-14 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-34432.
--
Fix Version/s: 1.20.0
   Resolution: Fixed

Fixed in apache/flink:master 403694e7b9c213386f3ed9cff21ce2664030ebc2

> Re-enable forkReuse for flink-table-planner
> ---
>
> Key: FLINK-34432
> URL: https://issues.apache.org/jira/browse/FLINK-34432
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Client, Test Infrastructure, Tests
>Affects Versions: 1.19.0, 1.18.2, 1.20.0
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> With FLINK-18356 resolved, we should re-enable forkReuse for 
> flink-table-planner to speed up the tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-02-14 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1489268899


##
amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AmazonManagedPrometheusWriteRequestSigner.java:
##
@@ -0,0 +1,98 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSSessionCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.util.BinaryUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+
+/** Sign a Remote-Write request to Amazon Managed Service for Prometheus 
(AMP). */
+public class AmazonManagedPrometheusWriteRequestSigner implements PrometheusRequestSigner {
+
+private final URL remoteWriteUrl;
+private final String awsRegion;
+
+/**
+ * Constructor.
+ *
+ * @param remoteWriteUrl URL of the remote-write endpoint
+ * @param awsRegion Region of the AMP workspace
+ */
+public AmazonManagedPrometheusWriteRequestSigner(String remoteWriteUrl, String awsRegion) {
+Preconditions.checkArgument(
+StringUtils.isNotBlank(awsRegion), "Missing or blank AMP workspace region");
+Preconditions.checkArgument(
+StringUtils.isNotBlank(remoteWriteUrl),
+"Missing or blank AMP workspace remote-write URL");
+this.awsRegion = awsRegion;
+try {
+this.remoteWriteUrl = new URL(remoteWriteUrl);
+} catch (MalformedURLException e) {
+throw new IllegalArgumentException(
+"Invalid AMP remote-write URL: " + remoteWriteUrl, e);
+}
+}
+
+/**
+ * Add the additional Http request headers required by Amazon Managed 
Prometheus:
+ * 'x-amz-content-sha256', 'Host', 'X-Amz-Date', 'x-amz-security-token' 
and 'Authorization`.
+ *
+ * @param requestHeaders original Http request headers. It must be 
mutable. For efficiency, any
+ * new header is added to the map, instead of making a copy.
+ * @param requestBody request body, already compressed
+ */
+@Override
+public void addSignatureHeaders(Map<String, String> requestHeaders, byte[] requestBody) {
+byte[] contentHash = AWS4SignerBase.hash(requestBody);
+String contentHashString = BinaryUtils.toHex(contentHash);
+requestHeaders.put(
+"x-amz-content-sha256",
+contentHashString); // this header must be included before generating the Authorization header
+
+DefaultAWSCredentialsProviderChain credsChain = new DefaultAWSCredentialsProviderChain();

Review Comment:
   Added the possibility to instantiate a signer with a different credentials 
provider.
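   
   For instance, a sketch of what such a constructor could look like (the class and field names here are assumptions based on this thread, not necessarily the merged API):
   
   {code:java}
   import com.amazonaws.auth.AWSCredentialsProvider;
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
   
   class SignerWithConfigurableCredentials {
       private final AWSCredentialsProvider credentialsProvider;
   
       // Caller supplies the provider (e.g. for tests or non-default auth).
       SignerWithConfigurableCredentials(AWSCredentialsProvider credentialsProvider) {
           this.credentialsProvider = credentialsProvider;
       }
   
       // Keeps the previous hard-coded behaviour as the default.
       SignerWithConfigurableCredentials() {
           this(new DefaultAWSCredentialsProviderChain());
       }
   }
   {code}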



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]

2024-02-14 Thread via GitHub


zentol commented on code in PR #24292:
URL: https://github.com/apache/flink/pull/24292#discussion_r1489262074


##
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java:
##
@@ -45,11 +45,13 @@ public MainThreadValidatorUtil(RpcEndpoint endpoint) {
 public void enterMainThread() {
assert (endpoint.currentMainThread.compareAndSet(null, Thread.currentThread()))
: "The RpcEndpoint has concurrent access from " + endpoint.currentMainThread.get();
+endpoint.beforeInvocation();

Review Comment:
   Yes, because the main thread executable covers less; it doesn't capture anything sent directly against the RPC service's executor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]

2024-02-14 Thread via GitHub


zentol commented on code in PR #24292:
URL: https://github.com/apache/flink/pull/24292#discussion_r1489259793


##
flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.MdcUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.test.appender.ListAppender;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the 
most important cases.
+ */
+public class JobIDLoggingITCase extends TestLogger {
+private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class);
+
+@ClassRule
+public static MiniClusterWithClientResource miniClusterResource =
+new MiniClusterWithClientResource(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(1)
+.build());
+
+@Test
+public void testJobIDLogging() throws Exception {
+LoggerContext ctx = LoggerContext.getContext(false);

Review Comment:
   You can use multiple extensions to capture multiple loggers, see 
`YARNSessionCapacitySchedulerITCase`.
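   
   A sketch of what that could look like here, assuming a JUnit5 version of this test and Flink's org.apache.flink.testutils.logging.LoggerAuditingExtension (one extension per captured logger, as in the YARN test):
   
   {code:java}
   import org.apache.flink.runtime.jobmaster.JobMaster;
   import org.apache.flink.runtime.taskmanager.Task;
   import org.apache.flink.testutils.logging.LoggerAuditingExtension;
   
   import org.junit.jupiter.api.extension.RegisterExtension;
   import org.slf4j.event.Level;
   
   class JobIDLoggingSketch {
       @RegisterExtension
       private final LoggerAuditingExtension taskLogging =
               new LoggerAuditingExtension(Task.class, Level.DEBUG);
   
       @RegisterExtension
       private final LoggerAuditingExtension jobMasterLogging =
               new LoggerAuditingExtension(JobMaster.class, Level.DEBUG);
   }
   {code}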



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]

2024-02-14 Thread via GitHub


zentol commented on code in PR #24292:
URL: https://github.com/apache/flink/pull/24292#discussion_r1489259263


##
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java:
##
@@ -34,4 +34,16 @@ public interface RpcGateway {
      * @return Fully qualified hostname under which the associated rpc endpoint is reachable
      */
     String getHostname();
+
+    /**
+     * Perform optional steps before handling any messages or performing service actions (e.g.
+     * start/stop).
+     */
+    default void beforeInvocation() {}

Review Comment:
   yes (if this is handled in the PekkoRpcActor). Although maybe not runnables 
but just a Map? Not sure if we _really_ want to open ourselves up to 
running arbitrary things before/after just yet 🤔 
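
   For illustration, a minimal sketch of the Map-based variant using plain org.slf4j.MDC; `ContextAwareGateway`, `getMdcContext`, and the `handleRpcInvocation` parameter are made-up names, not the PR's API:

   ```java
   import java.util.Collections;
   import java.util.Map;

   import org.slf4j.MDC;

   // Sketch: the gateway only declares context entries; the RPC actor applies
   // and restores them around each invocation instead of running arbitrary code.
   interface ContextAwareGateway {
       default Map<String, String> getMdcContext() {
           return Collections.emptyMap();
       }
   }

   final class RpcActorSketch {
       void handle(ContextAwareGateway gateway, Runnable handleRpcInvocation) {
           Map<String, String> previous = MDC.getCopyOfContextMap(); // may be null
           gateway.getMdcContext().forEach(MDC::put);
           try {
               handleRpcInvocation.run(); // the actual message handling
           } finally {
               if (previous == null) {
                   MDC.clear();
               } else {
                   MDC.setContextMap(previous);
               }
           }
       }
   }
   ```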



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]

2024-02-14 Thread via GitHub


zentol commented on code in PR #24292:
URL: https://github.com/apache/flink/pull/24292#discussion_r1489257016


##
flink-core/src/main/java/org/apache/flink/util/JobIdLoggingExecutor.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class JobIdLoggingExecutor implements Executor {
+    protected final JobID jobID;

Review Comment:
   cluster id, job name, task id, task name, ...
   
   There's a lot of stuff we could put in the MDC that'd be useful.
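
   A hedged sketch of that generalization: an Executor wrapped with an arbitrary MDC context map instead of only a JobID. `MdcContextExecutor` is an illustrative name, not the PR's implementation; it uses the same save/restore pattern as the gateway sketch above:

   ```java
   import java.util.Map;
   import java.util.concurrent.Executor;

   import org.slf4j.MDC;

   /** Sketch: propagate an arbitrary MDC context map to every submitted task. */
   class MdcContextExecutor implements Executor {
       private final Map<String, String> context; // e.g. cluster id, job name, task id, ...
       private final Executor delegate;

       MdcContextExecutor(Map<String, String> context, Executor delegate) {
           this.context = context;
           this.delegate = delegate;
       }

       @Override
       public void execute(Runnable command) {
           delegate.execute(() -> {
               Map<String, String> previous = MDC.getCopyOfContextMap(); // may be null
               context.forEach(MDC::put);
               try {
                   command.run();
               } finally {
                   if (previous == null) {
                       MDC.clear();
                   } else {
                       MDC.setContextMap(previous);
                   }
               }
           });
       }
   }
   ```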



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32596) The partition key will be wrong when use Flink dialect to create Hive table

2024-02-14 Thread Vallari Rastogi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vallari Rastogi updated FLINK-32596:

Attachment: image-2024-02-14-16-06-13-126.png

> The partition key will be wrong when use Flink dialect to create Hive table
> ---
>
> Key: FLINK-32596
> URL: https://issues.apache.org/jira/browse/FLINK-32596
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: luoyuxia
>Assignee: Vallari Rastogi
>Priority: Major
> Attachments: image-2024-02-14-16-06-13-126.png
>
>
> Can be reproduced by the following SQL:
>  
> {code:java}
> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> tableEnv.executeSql(
> "create table t1(`date` string, `geo_altitude` FLOAT) partitioned by 
> (`date`)"
> + " with ('connector' = 'hive', 
> 'sink.partition-commit.delay'='1 s',  
> 'sink.partition-commit.policy.kind'='metastore,success-file')");
> CatalogTable catalogTable =
> (CatalogTable) 
> hiveCatalog.getTable(ObjectPath.fromString("default.t1"));
> // the following assertion will fail
> assertThat(catalogTable.getPartitionKeys().toString()).isEqualTo("[date]");{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34417] Log Job ID via MDC [flink]

2024-02-14 Thread via GitHub


zentol commented on code in PR #24292:
URL: https://github.com/apache/flink/pull/24292#discussion_r1489255700


##
docs/content.zh/docs/deployment/advanced/logging.md:
##
@@ -40,6 +40,21 @@ Flink 中的日志记录是使用 [SLF4J](http://www.slf4j.org/) 日志接口实
 
 
 
+### Structured logging
+
+Flink adds the following fields to [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature):
+- Job ID
+- key: `flink-job-id`

Review Comment:
   With something like this?
   ```
   appender.console.json.eventTemplateAdditionalField[0].type = EventTemplateAdditionalField
   appender.console.json.eventTemplateAdditionalField[0].key = flink.job.id
   appender.console.json.eventTemplateAdditionalField[0].value = {"$resolver": "mdc", "key": "flink-job-id"}
   ```
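
   (Aside: for plain-text output the same MDC key can be referenced in a log4j PatternLayout with `%X{flink-job-id}`.)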



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34364) Fix release utils mount point to match the release doc and scripts

2024-02-14 Thread Etienne Chauchot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Etienne Chauchot resolved FLINK-34364.
--
Resolution: Fixed

fixed release utils integration

> Fix release utils mount point to match the release doc and scripts
> --
>
> Key: FLINK-34364
> URL: https://issues.apache.org/jira/browse/FLINK-34364
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Parent, Release System
>Reporter: Etienne Chauchot
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
>
> The parent_pom branch refers to an incorrect mount point tools/*release*/shared 
> instead of tools/*releasing*/shared for the release_utils. 
> tools/*releasing*/shared is the one used in the release scripts and in the 
> release docs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34364) Fix release utils mount point to match the release doc and scripts

2024-02-14 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817321#comment-17817321
 ] 

Etienne Chauchot commented on FLINK-34364:
--

parent_pom: c806d46ef06c8e46d28a6a8b5db3f5104cfe53bc

> Fix release utils mount point to match the release doc and scripts
> --
>
> Key: FLINK-34364
> URL: https://issues.apache.org/jira/browse/FLINK-34364
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Parent, Release System
>Reporter: Etienne Chauchot
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
>
> The parent_pom branch refers to an incorrect mount point tools/*release*/shared 
> instead of tools/*releasing*/shared for the release_utils. 
> tools/*releasing*/shared is the one used in the release scripts and in the 
> release docs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34364] Replace incorrect release_utils mount point tools/release/shared by tools/releasing/shared to match the release doc and scripts [flink-connector-shared-utils]

2024-02-14 Thread via GitHub


echauchot merged PR #36:
URL: https://github.com/apache/flink-connector-shared-utils/pull/36


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817300#comment-17817300
 ] 

Matthias Pohl commented on FLINK-28440:
---

https://github.com/apache/flink/actions/runs/7895502334/job/21548198516#step:10:7557

> EventTimeWindowCheckpointingITCase failed with restore
> --
>
> Key: FLINK-28440
> URL: https://issues.apache.org/jira/browse/FLINK-28440
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.16.0, 1.17.0, 1.18.0, 1.19.0
>Reporter: Huang Xingbo
>Assignee: Yanfei Lei
>Priority: Critical
>  Labels: auto-deprioritized-critical, pull-request-available, 
> stale-assigned, test-stability
> Fix For: 1.19.0
>
> Attachments: image-2023-02-01-00-51-54-506.png, 
> image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, 
> image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, 
> image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, 
> image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>   ... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced
>  (No such file or directory)
>   at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>   at 
> org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
>   at 
> org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
>   at 
> org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
>   at 
> org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   ... 13 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-03

[jira] [Commented] (FLINK-34418) Disk space issues for Docker-ized GitHub Action jobs

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817301#comment-17817301
 ] 

Matthias Pohl commented on FLINK-34418:
---

https://github.com/apache/flink/actions/runs/7895502334

> Disk space issues for Docker-ized GitHub Action jobs
> 
>
> Key: FLINK-34418
> URL: https://issues.apache.org/jira/browse/FLINK-34418
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, pull-request-available, test-stability
>
> [https://github.com/apache/flink/actions/runs/7838691874/job/21390739806#step:10:27746]
> {code:java}
> [...]
> Feb 09 03:00:13 Caused by: java.io.IOException: No space left on device
> 27608Feb 09 03:00:13  at java.io.FileOutputStream.writeBytes(Native Method)
> 27609Feb 09 03:00:13  at 
> java.io.FileOutputStream.write(FileOutputStream.java:326)
> 27610Feb 09 03:00:13  at 
> org.apache.logging.log4j.core.appender.OutputStreamManager.writeToDestination(OutputStreamManager.java:250)
> 27611Feb 09 03:00:13  ... 39 more
> [...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817273#comment-17817273
 ] 

Matthias Pohl edited comment on FLINK-34403 at 2/14/24 9:59 AM:


Argh, all this time I didn't notice that these are two separate tests (with very 
similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548207280#step:10:23089

Reopening the issue.


was (Author: mapohl):
Argh, all this time I didn't notice that these are two separate tests (with very 
similar names):
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862
* 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57499&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=23862

Reopening the issue.

> VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
> -
>
> Key: FLINK-34403
> URL: https://issues.apache.org/jira/browse/FLINK-34403
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.20.0
>Reporter: Benchao Li
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.20.0
>
>
> After FLINK-33611 merged, the misc test on GHA cannot pass due to out of 
> memory error, throwing following exceptions:
> {code:java}
> Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest
> Error: 05:43:21 05:43:21.773 [ERROR] 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time 
> elapsed: 40.97 s <<< ERROR!
> Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in 
> serialization.
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:71)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple(VeryBigPbRowToProtoTest.java:37)
> Feb 07 05:43:21   at java.lang.reflect.Method.invoke(Method.java:498)
> Feb 07 05:43:21 Caused by: java.util.concur

[jira] [Comment Edited] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817298#comment-17817298
 ] 

Matthias Pohl edited comment on FLINK-34336 at 2/14/24 9:59 AM:


* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193
* 
https://github.com/apache/flink/actions/runs/7895502334/job/21548208160#step:10:11190


was (Author: mapohl):
https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193

> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang sometimes
> -
>
> Key: FLINK-34336
> URL: https://issues.apache.org/jira/browse/FLINK-34336
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang in waitForRunningTasks(restClusterClient, jobID, parallelism2);
> h2. Reason:
> The job has 2 tasks (vertices) after calling updateJobResourceRequirements. 
> The source parallelism isn't changed (it stays at parallelism), while the 
> FlatMapper+Sink is changed from parallelism to parallelism2.
> So the expected task number should be parallelism + parallelism2 instead of 
> parallelism2.
>  
> h2. Why it can be passed for now?
> Flink 1.19 supports scaling cooldown, and the cooldown time is 30s by 
> default. This means the Flink job will only rescale 30 seconds after 
> updateJobResourceRequirements is called.
>  
> So the running tasks still have the old parallelism when we call 
> waitForRunningTasks(restClusterClient, jobID, parallelism2);.
> IIUC, this cannot be guaranteed, and it's unexpected.
>  
> h2. How to reproduce this bug?
> [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6]
>  * Disable the cooldown
>  * Sleep for a while before waitForRunningTasks
> If so, the job runs with the new parallelism, so `waitForRunningTasks` will 
> hang forever.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes

2024-02-14 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-34336:
--
Labels: pull-request-available test-stability  (was: pull-request-available)

> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang sometimes
> -
>
> Key: FLINK-34336
> URL: https://issues.apache.org/jira/browse/FLINK-34336
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.19.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang in waitForRunningTasks(restClusterClient, jobID, parallelism2);
> h2. Reason:
> The job has 2 tasks (vertices) after calling updateJobResourceRequirements. 
> The source parallelism isn't changed (it stays at parallelism), while the 
> FlatMapper+Sink is changed from parallelism to parallelism2.
> So the expected task number should be parallelism + parallelism2 instead of 
> parallelism2.
>  
> h2. Why it can be passed for now?
> Flink 1.19 supports scaling cooldown, and the cooldown time is 30s by 
> default. This means the Flink job will only rescale 30 seconds after 
> updateJobResourceRequirements is called.
>  
> So the running tasks still have the old parallelism when we call 
> waitForRunningTasks(restClusterClient, jobID, parallelism2);.
> IIUC, this cannot be guaranteed, and it's unexpected.
>  
> h2. How to reproduce this bug?
> [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6]
>  * Disable the cooldown
>  * Sleep for a while before waitForRunningTasks
> If so, the job runs with the new parallelism, so `waitForRunningTasks` will 
> hang forever.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes

2024-02-14 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-34336:
--
Affects Version/s: 1.20.0

> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang sometimes
> -
>
> Key: FLINK-34336
> URL: https://issues.apache.org/jira/browse/FLINK-34336
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang in waitForRunningTasks(restClusterClient, jobID, parallelism2);
> h2. Reason:
> The job has 2 tasks (vertices) after calling updateJobResourceRequirements. 
> The source parallelism isn't changed (it stays at parallelism), while the 
> FlatMapper+Sink is changed from parallelism to parallelism2.
> So the expected task number should be parallelism + parallelism2 instead of 
> parallelism2.
>  
> h2. Why it can be passed for now?
> Flink 1.19 supports scaling cooldown, and the cooldown time is 30s by 
> default. This means the Flink job will only rescale 30 seconds after 
> updateJobResourceRequirements is called.
>  
> So the running tasks still have the old parallelism when we call 
> waitForRunningTasks(restClusterClient, jobID, parallelism2);.
> IIUC, this cannot be guaranteed, and it's unexpected.
>  
> h2. How to reproduce this bug?
> [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6]
>  * Disable the cooldown
>  * Sleep for a while before waitForRunningTasks
> If so, the job runs with the new parallelism, so `waitForRunningTasks` will 
> hang forever.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34336) AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState may hang sometimes

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817298#comment-17817298
 ] 

Matthias Pohl commented on FLINK-34336:
---

https://github.com/apache/flink/actions/runs/7895502334/job/21548185872#step:10:10193

> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang sometimes
> -
>
> Key: FLINK-34336
> URL: https://issues.apache.org/jira/browse/FLINK-34336
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.19.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> AutoRescalingITCase#testCheckpointRescalingWithKeyedAndNonPartitionedState 
> may hang in waitForRunningTasks(restClusterClient, jobID, parallelism2);
> h2. Reason:
> The job has 2 tasks (vertices) after calling updateJobResourceRequirements. 
> The source parallelism isn't changed (it stays at parallelism), while the 
> FlatMapper+Sink is changed from parallelism to parallelism2.
> So the expected task number should be parallelism + parallelism2 instead of 
> parallelism2.
>  
> h2. Why it can be passed for now?
> Flink 1.19 supports scaling cooldown, and the cooldown time is 30s by 
> default. This means the Flink job will only rescale 30 seconds after 
> updateJobResourceRequirements is called.
>  
> So the running tasks still have the old parallelism when we call 
> waitForRunningTasks(restClusterClient, jobID, parallelism2);.
> IIUC, this cannot be guaranteed, and it's unexpected.
>  
> h2. How to reproduce this bug?
> [https://github.com/1996fanrui/flink/commit/ffd713e24d37db2c103e4cd4361d0cd916d0d2f6]
>  * Disable the cooldown
>  * Sleep for a while before waitForRunningTasks
> If so, the job runs with the new parallelism, so `waitForRunningTasks` will 
> hang forever.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34418) Disk space issues for Docker-ized GitHub Action jobs

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817295#comment-17817295
 ] 

Matthias Pohl commented on FLINK-34418:
---

https://github.com/apache/flink/actions/runs/7895502322/job/21548178211

> Disk space issues for Docker-ized GitHub Action jobs
> 
>
> Key: FLINK-34418
> URL: https://issues.apache.org/jira/browse/FLINK-34418
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, pull-request-available, test-stability
>
> [https://github.com/apache/flink/actions/runs/7838691874/job/21390739806#step:10:27746]
> {code:java}
> [...]
> Feb 09 03:00:13 Caused by: java.io.IOException: No space left on device
> 27608Feb 09 03:00:13  at java.io.FileOutputStream.writeBytes(Native Method)
> 27609Feb 09 03:00:13  at 
> java.io.FileOutputStream.write(FileOutputStream.java:326)
> 27610Feb 09 03:00:13  at 
> org.apache.logging.log4j.core.appender.OutputStreamManager.writeToDestination(OutputStreamManager.java:250)
> 27611Feb 09 03:00:13  ... 39 more
> [...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34418) Disk space issues for Docker-ized GitHub Action jobs

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817292#comment-17817292
 ] 

Matthias Pohl commented on FLINK-34418:
---

https://github.com/apache/flink/actions/runs/7895502206/job/21548178104

> Disk space issues for Docker-ized GitHub Action jobs
> 
>
> Key: FLINK-34418
> URL: https://issues.apache.org/jira/browse/FLINK-34418
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, pull-request-available, test-stability
>
> [https://github.com/apache/flink/actions/runs/7838691874/job/21390739806#step:10:27746]
> {code:java}
> [...]
> Feb 09 03:00:13 Caused by: java.io.IOException: No space left on device
> 27608Feb 09 03:00:13  at java.io.FileOutputStream.writeBytes(Native Method)
> 27609Feb 09 03:00:13  at 
> java.io.FileOutputStream.write(FileOutputStream.java:326)
> 27610Feb 09 03:00:13  at 
> org.apache.logging.log4j.core.appender.OutputStreamManager.writeToDestination(OutputStreamManager.java:250)
> 27611Feb 09 03:00:13  ... 39 more
> [...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34443) YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication failed when deploying job cluster

2024-02-14 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817293#comment-17817293
 ] 

Matthias Pohl commented on FLINK-34443:
---

Maybe related to FLINK-34418

> YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication failed 
> when deploying job cluster
> ---
>
> Key: FLINK-34443
> URL: https://issues.apache.org/jira/browse/FLINK-34443
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Runtime / Coordination, Test 
> Infrastructure
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> https://github.com/apache/flink/actions/runs/7895502206/job/21548246199#step:10:28804
> {code}
> Error: 03:04:05 03:04:05.066 [ERROR] Tests run: 2, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 68.10 s <<< FAILURE! -- in 
> org.apache.flink.yarn.YARNFileReplicationITCase
> Error: 03:04:05 03:04:05.067 [ERROR] 
> org.apache.flink.yarn.YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication
>  -- Time elapsed: 1.982 s <<< ERROR!
> Feb 14 03:04:05 
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not 
> deploy Yarn job cluster.
> Feb 14 03:04:05   at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:566)
> Feb 14 03:04:05   at 
> org.apache.flink.yarn.YARNFileReplicationITCase.deployPerJob(YARNFileReplicationITCase.java:109)
> Feb 14 03:04:05   at 
> org.apache.flink.yarn.YARNFileReplicationITCase.lambda$testPerJobModeWithCustomizedFileReplication$0(YARNFileReplicationITCase.java:73)
> Feb 14 03:04:05   at 
> org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
> Feb 14 03:04:05   at 
> org.apache.flink.yarn.YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication(YARNFileReplicationITCase.java:73)
> Feb 14 03:04:05   at java.lang.reflect.Method.invoke(Method.java:498)
> Feb 14 03:04:05   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Feb 14 03:04:05   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Feb 14 03:04:05   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Feb 14 03:04:05   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Feb 14 03:04:05   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Feb 14 03:04:05 Caused by: 
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
> /user/root/.flink/application_1707879779446_0002/log4j-api-2.17.1.jar could 
> only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) 
> running and 2 node(s) are excluded in this operation.
> Feb 14 03:04:05   at 
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2260)
> Feb 14 03:04:05   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
> Feb 14 03:04:05   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2813)
> Feb 14 03:04:05   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:908)
> Feb 14 03:04:05   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:577)
> Feb 14 03:04:05   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> Feb 14 03:04:05   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:549)
> Feb 14 03:04:05   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:518)
> Feb 14 03:04:05   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
> Feb 14 03:04:05   at 
> org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
> Feb 14 03:04:05   at 
> org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
> Feb 14 03:04:05   at java.security.AccessController.doPrivileged(Native 
> Method)
> Feb 14 03:04:05   at javax.security.auth.Subject.doAs(Subject.java:422)
> Feb 14 03:04:05   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
> Feb 14 03:04:05   at 
> org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)
> Feb 14 03:04:05 
> Feb 14 03:04:05   at 
> org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1579)
> Feb 14 03:04:05   at org.apache.hadoop.ipc.Client.call(Client.java:1525)

[jira] [Created] (FLINK-34443) YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication failed when deploying job cluster

2024-02-14 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-34443:
-

 Summary: 
YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication failed 
when deploying job cluster
 Key: FLINK-34443
 URL: https://issues.apache.org/jira/browse/FLINK-34443
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI, Runtime / Coordination, Test 
Infrastructure
Affects Versions: 1.19.0, 1.20.0
Reporter: Matthias Pohl


https://github.com/apache/flink/actions/runs/7895502206/job/21548246199#step:10:28804

{code}
Error: 03:04:05 03:04:05.066 [ERROR] Tests run: 2, Failures: 0, Errors: 1, 
Skipped: 0, Time elapsed: 68.10 s <<< FAILURE! -- in 
org.apache.flink.yarn.YARNFileReplicationITCase
Error: 03:04:05 03:04:05.067 [ERROR] 
org.apache.flink.yarn.YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication
 -- Time elapsed: 1.982 s <<< ERROR!
Feb 14 03:04:05 org.apache.flink.client.deployment.ClusterDeploymentException: 
Could not deploy Yarn job cluster.
Feb 14 03:04:05 at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:566)
Feb 14 03:04:05 at 
org.apache.flink.yarn.YARNFileReplicationITCase.deployPerJob(YARNFileReplicationITCase.java:109)
Feb 14 03:04:05 at 
org.apache.flink.yarn.YARNFileReplicationITCase.lambda$testPerJobModeWithCustomizedFileReplication$0(YARNFileReplicationITCase.java:73)
Feb 14 03:04:05 at 
org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
Feb 14 03:04:05 at 
org.apache.flink.yarn.YARNFileReplicationITCase.testPerJobModeWithCustomizedFileReplication(YARNFileReplicationITCase.java:73)
Feb 14 03:04:05 at java.lang.reflect.Method.invoke(Method.java:498)
Feb 14 03:04:05 at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
Feb 14 03:04:05 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Feb 14 03:04:05 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
Feb 14 03:04:05 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
Feb 14 03:04:05 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Feb 14 03:04:05 Caused by: 
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/user/root/.flink/application_1707879779446_0002/log4j-api-2.17.1.jar could 
only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) 
running and 2 node(s) are excluded in this operation.
Feb 14 03:04:05 at 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2260)
Feb 14 03:04:05 at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
Feb 14 03:04:05 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2813)
Feb 14 03:04:05 at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:908)
Feb 14 03:04:05 at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:577)
Feb 14 03:04:05 at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
Feb 14 03:04:05 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:549)
Feb 14 03:04:05 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:518)
Feb 14 03:04:05 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
Feb 14 03:04:05 at 
org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
Feb 14 03:04:05 at 
org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
Feb 14 03:04:05 at java.security.AccessController.doPrivileged(Native 
Method)
Feb 14 03:04:05 at javax.security.auth.Subject.doAs(Subject.java:422)
Feb 14 03:04:05 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
Feb 14 03:04:05 at 
org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)
Feb 14 03:04:05 
Feb 14 03:04:05 at 
org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1579)
Feb 14 03:04:05 at org.apache.hadoop.ipc.Client.call(Client.java:1525)
Feb 14 03:04:05 at org.apache.hadoop.ipc.Client.call(Client.java:1422)
Feb 14 03:04:05 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231)
Feb 14 03:04:05 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
Feb 14 03:04:05 at com.sun.proxy.$Proxy113.addBlock(Unknown Source)
Feb 14 03:04:05 at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.ad

[jira] [Assigned] (FLINK-34442) Support optimizations for pre-partitioned [external] data sources

2024-02-14 Thread Jing Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Ge reassigned FLINK-34442:
---

Assignee: Jeyhun Karimov

> Support optimizations for pre-partitioned [external] data sources
> -
>
> Key: FLINK-34442
> URL: https://issues.apache.org/jira/browse/FLINK-34442
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Jeyhun Karimov
>Assignee: Jeyhun Karimov
>Priority: Major
>
> There are some use-cases in which data sources are pre-partitioned:
> - Kafka broker is already partitioned w.r.t. some key[s]
> - There are multiple [Flink] jobs that materialize their outputs and read 
> them as input subsequently
> One of the main benefits is that we might avoid unnecessary shuffling. 
> There is already an experimental feature in DataStream to support a subset of 
> these [1].
> We should support this for Flink Table/SQL as well. 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
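
As a hedged sketch of the DataStream-side feature referenced in [1]: `DataStreamUtils.reinterpretAsKeyedStream` lets a job declare that a stream is already partitioned exactly as `keyBy` would partition it, skipping the shuffle. The example below is illustrative only (the in-memory source is merely assumed to be pre-partitioned by `f0`, and the class name is made up); the Table/SQL counterpart is what this ticket asks for.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrePartitionedSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumption: this stream is already partitioned by f0 exactly as
        // keyBy(t -> t.f0) would partition it (e.g. a Kafka topic keyed by f0).
        DataStream<Tuple2<String, Integer>> prePartitioned =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

        // Experimental: reinterpret the existing partitioning instead of
        // shuffling again; incorrect declarations lead to wrong results.
        KeyedStream<Tuple2<String, Integer>, String> keyed =
                DataStreamUtils.reinterpretAsKeyedStream(
                        prePartitioned, t -> t.f0, Types.STRING);

        keyed.sum(1).print();
        env.execute("pre-partitioned sketch");
    }
}
```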



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]

2024-02-14 Thread via GitHub


zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1489188467


##
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java:
##
@@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class<?> taskClazz) {
     public void triggerPeriodicScheduledTasks() {
         for (ScheduledTask<?> scheduledTask : periodicScheduledTasks) {
             if (!scheduledTask.isCancelled()) {
-                scheduledTask.execute();
+                executeScheduledTask(scheduledTask);
             }
         }
     }
 
+    private static void executeScheduledTask(ScheduledTask<?> scheduledTask) {
+        scheduledTask.execute();
+        try {
+            // try to retrieve result of scheduled task to avoid swallowing any exceptions that
+            // occurred
+            scheduledTask.get();

Review Comment:
   This doesn't work for periodic tasks since the result future in the 
ScheduledTask never gets completed.
   
   I'm not sure if this change is correct in the first place. Every 
non-periodic task already throws exceptions when trigger is called.
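
   For reference, the plain JDK behaves the same way: the future of a periodic task never completes normally, so get() can only observe failure or cancellation (standalone sketch with an illustrative class name, not the Flink test utility):

   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class PeriodicFutureSketch {
       public static void main(String[] args) throws Exception {
           ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
           ScheduledFuture<?> periodic =
                   exec.scheduleAtFixedRate(() -> {}, 0, 10, TimeUnit.MILLISECONDS);
           try {
               // A periodic task's future never completes normally; get() only
               // returns when the task fails or is cancelled.
               periodic.get(100, TimeUnit.MILLISECONDS);
           } catch (TimeoutException expected) {
               System.out.println("periodic future still pending, as expected");
           } finally {
               periodic.cancel(true);
               exec.shutdownNow();
           }
       }
   }
   ```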



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


