Re: [PR] [FLINK-33936][table] Outputting Identical Results in Mini-Batch Aggregation with Set TTL [flink]
hackergin commented on code in PR #24290: URL: https://github.com/apache/flink/pull/24290#discussion_r1488851119 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala: ## @@ -220,6 +225,63 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode testHarness.close() } + @TestTemplate + def testGlobalAggregateWithRetraction(): Unit = { +if (!this.miniBatch.on) { + return +} +val data = new mutable.MutableList[(String, String, Long)] +val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c) +tEnv.createTemporaryView("T", t) + +val sql = + """ +|SELECT a, SUM(c) +|FROM ( +| SELECT a, b, SUM(c) as c +| FROM T GROUP BY a, b +|)GROUP BY a + """.stripMargin +val t1 = tEnv.sqlQuery(sql) + +tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2)) +tEnv.getConfig.set("table.optimizer.agg-phase-strategy", "TWO_PHASE") Review Comment: No, I made some changes in `testAggregateWithRetraction` to cover this part. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
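For readers unfamiliar with the nested query in this test: the inner `GROUP BY a, b` emits an update whenever a partial sum changes, and the outer `GROUP BY a` must retract the old partial sum before adding the new one. The plain-Java class below is a hypothetical sketch of that retraction arithmetic only. It is not Flink's aggregate operator, and it ignores mini-batching, the two-phase split, and state TTL.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of
 *   SELECT a, SUM(c) FROM (SELECT a, b, SUM(c) AS c FROM T GROUP BY a, b) GROUP BY a
 * for insert-only input, showing how the outer SUM consumes the inner SUM's updates.
 */
public class NestedSumWithRetraction {
    private final Map<List<String>, Long> innerSums = new HashMap<>(); // key: (a, b)
    private final Map<String, Long> outerSums = new HashMap<>(); // key: a

    /** Applies one input row (a, b, c) and returns the outer SUM(c) for key a afterwards. */
    public long insert(String a, String b, long c) {
        List<String> innerKey = Arrays.asList(a, b);
        Long previous = innerSums.get(innerKey);
        long updated = (previous == null ? 0L : previous) + c;
        innerSums.put(innerKey, updated);
        if (previous != null) {
            // The inner aggregate emits -U(a, previous): retract the old partial sum.
            outerSums.merge(a, -previous, Long::sum);
        }
        // The inner aggregate emits +I or +U(a, updated): accumulate the new partial sum.
        outerSums.merge(a, updated, Long::sum);
        return outerSums.get(a);
    }
}
```

For example, inserting (x, p, 1), (x, q, 2) and (x, p, 4) yields outer sums 1, 3 and 7: the third row retracts the old partial sum 1 for (x, p) before adding the new partial sum 5.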
Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]
zentol commented on code in PR #24309: URL: https://github.com/apache/flink/pull/24309#discussion_r1488733497 ## flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java: ## @@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class taskClazz) { public void triggerPeriodicScheduledTasks() { for (ScheduledTask scheduledTask : periodicScheduledTasks) { if (!scheduledTask.isCancelled()) { -scheduledTask.execute(); +executeScheduledTask(scheduledTask); } } } +private static void executeScheduledTask(ScheduledTask scheduledTask) { +scheduledTask.execute(); +try { +// try to retrieve result of scheduled task to avoid swallowing any exceptions that +// occurred +scheduledTask.get(); Review Comment: This blocking call breaks a lot of stuff.
Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]
zentol commented on code in PR #24309: URL: https://github.com/apache/flink/pull/24309#discussion_r1488733497 ## flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java: ## @@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class taskClazz) { public void triggerPeriodicScheduledTasks() { for (ScheduledTask scheduledTask : periodicScheduledTasks) { if (!scheduledTask.isCancelled()) { -scheduledTask.execute(); +executeScheduledTask(scheduledTask); } } } +private static void executeScheduledTask(ScheduledTask scheduledTask) { +scheduledTask.execute(); +try { +// try to retrieve result of scheduled task to avoid swallowing any exceptions that +// occurred +scheduledTask.get(); Review Comment: This blocking call breaks a lot of stuff. I don't think we can make this change; too many tests rely on throwing something in the executor, and later triggering the completion once some condition is fulfilled. Maybe handle exceptions as fatal errors tho. Maybe that breaks fewer things.
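One way to act on the "handle exceptions as fatal errors" idea without reintroducing the blocking call is to inspect a task's result only once it is already done, so futures that a test completes later (or that never complete) are skipped instead of awaited. This is a hypothetical sketch over a plain `java.util.concurrent.Future`; `NonBlockingResultCheck`, `reportIfFailed`, and the `fatalErrorHandler` callback are illustrative names, not part of `ManuallyTriggeredScheduledExecutorService`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical helper: surface a task failure without ever blocking on its result. */
public final class NonBlockingResultCheck {
    private NonBlockingResultCheck() {}

    public static void reportIfFailed(Future<?> result, Consumer<Throwable> fatalErrorHandler) {
        // A pending future (e.g. one a test completes later) is skipped rather than awaited;
        // a cancelled one has nothing to report.
        if (!result.isDone() || result.isCancelled()) {
            return;
        }
        try {
            result.get(); // returns immediately: the future is already done
        } catch (ExecutionException e) {
            // Hand the original failure to the fatal-error handler instead of swallowing it.
            fatalErrorHandler.accept(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Whether routing failures to a fatal-error handler breaks fewer existing tests is exactly the open question in the comment; the sketch only shows that the check itself need not block.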
Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]
zentol commented on code in PR #24309: URL: https://github.com/apache/flink/pull/24309#discussion_r1488731829 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java: ## @@ -594,18 +597,19 @@ private void checkResourceRequirementsWithDelay() { if (requirementsCheckDelay.toMillis() <= 0) { checkResourceRequirements(); } else { -if (requirementsCheckFuture == null || requirementsCheckFuture.isDone()) { -requirementsCheckFuture = new CompletableFuture<>(); -scheduledExecutor.schedule( -() -> -mainThreadExecutor.execute( -() -> { -checkResourceRequirements(); - Preconditions.checkNotNull(requirementsCheckFuture) -.complete(null); -}), -requirementsCheckDelay.toMillis(), -TimeUnit.MILLISECONDS); +if (requirementsCheckFuture.isDone()) { +requirementsCheckFuture = +scheduledExecutor.schedule( +() -> { +if (started) { Review Comment: This read is unsafe since `started` isn't volatile.
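To illustrate the visibility concern: a flag written on one thread and read inside a callback running on a scheduler thread needs a happens-before edge, which declaring the field `volatile` provides. The other option is to keep the read on the main thread, as the previous code did by wrapping the check in `mainThreadExecutor.execute(...)`. The class below is a hypothetical, self-contained sketch of the `volatile` option using a standard `ScheduledExecutorService`; it is not `FineGrainedSlotManager`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical component whose lifecycle flag is read from a scheduler thread. */
public class DelayedCheck {
    // volatile: writes made by the calling thread become visible to the scheduler thread.
    private volatile boolean started;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void start() {
        started = true;
    }

    public void stop() {
        started = false;
    }

    /** Runs {@code action} after the delay, but only if the component is still started. */
    public ScheduledFuture<Boolean> scheduleCheck(Runnable action, long delayMillis) {
        return scheduler.schedule(
                () -> {
                    if (started) {
                        action.run();
                        return true;
                    }
                    return false;
                },
                delayMillis,
                TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Note that `volatile` only fixes visibility: a `stop()` racing with the check can still let one action run after stopping, which is one argument for reading the flag on the main thread instead.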
Re: [PR] [FLINK-34386][state] Add RocksDB bloom filter metrics [flink]
hejufang commented on PR #24274: URL: https://github.com/apache/flink/pull/24274#issuecomment-1942857042 @flinkbot run azure
[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching
[ https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817136#comment-17817136 ] Mason Chen commented on FLINK-33545: Hi [~aeolus811tw], have you had time to review the follow-up feedback? As mentioned before, there's no guarantee that a second commit would succeed; it might take several attempts. The best way to handle this is to throw an exception and let Flink restart and try again. > KafkaSink implementation can cause dataloss during broker issue when not > using EXACTLY_ONCE if there's any batching > --- > > Key: FLINK-33545 > URL: https://issues.apache.org/jira/browse/FLINK-33545 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.18.0 >Reporter: Kevin Tseng >Assignee: Kevin Tseng >Priority: Major > Labels: pull-request-available > > In the current implementation of KafkaSource and KafkaSink there are some > assumptions that were made: > # KafkaSource relies completely on checkpoints to manage and track its offsets > in the *KafkaSourceReader* class > # KafkaSink, in the *KafkaWriter* class, only performs a catch-up flush when > *DeliveryGuarantee.EXACTLY_ONCE* is specified. > KafkaSource assumes that checkpoints are properly fenced and that > everything it has read up until the checkpoint is initiated will be processed or > recorded by downstream operators, including a TwoPhaseCommitter such as > *KafkaSink*. > *KafkaSink* goes by the model of: > > {code:java} > flush -> prepareCommit -> commit{code} > > In a scenario where: > * KafkaSource ingested records #1 to #100 > * KafkaSink only had the chance to send records #1 to #96 > * with a batching interval of 5ms > when the checkpoint is initiated, flush will only confirm the sending of > records #1 to #96. > This allows the checkpoint to proceed since there's no error, and records #97 to #100 > will be batched after the first flush.
> Now, if the broker goes down or has an issue that prevents the internal KafkaProducer > from sending out the records after a batch, and the producer is in a constant > retry cycle (the default value of KafkaProducer retries is Integer.MAX_VALUE), > *WriterCallback* error handling will never be triggered until the next > checkpoint flush. > This can be tested by creating a faulty Kafka cluster and running the following > code: > {code:java} > Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER); > props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer"); > props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); > props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); > props.put(ProducerConfig.ACKS_CONFIG, "all"); > final KafkaProducer<String, String> producer = new KafkaProducer<>(props); > try { > for (int i = 0; i < 10; i++) { > System.out.printf("sending record #%d\n", i); > String data = UUID.randomUUID().toString(); > final ProducerRecord<String, String> record = new > ProducerRecord<>(TOPIC, Integer.toString(i), data); > producer.send(record, new CB(Integer.toString(i), data)); > Thread.sleep(10000); // sleep for 10 seconds > } > } catch (Exception e) { > e.printStackTrace(); > } finally { > System.out.println("flushing"); > producer.flush(); > System.out.println("closing"); > producer.close(); > }{code} > Once the callback returns due to a network timeout, it will cause Flink to restart > from the previously saved checkpoint (which recorded reading up to record #100), > but KafkaWriter never sent records #97 to #100.
> This will result in data loss of records #97 to #100. > Because KafkaWriter only catches errors *after* the callback, if the callback is never > invoked (due to a broker issue) right after the first flush has taken place, > those records are effectively gone unless someone goes back and looks > for them. > This behavior would be acceptable if the user has set {*}DeliveryGuarantee.NONE{*}, but > is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}. > The process diverges in the case of {*}EXACTLY_ONCE{*}. > prepareCommit produces a list of KafkaCommittables corresponding to the > transactional KafkaProducers to be committed, and a catch-up flush takes > place during the *commit* step. Whether this was intentional or not, because > flush is a blocking call, this second flush at the > end of the EXACTLY_ONCE path actually ensures that everything fenced in the current > checkpoint will be sent to Kafka, or fail the
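Mason's suggestion (throw and let Flink restart, rather than retrying the commit) can be sketched as follows, assuming each asynchronous send is represented by a `CompletableFuture` that completes when the producer callback fires. `AtLeastOnceWriter`, `track`, and `flush` are hypothetical names; this is not `KafkaWriter`'s implementation. The idea is that a checkpoint-time flush waits for every in-flight send and throws if any of them failed, so the job fails over to the last completed checkpoint instead of acknowledging records that were never written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical at-least-once writer that never lets a failed send pass a checkpoint. */
public class AtLeastOnceWriter {
    // Sends issued since the last flush; each future completes when its callback fires.
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    /** Registers an asynchronous send issued by the writer. */
    public void track(CompletableFuture<Void> send) {
        inFlight.add(send);
    }

    /**
     * Called before the checkpoint is acknowledged: waits for every tracked send and
     * throws if any of them failed, so the job fails over instead of losing records.
     */
    public void flush() {
        Throwable firstFailure = null;
        for (CompletableFuture<Void> send : inFlight) {
            try {
                send.join(); // blocks until this send is acknowledged or fails
            } catch (CompletionException e) {
                if (firstFailure == null) {
                    firstFailure = e.getCause() != null ? e.getCause() : e;
                }
            }
        }
        inFlight.clear();
        if (firstFailure != null) {
            throw new IllegalStateException("Records failed to send before checkpoint", firstFailure);
        }
    }
}
```

Like `KafkaProducer.flush()`, this blocks for as long as any send is outstanding; with the unbounded `retries` and `delivery.timeout.ms` settings from the reproduction above, that can be indefinitely.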
[jira] [Commented] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817115#comment-17817115 ] Sai Sharath Dandi commented on FLINK-34403: --- Sorry, I couldn't get back here in time. Thanks [~mapohl] for the fix! > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Assignee: Matthias Pohl >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.20.0 > > > After FLINK-33611 was merged, the misc tests on GHA cannot pass due to an out-of-memory > error, throwing the following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization.
> Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007) > Feb 07 05:43:21 at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) > Feb 07 05:43:21 at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76) > Feb 07 
05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:71) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple(VeryBigPbRowToProtoTest.java:37) > Feb 07 05:43:21 at java.lang.reflect.Method.invoke(Method.java:498) > Feb 07 05:43:21 Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalArgumentException: Self-suppression not permitted > Feb 07 05:43:21 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > Feb 07 05:43:21 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:323) > Feb 07 05:43:21 ... 18 more > Feb 07 05:43:21 Caused by: java.lang.IllegalArgumentException: > Self-suppression not permitted > Feb 07 05:43:21 at > java.lang.Throwable.addSuppressed(Throwable.java:1072) > Feb 07 05:43:21 at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:556) > Feb 07 05:43:21 at > org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:486) > Feb 07 05:43:21 at >
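The `IllegalArgumentException: Self-suppression not permitted` in this trace is thrown by `Throwable.addSuppressed`, which rejects adding a throwable as suppressed to itself. One plausible route under memory pressure (an assumption, not confirmed in the issue) is a try-with-resources block whose body and `close()` both throw the same exception instance, since the JVM may reuse preallocated `OutOfMemoryError` objects. The sketch below reproduces that mechanism with a shared `RuntimeException`; `SHARED` and `FailingResource` are hypothetical.

```java
/** Hypothetical reproduction of "Self-suppression not permitted" via try-with-resources. */
public class SelfSuppressionDemo {
    // Stand-in for a reused, preallocated error instance.
    static final RuntimeException SHARED = new RuntimeException("shared instance");

    static class FailingResource implements AutoCloseable {
        @Override
        public void close() {
            throw SHARED; // close() fails with the very same instance as the body
        }
    }

    static String run() {
        try {
            try (FailingResource ignored = new FailingResource()) {
                throw SHARED;
            }
            // try-with-resources now calls SHARED.addSuppressed(SHARED), which throws.
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        } catch (RuntimeException e) {
            return "no self-suppression: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running it prints `Self-suppression not permitted`: the original exception is replaced by the `IllegalArgumentException`, which is why such a trace can hide the underlying out-of-memory condition.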
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1488432583 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java: ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples. + * + * metricName is mapped in Prometheus to the value of the mandatory label named '__name__' + * labels. The other labels, as key/value, are appended after the '__name__' label. + */ +@Public +public class PrometheusTimeSeries implements Serializable { +/** A single Label. 
*/ +public static class Label implements Serializable { +private final String name; +private final String value; + +public Label(String name, String value) { Review Comment: There are other validations that are more critical, such as labels having to be ordered by name and samples by timestamp, or Prometheus will reject them. We kept these validations out of the implementation on purpose, because this constructor will be called a gazillion times, and validating here may impact performance. In the docs, we explicitly say this is the responsibility of the application, like sending the time series in time order. Either we decide to add validation to this class, paying the performance cost, or we leave this class unvalidated.
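If optional validation were added, each of the two ordering checks mentioned in the comment is a single linear pass. Below is a hypothetical sketch over plain label names and sample timestamps, not the connector's `PrometheusTimeSeries` API. It assumes strictly increasing order for both, so duplicate label names and repeated timestamps count as invalid, which may be stricter than what Prometheus actually enforces.

```java
import java.util.List;

/** Hypothetical ordering checks for one time series; not part of the connector. */
public final class TimeSeriesOrderCheck {
    private TimeSeriesOrderCheck() {}

    /** True if label names are strictly increasing in lexicographic (String.compareTo) order. */
    public static boolean labelNamesSorted(List<String> labelNames) {
        for (int i = 1; i < labelNames.size(); i++) {
            if (labelNames.get(i - 1).compareTo(labelNames.get(i)) >= 0) {
                return false;
            }
        }
        return true;
    }

    /** True if sample timestamps are strictly increasing. */
    public static boolean timestampsIncreasing(long[] timestamps) {
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i - 1] >= timestamps[i]) {
                return false;
            }
        }
        return true;
    }
}
```

The cost is one comparison per label and per sample on every record, which is the per-record overhead weighed in the comment against leaving the class unvalidated.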
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1488419176 ## prometheus-connector/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializerTest.java: ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class PrometheusStateSerializerTest { + +private static final ElementConverter +ELEMENT_CONVERTER = new PrometheusTimeSeriesConverter(); + +private static PrometheusTimeSeries getTestTimeSeries(int i) { +return PrometheusTimeSeries.builder() +.withMetricName("metric-name") +.addLabel("dimension-a", "value-" + i) +.addSample(i + 42.0, i + 1L) +.addSample(i + 3.14, i + 2L) +.build(); +} + +// This method uses the same implementation as PrometheusSinkWriter.getSizeInBytes() to extract +// the requestEntry "size" +// (i.e. the number of Samples). This is the "size" used in RequestEntryWrapper +// see +// https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterStateSerializer.java#L60 Review Comment: Replaced with @link to the method. But I am not convinced it makes sense: the link pointed to a specific line of source code, to show how the size is used in the implementation of a method, not to the Javadoc of that method.
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1488419176 ## prometheus-connector/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializerTest.java: ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class PrometheusStateSerializerTest { + +private static final ElementConverter +ELEMENT_CONVERTER = new PrometheusTimeSeriesConverter(); + +private static PrometheusTimeSeries getTestTimeSeries(int i) { +return PrometheusTimeSeries.builder() +.withMetricName("metric-name") +.addLabel("dimension-a", "value-" + i) +.addSample(i + 42.0, i + 1L) +.addSample(i + 3.14, i + 2L) +.build(); +} + +// This method uses the same implementation as PrometheusSinkWriter.getSizeInBytes() to extract +// the requestEntry "size" +// (i.e. the number of Samples). This is the "size" used in RequestEntryWrapper +// see +// https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterStateSerializer.java#L60 Review Comment: Replaced with @link to the method. This way we lose the pointer to the specific line, but okay
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
hanyuzheng7 commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1488414914 ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java: ## @@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT()) .expectArgumentTypes( DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(), DataTypes.INT()), + TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE)) +.expectSignature("f(>)") + .calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW())) +.expectErrorMessage( +"Invalid input arguments. Expected signatures are:\n" ++ "f(>)"), +TestSpec.forStrategy( +"Strategy fails if input argument type is not ARRAY", + sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE)) +.calledWithArgumentTypes(DataTypes.INT()) +.expectErrorMessage( +"Invalid input arguments. Expected signatures are:\n" ++ "f(>)"), +TestSpec.forStrategy( +"Strategy fails if the number of input arguments are not one", + sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE)) +.calledWithArgumentTypes( +DataTypes.ARRAY(DataTypes.INT()), +DataTypes.ARRAY(DataTypes.STRING())) +.expectErrorMessage( +"Invalid input arguments. Expected signatures are:\n" ++ "f(>)"), Review Comment: OK, I will remove this test case. Thank you for the explanation.
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
hanyuzheng7 commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1488411926 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_SORT = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SORT") +.kind(SCALAR) +.inputTypeStrategy( +or( +sequence(new ArrayComparableElementArgumentTypeStrategy()), +sequence( +new ArrayComparableElementArgumentTypeStrategy(), +logical(LogicalTypeRoot.BOOLEAN Review Comment: OK, waiting for @MartijnVisser's reply.
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1488409080 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/RequestEntrySizeUtils.java: ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import java.util.Collection; + +/** Collection of methods to calculate the sink RequestEntry "size". */ +public class RequestEntrySizeUtils { + +/** + * Size of a request entry (a {@link Types.TimeSeries time-series}) for the purpose of batching. + * Count the number of {@link Types.Sample samples}. + * + * @param requestEntry a time-series + * @return number of Samples in the TimeSeries + */ +public static long requestSizeForBatching(Types.TimeSeries requestEntry) { +return requestEntry.getSamplesCount(); +} + +/** + * Serialized size of a request entry {@link Types.TimeSeries TimeSeries}: the number of bytes + * of the protobuf-serialized representation of the TimeSeries.
+ * + * @param requestEntry a time-series + * @return number of bytes + */ +public static long requestSerializedSize(Types.TimeSeries requestEntry) { +return requestEntry.getSerializedSize(); +} + +/** + * Count the number of {@link Types.Sample samples} in a collection of {@link Types.TimeSeries + * time-series} (a batch). + * + * @param requestEntries collection of time-series + * @return number of samples + */ +public static long countSamples(Collection requestEntries) { +return requestEntries.stream() +.mapToLong(RequestEntrySizeUtils::requestSizeForBatching) +.sum(); +} Review Comment: Added tests for the RequestEntrySizeUtils methods in RequestEntrySizeUtilsTest.
[jira] [Updated] (FLINK-34442) Support optimizations for pre-partitioned [external] data sources
[ https://issues.apache.org/jira/browse/FLINK-34442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeyhun Karimov updated FLINK-34442: --- Description: There are some use-cases in which data sources are pre-partitioned: - Kafka broker is already partitioned w.r.t. some key[s] - There are multiple [Flink] jobs that materialize their outputs and read them as input subsequently One of the main benefits is that we might avoid unnecessary shuffling. There is already an experimental feature in DataStream to support a subset of these [1]. We should support this for Flink Table/SQL as well. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/ was: There are some use-cases in which data sources are pre-partitioned: - Kafka broker is already partitioned w.r.t. some key - There are multiple Flink jobs that materialize their outputs and read them as input subsequently One of the main benefits is that we might avoid unnecessary shuffling. There is already an experimental feature in DataStream to support a subset of these [1]. We should support this for Flink Table/SQL as well. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/ > Support optimizations for pre-partitioned [external] data sources > - > > Key: FLINK-34442 > URL: https://issues.apache.org/jira/browse/FLINK-34442 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Jeyhun Karimov >Priority: Major > > There are some use-cases in which data sources are pre-partitioned: > - Kafka broker is already partitioned w.r.t. some key[s] > - There are multiple [Flink] jobs that materialize their outputs and read > them as input subsequently > One of the main benefits is that we might avoid unnecessary shuffling. > There is already an experimental feature in DataStream to support a subset of > these [1]. > We should support this for Flink Table/SQL as well. 
> [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34442) Support optimizations for pre-partitioned [external] data sources
Jeyhun Karimov created FLINK-34442: -- Summary: Support optimizations for pre-partitioned [external] data sources Key: FLINK-34442 URL: https://issues.apache.org/jira/browse/FLINK-34442 Project: Flink Issue Type: Improvement Components: Table SQL / API, Table SQL / Planner Affects Versions: 1.18.1 Reporter: Jeyhun Karimov There are some use-cases in which data sources are pre-partitioned: - Kafka broker is already partitioned w.r.t. some key - There are multiple Flink jobs that materialize their outputs and read them as input subsequently One of the main benefits is that we might avoid unnecessary shuffling. There is already an experimental feature in DataStream to support a subset of these [1]. We should support this for Flink Table/SQL as well. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
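The shuffle-avoidance argument rests on one check: if every record already lives in the partition that a hash partitioner would assign it to, re-partitioning moves nothing. The sketch below illustrates that check under simplifying assumptions. `PrePartitionCheck`, `partitionFor` and `isAlreadyPartitioned` are hypothetical names, and the plain `hashCode()` modulo is a stand-in. Flink's keyed partitioning goes through key groups, so a real pre-partitioned source would have to match that scheme exactly for the optimization to be safe.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: would a hash shuffle leave already-partitioned data untouched?
// partitionFor() is a simplification, NOT Flink's key-group based assignment.
final class PrePartitionCheck {

    /** Simplified stand-in for a hash partitioner: key -> partition index. */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** True if every key already sits where partitionFor() would send it, i.e. a shuffle would move nothing. */
    static boolean isAlreadyPartitioned(List<List<String>> partitions) {
        int numPartitions = partitions.size();
        for (int p = 0; p < numPartitions; p++) {
            for (String key : partitions.get(p)) {
                if (partitionFor(key, numPartitions) != p) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // "a".hashCode() == 97 (odd) and "b".hashCode() == 98 (even), so with two partitions
        // "b" belongs to partition 0 and "a" belongs to partition 1.
        List<List<String>> aligned = Arrays.asList(Arrays.asList("b"), Arrays.asList("a"));
        List<List<String>> misaligned = Arrays.asList(Arrays.asList("a"), Arrays.asList("b"));
        System.out.println(isAlreadyPartitioned(aligned));    // true: the shuffle could be skipped
        System.out.println(isAlreadyPartitioned(misaligned)); // false: a shuffle is still required
    }
}
```

In a planner, a check like this would not run on data at all; the source would instead declare its partitioning, and the planner would compare that declaration with the partitioning the downstream operator requires.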
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1488374236 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java: ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples. + * + * metricName is mapped in Prometheus to the value of the mandatory label named '__name__' + * labels. The other labels, as key/value, are appended after the '__name__' label. + */ +@Public +public class PrometheusTimeSeries implements Serializable { +/** A single Label. 
*/ +public static class Label implements Serializable { +private final String name; +private final String value; + +public Label(String name, String value) { +this.name = name; +this.value = value; +} + +public String getName() { +return name; +} + +public String getValue() { +return value; +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +Label label = (Label) o; +return new EqualsBuilder() +.append(name, label.name) +.append(value, label.value) +.isEquals(); +} + +@Override +public int hashCode() { +return new HashCodeBuilder(17, 37).append(name).append(value).toHashCode(); +} +} + +/** A single Sample. */ +public static class Sample implements Serializable { +private final double value; +private final long timestamp; + +public Sample(double value, long timestamp) { +this.value = value; +this.timestamp = timestamp; +} + +public double getValue() { +return value; +} + +public long getTimestamp() { +return timestamp; +} +} + +private final Label[] labels; +private final Sample[] samples; +private final String metricName; + +public PrometheusTimeSeries(String metricName, Label[] labels, Sample[] samples) { +this.metricName = metricName; +this.labels = labels; +this.samples = samples; +} + +public Label[] getLabels() { +return labels; +} + +public Sample[] getSamples() { +return samples; +} + +public String getMetricName() { +return metricName; +} + +public static Builder builder() { +return new Builder(); +} + +public static Builder builderFrom(PrometheusTimeSeries other) { +return new Builder( +Arrays.asList(other.labels), Arrays.asList(other.samples), other.metricName); +} + +/** Builder for sink input pojo instance. 
*/ +public static final class Builder { +private List labels = new ArrayList<>(); +private List samples = new ArrayList<>(); +private String metricName; + +private Builder(List labels, List samples, String metricName) { +this.labels = labels; +this.samples = samples; +this.metricName = metricName; +} Review Comment: I assume you mean testing the builder, not its constructor. Added a test for PrometheusTimeSeries.Builder -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please
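One detail in the quoted diff may be worth covering in the new Builder test. `builderFrom()` passes `Arrays.asList(other.labels)` straight to the private constructor, and `Arrays.asList` returns a fixed-size view backed by the array. The diff is cut off before the Builder's mutator methods, so it is not visible here whether they call `add()` on these lists. If they do, a builder created via `builderFrom()` would throw `UnsupportedOperationException`. The sketch below (hypothetical class `FixedSizeListDemo`, with `String` standing in for `Label`) shows that behavior and a defensive-copy alternative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo, not connector code: String stands in for PrometheusTimeSeries.Label.
final class FixedSizeListDemo {

    /** Mirrors the quoted builderFrom(): Arrays.asList gives a fixed-size view of the array. */
    static boolean canAppendToArraysAsList(String[] source) {
        List<String> view = Arrays.asList(source);
        try {
            view.add("extra");
            return true;
        } catch (UnsupportedOperationException e) {
            // add()/remove() are unsupported on the array-backed view
            return false;
        }
    }

    /** Defensive copy: an independent, growable ArrayList that does not alias the source array. */
    static List<String> growableCopy(String[] source) {
        List<String> copy = new ArrayList<>(Arrays.asList(source));
        copy.add("extra");
        return copy;
    }

    public static void main(String[] args) {
        String[] labels = {"job", "instance"};
        System.out.println(canAppendToArraysAsList(labels)); // false
        System.out.println(growableCopy(labels));            // [job, instance, extra]
        System.out.println(labels.length);                   // 2: the source array is unchanged
    }
}
```

A copy also prevents `set()` calls on the builder's lists from writing through to the arrays of the `PrometheusTimeSeries` instance that the builder was created from.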
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1488349352 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java: ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples. + * + * metricName is mapped in Prometheus to the value of the mandatory label named '__name__' + * labels. The other labels, as key/value, are appended after the '__name__' label. + */ +@Public +public class PrometheusTimeSeries implements Serializable { +/** A single Label. 
*/ +public static class Label implements Serializable { +private final String name; +private final String value; + +public Label(String name, String value) { +this.name = name; +this.value = value; +} + +public String getName() { +return name; +} + +public String getValue() { +return value; +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +Label label = (Label) o; +return new EqualsBuilder() +.append(name, label.name) +.append(value, label.value) +.isEquals(); +} Review Comment: Added unit tests on equals and hashCode semantics, and also fixed a bug: missing equals() in one of the inner classes! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
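In the quoted diff, `Label` overrides `equals()`/`hashCode()` but `Sample` does not, so `Sample` is presumably the inner class the comment refers to. The actual fix is not shown. The following is a minimal sketch of value semantics for a class with `Sample`'s two fields. To stay dependency-free, it uses `java.util.Objects` instead of the commons-lang3 `EqualsBuilder`/`HashCodeBuilder` used by `Label`. The class is a standalone stand-in, not the connector's code.

```java
import java.util.Objects;

// Hypothetical sketch mirroring the Sample fields from the quoted diff.
final class Sample {
    private final double value;
    private final long timestamp;

    Sample(double value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Sample other = (Sample) o;
        // Double.compare matches Double.equals semantics (NaN equals NaN, 0.0 != -0.0),
        // which keeps equals() consistent with the boxed-Double hash used below.
        return Double.compare(value, other.value) == 0 && timestamp == other.timestamp;
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, timestamp);
    }

    public static void main(String[] args) {
        Sample a = new Sample(1.5, 1000L);
        Sample b = new Sample(1.5, 1000L);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
        System.out.println(a.equals(new Sample(1.5, 2000L)));            // false
    }
}
```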
Re: [PR] [FLINK-34441] Add guide to submit flink SQL scripts via the operator (using flink-sql-runner-example) [flink-kubernetes-operator]
prakash-42 commented on PR #776: URL: https://github.com/apache/flink-kubernetes-operator/pull/776#issuecomment-1942031569 @mxm Here is a PR for the discussion we were having in PR #596. Please let me know if any updates/improvements are needed. Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34441) Add Documentation for flink-sql-runner-example in Kubernetes Operator Documentation
[ https://issues.apache.org/jira/browse/FLINK-34441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34441: --- Labels: pull-request-available (was: ) > Add Documentation for flink-sql-runner-example in Kubernetes Operator > Documentation > --- > > Key: FLINK-34441 > URL: https://issues.apache.org/jira/browse/FLINK-34441 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Prakash Tiwari >Priority: Major > Labels: pull-request-available > > There isn't a direct way available to submit SQL script based jobs to the > Flink Kubernetes Operator. So we have created a > [flink-sql-runner-example|https://github.com/apache/flink-kubernetes-operator/tree/release-1.7/examples/flink-sql-runner-example] > that helps to run Flink SQL scripts as table API jobs. I believe it's a very > useful and important example, and information about this job is missing from > the Kubernetes Operator's documentation. Hence I've created this issue to > update the documentation to include this example. > The prospect for this issue was discussed here: > [https://github.com/apache/flink-kubernetes-operator/pull/596] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34441] Add guide to submit flink SQL scripts via the operator (using flink-sql-runner-example) [flink-kubernetes-operator]
prakash-42 opened a new pull request, #776: URL: https://github.com/apache/flink-kubernetes-operator/pull/776 ## What is the purpose of the change This pull request adds documentation about flink-sql-runner-example to the Kubernetes operator documentation page. The majority of the content is copied from the [flink-sql-runner-example's readme](https://github.com/apache/flink-kubernetes-operator/tree/release-1.7/examples/flink-sql-runner-example#flink-kubernetes-operator-sql-example). ## Brief change log - Added a guide to submit Flink SQL scripts via the Kubernetes operator (using flink-sql-runner-example) ## Verifying this change *This change is a trivial rework / code cleanup without any test coverage.* I don't know whether we have tests covering the docs project, so I ran the project locally and made sure that my changes render properly. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34424) BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times out
[ https://issues.apache.org/jira/browse/FLINK-34424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817083#comment-17817083 ] Piotr Nowicki commented on FLINK-34424: --- Yes, we did! A week ago was an anniversary of the last mention :D No problem and good luck with concurrency issues :) > BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times > out > > > Key: FLINK-34424 > URL: https://issues.apache.org/jira/browse/FLINK-34424 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57446=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9151 > {code} > Feb 11 13:55:29 "ForkJoinPool-50-worker-25" #414 daemon prio=5 os_prio=0 > tid=0x7f19503af800 nid=0x284c in Object.wait() [0x7f191b6db000] > Feb 11 13:55:29java.lang.Thread.State: WAITING (on object monitor) > Feb 11 13:55:29 at java.lang.Object.wait(Native Method) > Feb 11 13:55:29 at java.lang.Thread.join(Thread.java:1252) > Feb 11 13:55:29 - locked <0xe2e019a8> (a > org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest$LongReader) > Feb 11 13:55:29 at > org.apache.flink.core.testutils.CheckedThread.trySync(CheckedThread.java:104) > Feb 11 13:55:29 at > org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:92) > Feb 11 13:55:29 at > org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:81) > Feb 11 13:55:29 at > org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest.testRead10ConsumersConcurrent(BoundedBlockingSubpartitionWriteReadTest.java:177) > Feb 11 13:55:29 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34424) BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times out
[ https://issues.apache.org/jira/browse/FLINK-34424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817082#comment-17817082 ] Matthias Pohl commented on FLINK-34424: --- Argh. Didn't we have this in the past? Sorry again - the auto completion and the guy behind the screen are to blame here. Yes, you're right. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34439) Move chown operations to COPY commands in Dockerfile
[ https://issues.apache.org/jira/browse/FLINK-34439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34439: --- Labels: pull-request-available (was: ) > Move chown operations to COPY commands in Dockerfile > > > Key: FLINK-34439 > URL: https://issues.apache.org/jira/browse/FLINK-34439 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Mate Czagany >Assignee: Mate Czagany >Priority: Minor > Labels: pull-request-available > > We can lower the size of the output operator container image if we don't run > 'chown' commands in separate RUN commands inside the Dockerfile, but instead > use the '--chown' argument of the COPY command. > Using 'RUN chown...' will copy all the affected files, at their full size, > into a new layer, duplicating the files from the previous COPY command. > Example: > {code:java} > $ docker image history ghcr.io/apache/flink-kubernetes-operator:ccb10b8 > ... > 3 months ago RUN /bin/sh -c chown -R flink:flink $FLINK... > 116MB buildkit.dockerfile.v0 > ... {code} > This would mean a 20% reduction in image size. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34439] Move chown operations to COPY commands in Dockerfile [flink-kubernetes-operator]
mateczagany opened a new pull request, #775: URL: https://github.com/apache/flink-kubernetes-operator/pull/775 ## What is the purpose of the change Removed unnecessary `RUN chown ...` commands from the Dockerfile to reduce the size of the output image. With this change, the image size has been reduced from 469MB to 360MB. A side effect of this PR is that the `NOTICE` file and the `licenses` folder will now also be owned by `flink:flink` instead of `root:root`, but I think that's acceptable. ## Brief change log - Remove most `RUN chown ...` commands from the Dockerfile ## Verifying this change - Built the image, ran some example deployments - Inspected the image's file structure before and after and verified that all permissions are the same ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
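A quick cross-check of the sizes reported in PR #775 against the 20% reduction estimated in FLINK-34439:

```latex
\frac{469\,\text{MB} - 360\,\text{MB}}{469\,\text{MB}} = \frac{109}{469} \approx 0.232 \approx 23\%
```

The measured saving is therefore slightly larger than the Jira estimate.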
[jira] [Comment Edited] (FLINK-34440) Support Debezium Protobuf Confluent Format
[ https://issues.apache.org/jira/browse/FLINK-34440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817076#comment-17817076 ] Kevin Lam edited comment on FLINK-34440 at 2/13/24 5:00 PM: Hello! I just added this Jira issue, following the [Contribution Guide.|https://flink.apache.org/how-to-contribute/contribute-code/] If this ticket warrants a dev@ discussion, I'm happy to open one, please let me know. Also, I am happy to work on contributing the code to complete this issue. Looking forward to hearing others' thoughts! was (Author: klam-shop): I just added this Jira issue, following the [Contribution Guide.|https://flink.apache.org/how-to-contribute/contribute-code/] If this ticket warrants a dev@ discussion, I'm happy to open one. I am happy to work on contributing the code to complete this issue. Looking forward to hearing others' thoughts! > Support Debezium Protobuf Confluent Format > -- > > Key: FLINK-34440 > URL: https://issues.apache.org/jira/browse/FLINK-34440 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.19.0, 1.18.1 >Reporter: Kevin Lam >Priority: Minor > > *Motivation* > Debezium and the Confluent Schema registry can be used to emit Protobuf > Encoded messages to Kafka, but Flink does not easily support consuming these > messages through a connector. > *Definition of Done* > Add a format `debezium-protobuf-confluent` provided by > DebeziumProtobufFormatFactory that supports Debezium messages encoded using > Protocol Buffer and the Confluent Schema Registry. > To consider > * Mirror the implementation of the `debezium-avro-confluent` format. 
First > implement a `protobuf-confluent` format similar to the existing [Confluent > Avro|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/formats/avro-confluent/] > format that's provided today, which allows reading/writing protobuf using > the Confluent Schema Registry -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34441) Add Documentation for flink-sql-runner-example in Kubernetes Operator Documentation
Prakash Tiwari created FLINK-34441: -- Summary: Add Documentation for flink-sql-runner-example in Kubernetes Operator Documentation Key: FLINK-34441 URL: https://issues.apache.org/jira/browse/FLINK-34441 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Prakash Tiwari There isn't a direct way available to submit SQL script based jobs to the Flink Kubernetes Operator. So we have created a [flink-sql-runner-example|https://github.com/apache/flink-kubernetes-operator/tree/release-1.7/examples/flink-sql-runner-example] that helps to run Flink SQL scripts as table API jobs. I believe it's a very useful and important example, and information about this job is missing from the Kubernetes Operator's documentation. Hence I've created this issue to update the documentation to include this example. The prospect for this issue was discussed here: [https://github.com/apache/flink-kubernetes-operator/pull/596] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34440) Support Debezium Protobuf Confluent Format
[ https://issues.apache.org/jira/browse/FLINK-34440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817076#comment-17817076 ] Kevin Lam commented on FLINK-34440: --- I just added this Jira issue, following the [Contribution Guide.|https://flink.apache.org/how-to-contribute/contribute-code/] If this ticket warrants a dev@ discussion, I'm happy to open one. I am happy to work on contributing the code to complete this issue. Looking forward to hearing others' thoughts! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34440) Support Debezium Protobuf Confluent Format
Kevin Lam created FLINK-34440: - Summary: Support Debezium Protobuf Confluent Format Key: FLINK-34440 URL: https://issues.apache.org/jira/browse/FLINK-34440 Project: Flink Issue Type: New Feature Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.18.1, 1.19.0 Reporter: Kevin Lam *Motivation* Debezium and the Confluent Schema registry can be used to emit Protobuf Encoded messages to Kafka, but Flink does not easily support consuming these messages through a connector. *Definition of Done* Add a format `debezium-protobuf-confluent` provided by DebeziumProtobufFormatFactory that supports Debezium messages encoded using Protocol Buffer and the Confluent Schema Registry. To consider * Mirror the implementation of the `debezium-avro-confluent` format. First implement a `protobuf-confluent` format similar to the existing [Confluent Avro|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/formats/avro-confluent/] format that's provided today, which allows reading/writing protobuf using the Confluent Schema Registry -- This message was sent by Atlassian Jira (v8.20.10#820010)
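For context on what a `protobuf-confluent` deserializer would have to handle, Confluent documents the following Schema Registry wire format. Each record starts with a magic byte `0` and a 4-byte big-endian schema ID. For Protobuf, these are followed by a list of message indexes, encoded as zigzag varints, with a single `0` byte as shorthand for the first message type. The actual Protobuf payload comes last. The sketch below parses only that header. `ConfluentProtobufHeader` is a hypothetical name, not an existing Flink class, and the layout should be checked against the Confluent documentation before relying on it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the Confluent framing for Protobuf records:
// [magic byte 0][4-byte big-endian schema ID][message indexes][protobuf payload]
final class ConfluentProtobufHeader {
    final int schemaId;
    final List<Integer> messageIndexes;
    final int payloadOffset; // where the Protobuf-encoded message starts

    private ConfluentProtobufHeader(int schemaId, List<Integer> messageIndexes, int payloadOffset) {
        this.schemaId = schemaId;
        this.messageIndexes = messageIndexes;
        this.payloadOffset = payloadOffset;
    }

    static ConfluentProtobufHeader parse(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record); // ByteBuffer is big-endian by default
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buf.getInt();
        int count = readZigZagVarint(buf);
        List<Integer> indexes = new ArrayList<>();
        if (count == 0) {
            indexes.add(0); // shorthand for "first message type in the schema"
        } else {
            for (int i = 0; i < count; i++) {
                indexes.add(readZigZagVarint(buf));
            }
        }
        return new ConfluentProtobufHeader(schemaId, Collections.unmodifiableList(indexes), buf.position());
    }

    /** Decodes a zigzag-encoded varint of at most 5 bytes. */
    static int readZigZagVarint(ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is too long");
            }
        }
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag: 0->0, 1->-1, 2->1, 3->-2, ...
    }

    public static void main(String[] args) {
        // magic 0, schema ID 42, a single 0 byte for the message indexes, then payload bytes
        byte[] record = {0, 0, 0, 0, 42, 0, 0x08, (byte) 0x96, 0x01};
        ConfluentProtobufHeader h = parse(record);
        System.out.println(h.schemaId + " " + h.messageIndexes + " " + h.payloadOffset); // 42 [0] 6
    }
}
```

Following the suggestion to mirror `debezium-avro-confluent`, a Debezium variant would then decode the payload with the resolved schema and unwrap the Debezium envelope on top of this framing.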
[jira] [Commented] (FLINK-34424) BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times out
[ https://issues.apache.org/jira/browse/FLINK-34424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817073#comment-17817073 ] Piotr Nowicki commented on FLINK-34424: --- [~mapohl] I guess you wanted to mention [~pnowojski] ? :) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34424) BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times out
[ https://issues.apache.org/jira/browse/FLINK-34424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817069#comment-17817069 ] Matthias Pohl commented on FLINK-34424: --- [~piotr.nowicki] (because it's networking; feel free to delegate) [~yunfengzhou] (because you touched the code in FLINK-33743 recently): Can someone help with investigating the cause of the issue? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34424) BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times out
[ https://issues.apache.org/jira/browse/FLINK-34424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817063#comment-17817063 ] Matthias Pohl commented on FLINK-34424: --- I'm wondering whether that has anything to do with the blocked reader thread: {code} Feb 11 13:55:29 "Thread-76" #476 daemon prio=5 os_prio=0 tid=0x7f190bbf1800 nid=0x5a40 waiting for monitor entry [0x7f191bce4000] Feb 11 13:55:29 java.lang.Thread.State: BLOCKED (on object monitor) Feb 11 13:55:29 at net.jpountz.lz4.LZ4JNI.LZ4_decompress_fast(Native Method) Feb 11 13:55:29 at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:70) Feb 11 13:55:29 at org.apache.flink.runtime.io.compression.Lz4BlockDecompressor.decompress(Lz4BlockDecompressor.java:68) Feb 11 13:55:29 at org.apache.flink.runtime.io.network.buffer.BufferDecompressor.decompress(BufferDecompressor.java:126) Feb 11 13:55:29 at org.apache.flink.runtime.io.network.buffer.BufferDecompressor.decompressToIntermediateBuffer(BufferDecompressor.java:68) Feb 11 13:55:29 at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest.readLongs(BoundedBlockingSubpartitionWriteReadTest.java:206) Feb 11 13:55:29 at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest.access$000(BoundedBlockingSubpartitionWriteReadTest.java:55) Feb 11 13:55:29 at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest$LongReader.go(BoundedBlockingSubpartitionWriteReadTest.java:323) Feb 11 13:55:29 at org.apache.flink.core.testutils.CheckedThread.run(CheckedThread.java:67) {code} The test was started at 13:32:18.152 and timed out at 13:55:39. -- This message was sent by Atlassian Jira (v8.20.10#820010)
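The dumps quoted in this issue show two different thread states. The test thread is `WAITING` in `Thread.join()`, reached via `CheckedThread.sync()`. The reader is `BLOCKED` waiting to enter a monitor inside the LZ4 JNI call. As long as the reader cannot make progress, the test cannot finish. The sketch below reproduces that combination with plain JDK threads and uses a bounded `join(millis)` to surface the stuck thread instead of hanging. `StuckThreadProbe` and its lock setup are hypothetical, and they do not show what actually holds the monitor in the LZ4 case.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical reproduction of the thread states seen in the dumps, using only JDK classes.
final class StuckThreadProbe {
    private static final Object MONITOR = new Object();

    /** Waits at most timeoutMillis; returns true only if the thread actually terminated. */
    static boolean joinWithTimeout(Thread t, long timeoutMillis) throws InterruptedException {
        t.join(timeoutMillis);
        return !t.isAlive();
    }

    /** Returns the state of a reader thread that needs MONITOR while another thread holds it. */
    static Thread.State probeBlockedReader() {
        try {
            return runProbe();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while probing", e);
        }
    }

    private static Thread.State runProbe() throws InterruptedException {
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            synchronized (MONITOR) {
                held.countDown();
                try {
                    release.await(); // keep the monitor until told to let go
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        holder.start();
        held.await();

        // Stand-in for the reader: it cannot enter MONITOR while the holder owns it.
        Thread reader = new Thread(() -> {
            synchronized (MONITOR) {
                // the real work would happen here once the monitor is free
            }
        });
        reader.start();

        Thread.State observed = Thread.State.TERMINATED;
        if (!joinWithTimeout(reader, 500)) {
            // Bounded join expired: record the state and print the stack instead of hanging.
            observed = reader.getState();
            for (StackTraceElement frame : reader.getStackTrace()) {
                System.out.println("  at " + frame);
            }
        }
        release.countDown(); // unblock both threads so the probe itself cannot hang
        holder.join();
        reader.join();
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(probeBlockedReader()); // expected: BLOCKED
    }
}
```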
[jira] [Updated] (FLINK-34333) Fix FLINK-34007 LeaderElector bug in 1.18
[ https://issues.apache.org/jira/browse/FLINK-34333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-34333: -- Release Note: Fixes a bug where the leader election wasn't able to pick up leadership again after renewing the lease token caused a leadership loss. This required fabric8io:kubernetes-client to be upgraded from v6.6.2 to v6.9.0. > Fix FLINK-34007 LeaderElector bug in 1.18 > - > > Key: FLINK-34333 > URL: https://issues.apache.org/jira/browse/FLINK-34333 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.1 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Blocker > Labels: pull-request-available > Fix For: 1.18.2 > > > FLINK-34007 revealed a bug in the k8s client v6.6.2, which we've been using since > Flink 1.18. This issue was fixed with FLINK-34007 for Flink 1.19, which > required an update of the k8s client to v6.9.0. > This Jira issue is about finding a solution in Flink 1.18 for the very same > problem FLINK-34007 covered. It's a dedicated Jira issue because we want to > unblock the release of 1.19 by resolving FLINK-34007. > Just to summarize why the upgrade to v6.9.0 is desired: There's a bug in > v6.6.2 which might prevent the leadership lost event from being forwarded to the > client ([#5463|https://github.com/fabric8io/kubernetes-client/issues/5463]). > An initial proposal where the release call was handled in Flink's > {{KubernetesLeaderElector}} didn't work due to the leadership lost event > being triggered twice (see [FLINK-34007 PR > comment|https://github.com/apache/flink/pull/24132#discussion_r1467175902]) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-34333) Fix FLINK-34007 LeaderElector bug in 1.18
[ https://issues.apache.org/jira/browse/FLINK-34333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34333. --- Fix Version/s: 1.18.2 Resolution: Fixed * 1.18 ** [35c560312efc91dafd1b4674ce1e10acc9320ab1|https://github.com/apache/flink/commit/35c560312efc91dafd1b4674ce1e10acc9320ab1] ** [87560b7cedd6c857612a24b83485f5000b9edbd6|https://github.com/apache/flink/commit/87560b7cedd6c857612a24b83485f5000b9edbd6] > Fix FLINK-34007 LeaderElector bug in 1.18 > - > > Key: FLINK-34333 > URL: https://issues.apache.org/jira/browse/FLINK-34333 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.1 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Blocker > Labels: pull-request-available > Fix For: 1.18.2 > > > FLINK-34007 revealed a bug in the k8s client v6.6.2, which we've been using since > Flink 1.18. This issue was fixed with FLINK-34007 for Flink 1.19, which > required an update of the k8s client to v6.9.0. > This Jira issue is about finding a solution in Flink 1.18 for the very same > problem FLINK-34007 covered. It's a dedicated Jira issue because we want to > unblock the release of 1.19 by resolving FLINK-34007. > Just to summarize why the upgrade to v6.9.0 is desired: There's a bug in > v6.6.2 which might prevent the leadership lost event from being forwarded to the > client ([#5463|https://github.com/fabric8io/kubernetes-client/issues/5463]). > An initial proposal where the release call was handled in Flink's > {{KubernetesLeaderElector}} didn't work due to the leadership lost event > being triggered twice (see [FLINK-34007 PR > comment|https://github.com/apache/flink/pull/24132#discussion_r1467175902]) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34333][k8s] 1.18 backport of FLINK-34007 [flink]
XComp merged PR #24245: URL: https://github.com/apache/flink/pull/24245 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31238] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]
StefanRRichter merged PR #24031: URL: https://github.com/apache/flink/pull/24031 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
gyfora commented on PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-194188 There are still some issues when actually running this, so I need to track down some problems and add some new tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
mxm commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1488088408 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -264,16 +299,14 @@ private void computeTargetDataRate( if (topology.isSource(vertex)) { double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds(); -if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) { +double lagRate = getRate(LAG, vertex, metricsHistory); +double sourceDataRate = Math.max(0, inputRate + lagRate); Review Comment: ```suggestion double ingestionDataRate = Math.max(0, inputRate + lagRate); ``` ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -356,13 +421,13 @@ public static double getAverage( ? collectedMetrics.getVertexMetrics().get(jobVertexId) : collectedMetrics.getGlobalMetrics(); double num = metrics.getOrDefault(metric, Double.NaN); -if (Double.isNaN(num)) { -continue; -} if (Double.isInfinite(num)) { anyInfinite = true; continue; } +if (Double.isNaN(num)) { +continue; +} Review Comment: I'm curious, why this change? ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ## @@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan( for (JsonNode node : nodes) { var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputList = new HashSet(); +var ioMetrics = metrics.get(vertexId); +var finished = finishedVertices.contains(vertexId); vertexInfo.add( new VertexInfo( vertexId, inputList, +null, node.get("parallelism").asInt(), maxParallelismMap.get(vertexId), -finished.contains(vertexId))); +maxParallelismMap.get(vertexId), Review Comment: This passes the same value twice, for both maxParallelism and originalMaxParallelism. Not sure that is intended. 
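On the question about reordering the checks in `getAverage`: no `double` is both NaN and infinite, so swapping the `Double.isNaN` and `Double.isInfinite` blocks should not, by itself, change which values are skipped or flagged. A minimal standalone check of that claim (the class and enum names are hypothetical and unrelated to the autoscaler code):

```java
/** Hypothetical check that the two orderings of the NaN/infinity tests agree. */
final class CheckOrder {

    enum Outcome { SKIPPED_NAN, FLAGGED_INFINITE, COUNTED }

    private CheckOrder() {}

    /** Order before the change: the NaN check comes first. */
    static Outcome nanFirst(double num) {
        if (Double.isNaN(num)) {
            return Outcome.SKIPPED_NAN;
        }
        if (Double.isInfinite(num)) {
            return Outcome.FLAGGED_INFINITE;
        }
        return Outcome.COUNTED;
    }

    /** Order after the change: the infinity check comes first. */
    static Outcome infiniteFirst(double num) {
        if (Double.isInfinite(num)) {
            return Outcome.FLAGGED_INFINITE;
        }
        if (Double.isNaN(num)) {
            return Outcome.SKIPPED_NAN;
        }
        return Outcome.COUNTED;
    }

    public static void main(String[] args) {
        double[] samples = {
            0.0, -1.5, 42.0, Double.NaN,
            Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, Double.MAX_VALUE
        };
        for (double s : samples) {
            System.out.println(s + " -> " + nanFirst(s) + " / " + infiniteFirst(s));
        }
    }
}
```

If the reorder was meant to change behaviour, the difference would have to come from elsewhere in the loop, not from the order of these two predicates.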
## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -39,12 +39,6 @@ public enum ScalingMetric { /** Current processing rate. */ CURRENT_PROCESSING_RATE(true), -/** - * Incoming data rate to the source, e.g. rate of records written to the Kafka topic - * (records/sec). - */ -SOURCE_DATA_RATE(true), Review Comment: For anyone wondering, this metric wasn't actually used other than for populating the source's `TARGET_DATA_RATE` which we can do directly instead of going through this intermediary. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -341,6 +375,37 @@ public static double getAverage( return getAverage(metric, jobVertexId, metricsHistory, 1); } +public static double getRate( +ScalingMetric metric, +@Nullable JobVertexID jobVertexId, +SortedMap metricsHistory) { + +Instant firstTs = null; +double first = Double.NaN; + +Instant lastTs = null; +double last = Double.NaN; + +for (var entry : metricsHistory.entrySet()) { +double value = entry.getValue().getVertexMetrics().get(jobVertexId).get(metric); +if (!Double.isNaN(value)) { +if (Double.isNaN(first)) { +first = value; +firstTs = entry.getKey(); +} else { +last = value; +lastTs = entry.getKey(); +} +} +} +if (Double.isNaN(last)) { +return Double.NaN; +} + +double diff = last - first; +return diff == 0 ? 0 : diff / Duration.between(firstTs, lastTs).toSeconds(); Review Comment: Not sure why we return zero when the diff is zero. The return value would then be zero anyway. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ## @@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan( for (JsonNode node : nodes) { var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputList = new HashSet(); +var ioMetrics = metrics.get(vertexId); +var finished = finishedVertices.contains(vertexId); vertexInfo.add( new VertexInfo( vertexId,
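The `getRate` helper in the diff divides the change between the first and last non-NaN samples by the elapsed time. Note that `Duration#toSeconds` truncates to whole seconds, so the `diff == 0 ? 0` guard only changes the result when the two samples are less than one second apart (where `0.0 / 0` would be NaN). In every other case the division already yields zero, as the review says. Below is a standalone sketch of the same first/last computation. It makes three assumptions: a plain `SortedMap<Instant, Double>` iterated in ascending order stands in for the PR's per-vertex `CollectedMetrics`; elapsed time is measured in fractional seconds; and it adds the suggested `ingestionDataRate = Math.max(0, inputRate + lagRate)`. The class name is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.SortedMap;

/** Hypothetical standalone version of the rate computation discussed in the review. */
final class RateSketch {

    private RateSketch() {}

    /**
     * Per-second change between the first and last non-NaN samples, assuming the map iterates
     * in ascending timestamp order. Returns NaN if fewer than two samples are usable.
     */
    static double rate(SortedMap<Instant, Double> history) {
        Instant firstTs = null;
        double first = Double.NaN;
        Instant lastTs = null;
        double last = Double.NaN;

        for (Map.Entry<Instant, Double> entry : history.entrySet()) {
            Double value = entry.getValue();
            if (value == null || value.isNaN()) {
                continue; // skip missing samples instead of failing on unboxing
            }
            if (firstTs == null) {
                first = value;
                firstTs = entry.getKey();
            } else {
                last = value;
                lastTs = entry.getKey();
            }
        }
        if (lastTs == null) {
            return Double.NaN;
        }
        // Fractional seconds: distinct ascending timestamps always give a positive divisor,
        // so an unchanged value yields 0.0 without a special case.
        double seconds = Duration.between(firstTs, lastTs).toNanos() / 1_000_000_000.0;
        return (last - first) / seconds;
    }

    /** Mirrors the suggested ingestionDataRate = max(0, inputRate + lagRate). */
    static double ingestionDataRate(double inputRate, double lagRate) {
        return Math.max(0, inputRate + lagRate);
    }
}
```

For example, lag samples of 100, NaN and 300 taken 10 seconds apart give a lag rate of 10 records/sec. With an input rate of 50 records/sec, that makes an ingestion rate of 60 records/sec.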
Re: [PR] [FLINK-34437][SQL Client] Fix typo [flink]
flinkbot commented on PR #24310: URL: https://github.com/apache/flink/pull/24310#issuecomment-1941853000 ## CI report: * 2dbe5769fbe2b7dfec652d1360136901ae227b80 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34437) Typo in SQL Client - `s/succeed/succeeded`
[ https://issues.apache.org/jira/browse/FLINK-34437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34437: --- Labels: pull-request-available (was: ) > Typo in SQL Client - `s/succeed/succeeded` > -- > > Key: FLINK-34437 > URL: https://issues.apache.org/jira/browse/FLINK-34437 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.18.1 >Reporter: Robin Moffatt >Priority: Not a Priority > Labels: pull-request-available > > > {code:java} > Flink SQL> CREATE CATALOG c_new WITH ('type'='generic_in_memory'); > [INFO] Execute statement succeed. {code} > `{*}Execute statement {color:#FF}succeed{color}.{*}` is grammatically > incorrect, and should read `{*}Execute statement > {color:#FF}succeeded{color}.{*}` > > [https://github.com/apache/flink/blob/5844092408d21023a738077d0922cc75f1e634d7/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java#L214] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34437][SQL Client] Fix typo [flink]
rmoff opened a new pull request, #24310: URL: https://github.com/apache/flink/pull/24310 ## What is the purpose of the change The SQL Client issues a message when a statement successfully executes. For example: ```sql Flink SQL> CREATE CATALOG c_new WITH ('type'='generic_in_memory'); [INFO] Execute statement succeed. ``` The use of `succeed` is incorrect, and should read `succeeded`. This PR fixes this. ## Brief change log * Changed the actual message * Changed the tests and docs that reference it. ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing -> This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-34439) Move chown operations to COPY commands in Dockerfile
[ https://issues.apache.org/jira/browse/FLINK-34439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-34439: -- Assignee: Mate Czagany > Move chown operations to COPY commands in Dockerfile > > > Key: FLINK-34439 > URL: https://issues.apache.org/jira/browse/FLINK-34439 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Mate Czagany >Assignee: Mate Czagany >Priority: Minor > > We can lower the size of the output operator container image if we don't run > 'chown' commands in separate RUN commands inside the Dockerfile, but instead > use the '--chown' argument of the COPY command. > Using 'RUN chown...' will copy all the files affected with their whole size > to a new layer, duplicating the previous files from the COPY command. > Example: > {code:java} > $ docker image history ghcr.io/apache/flink-kubernetes-operator:ccb10b8 > ... > 3 months ago RUN /bin/sh -c chown -R flink:flink $FLINK... > 116MB buildkit.dockerfile.v0 > ... {code} > This would mean a 20% reduction in image size. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34439) Move chown operations to COPY commands in Dockerfile
Mate Czagany created FLINK-34439: Summary: Move chown operations to COPY commands in Dockerfile Key: FLINK-34439 URL: https://issues.apache.org/jira/browse/FLINK-34439 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Mate Czagany We can lower the size of the output operator container image if we don't run 'chown' commands in separate RUN commands inside the Dockerfile, but instead use the '--chown' argument of the COPY command. Using 'RUN chown...' will copy all the files affected with their whole size to a new layer, duplicating the previous files from the COPY command. Example: {code:java} $ docker image history ghcr.io/apache/flink-kubernetes-operator:ccb10b8 ... 3 months ago RUN /bin/sh -c chown -R flink:flink $FLINK... 116MB buildkit.dockerfile.v0 ... {code} This would mean a 20% reduction in image size. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34438) Kubernetes Operator doesn't wait for TaskManager deletion in native mode
[ https://issues.apache.org/jira/browse/FLINK-34438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-34438: -- Assignee: Mate Czagany > Kubernetes Operator doesn't wait for TaskManager deletion in native mode > > > Key: FLINK-34438 > URL: https://issues.apache.org/jira/browse/FLINK-34438 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.6.1, > kubernetes-operator-1.8.0 >Reporter: Mate Czagany >Assignee: Mate Czagany >Priority: Major > > This issue was partly fixed in FLINK-32334 but native mode was not included > in the fix. > I don't see any downsides with adding the same check to native deployment > mode, which would make sure that all TaskManagers were deleted when we shut > down a Flink cluster. > There should also be some logs suggesting that the timeout was exceeded > instead of silently returning when waiting for the cluster to shut down. > An issue was also mentioned on the mailing list which seems to be related to > this: [https://lists.apache.org/thread/4gwj4ob4n9zg7b90vnqohj8x1p0bb5cb] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]
flinkbot commented on PR #24309: URL: https://github.com/apache/flink/pull/24309#issuecomment-1941800355 ## CI report: * 755cc0abbf3de6c5d6debc20e8a85e1f7fa220f8 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34427) FineGrainedSlotManagerTest fails fatally (exit code 239)
[ https://issues.apache.org/jira/browse/FLINK-34427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34427: --- Labels: pull-request-available test-stability (was: test-stability) > FineGrainedSlotManagerTest fails fatally (exit code 239) > > > Key: FLINK-34427 > URL: https://issues.apache.org/jira/browse/FLINK-34427 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Critical > Labels: pull-request-available, test-stability > > https://github.com/apache/flink/actions/runs/7866453350/job/21460921911#step:10:8959 > {code} > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: > ExecutionException The forked VM terminated without properly saying goodbye. > VM crash or System.exit called? 
> Error: 02:28:53 02:28:53.220 [ERROR] Command was /bin/sh -c cd > '/root/flink/flink-runtime' && > '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' '-XX:+UseG1GC' '-Xms256m' > '-XX:+IgnoreUnrecognizedVMOptions' > '--add-opens=java.base/java.util=ALL-UNNAMED' > '--add-opens=java.base/java.lang=ALL-UNNAMED' > '--add-opens=java.base/java.net=ALL-UNNAMED' > '--add-opens=java.base/java.io=ALL-UNNAMED' > '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED' '-Xmx768m' '-jar' > '/root/flink/flink-runtime/target/surefire/surefirebooter-20240212022332296_94.jar' > '/root/flink/flink-runtime/target/surefire' > '2024-02-12T02-21-39_495-jvmRun3' 'surefire-20240212022332296_88tmp' > 'surefire_26-20240212022332296_91tmp' > Error: 02:28:53 02:28:53.220 [ERROR] Error occurred in starting fork, check > output in log > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.221 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.221 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456) > [...] > {code} > The fatal error is triggered most likely within the > {{FineGrainedSlotManagerTest}}: > {code} > 02:26:39,362 [ pool-643-thread-1] ERROR > org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: > Thread 'pool-643-thread-1' produced an uncaught exception. Stopping the > process... 
> java.util.concurrent.CompletionException: > java.util.concurrent.RejectedExecutionException: Task > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4bbc0b10 > rejected from > java.util.concurrent.ScheduledThreadPoolExecutor@7a45cd9a[Shutting down, pool > size = 1, active threads = 1, queued tasks = 1, completed tasks = 194] > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:838) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:851) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2178) > ~[?:1.8.0_392] > at > org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$null$12(FineGrainedSlotManager.java:603) > ~[classes/:?] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_392] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > [?:1.8.0_392] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_392] > at >
[PR] [FLINK-34427][runtime] Adds state check to requirementsCheck logic [flink]
XComp opened a new pull request, #24309: URL: https://github.com/apache/flink/pull/24309 ## What is the purpose of the change Quoting @zentol from [FLINK-34427](https://issues.apache.org/jira/browse/FLINK-34427?focusedCommentId=17816969=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17816969) here: > The problem is the use of scheduled executors in the FineGrainedSlotManager. It periodically tries to schedule actions unconditionally into the main thread, and this periodic action is also never cancelled. > If the rpc endpoint shuts down during the periodic delay, the scheduled action can fire again before the rpc service (and thus scheduled executor) is shut down, running into this error. > This code is plain broken as it makes assumptions about the lifecycle of the scheduled executor. The loop should be canceled when the FGSM is shut down, and as a safety rail any scheduled action should validate that the FGSM is not shut down yet before scheduling anything into the main thread. ## Brief change log * Makes `ManuallyTriggeredScheduledExecutorService` more robust against exceptions * Adds state check to scheduled task ## Verifying this change * `FineGrainedSlotManagerTest#testCloseWithScheduledTask` was added ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
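The two rules in the quoted comment are: cancel the periodic loop on shutdown, and re-check the component's state before handing work to the main thread. They can be sketched with plain `java.util.concurrent` types. This is a minimal illustration, not the FineGrainedSlotManager code or the PR's fix; `PeriodicRequirementsCheck`, its methods and the injected "main thread" `Executor` are all hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical component that periodically hands a check to its "main thread" executor. */
final class PeriodicRequirementsCheck implements AutoCloseable {

    private final ScheduledExecutorService scheduler;
    private final Executor mainThreadExecutor;
    private final Runnable check;

    private volatile boolean started;
    private ScheduledFuture<?> loop; // guarded by "this"

    PeriodicRequirementsCheck(
            ScheduledExecutorService scheduler, Executor mainThreadExecutor, Runnable check) {
        this.scheduler = scheduler;
        this.mainThreadExecutor = mainThreadExecutor;
        this.check = check;
    }

    synchronized void start(long periodMillis) {
        started = true;
        loop = scheduler.scheduleWithFixedDelay(
                () -> {
                    // Safety rail: do not schedule into the main thread once closed.
                    if (started) {
                        mainThreadExecutor.execute(this::runIfStarted);
                    }
                },
                periodMillis,
                periodMillis,
                TimeUnit.MILLISECONDS);
    }

    private void runIfStarted() {
        // The component may have been closed while this task was queued.
        if (started) {
            check.run();
        }
    }

    synchronized boolean isLoopCancelled() {
        return loop != null && loop.isCancelled();
    }

    @Override
    public synchronized void close() {
        started = false;
        if (loop != null) {
            // Cancel the loop so it cannot outlive the component.
            loop.cancel(false);
        }
    }
}
```

Cancelling the `ScheduledFuture` is what stops new runs. The `started` checks on both sides of the hand-off only narrow the window in which an already-fired run could still reach an executor that is shutting down.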
[jira] [Created] (FLINK-34438) Kubernetes Operator doesn't wait for TaskManager deletion in native mode
Mate Czagany created FLINK-34438: Summary: Kubernetes Operator doesn't wait for TaskManager deletion in native mode Key: FLINK-34438 URL: https://issues.apache.org/jira/browse/FLINK-34438 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.6.1, kubernetes-operator-1.7.0, kubernetes-operator-1.8.0 Reporter: Mate Czagany This issue was partly fixed in FLINK-32334 but native mode was not included in the fix. I don't see any downsides with adding the same check to native deployment mode, which would make sure that all TaskManagers were deleted when we shut down a Flink cluster. There should also be some logs suggesting that the timeout was exceeded instead of silently returning when waiting for the cluster to shut down. An issue was also mentioned on the mailing list which seems to be related to this: [https://lists.apache.org/thread/4gwj4ob4n9zg7b90vnqohj8x1p0bb5cb] -- This message was sent by Atlassian Jira (v8.20.10#820010)
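The behaviour requested here is to wait until all TaskManagers are gone, and to log when the timeout is exceeded instead of returning silently. It can be sketched generically as a bounded polling loop. Assumptions: the "are the TaskManager pods deleted?" check is abstracted as a `BooleanSupplier`, JDK `java.util.logging` stands in for the operator's logger, and `ShutdownWaiter` with its method is hypothetical, not the operator's API.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;

/** Hypothetical helper: bounded wait that logs instead of returning silently on timeout. */
final class ShutdownWaiter {

    private static final Logger LOG = Logger.getLogger(ShutdownWaiter.class.getName());

    private ShutdownWaiter() {}

    /**
     * Polls {@code taskManagersGone} until it reports true or {@code timeout} elapses.
     * Returns true once the condition holds, false on timeout or interruption.
     */
    static boolean waitForTaskManagerDeletion(
            BooleanSupplier taskManagersGone, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!taskManagersGone.getAsBoolean()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                LOG.warning("Timed out after " + timeout + " waiting for TaskManagers to be deleted");
                return false;
            }
            long sleepMillis = Math.max(1, Math.min(pollInterval.toMillis(),
                    TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

Returning a boolean rather than throwing leaves the caller free to decide whether a timed-out shutdown should fail the reconciliation or only be reported.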
[jira] [Updated] (FLINK-34437) Typo in SQL Client - `s/succeed/succeeded`
[ https://issues.apache.org/jira/browse/FLINK-34437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robin Moffatt updated FLINK-34437: -- Description: {code:java} Flink SQL> CREATE CATALOG c_new WITH ('type'='generic_in_memory'); [INFO] Execute statement succeed. {code} `{*}Execute statement {color:#FF}succeed{color}.{*}` is grammatically incorrect, and should read `{*}Execute statement {color:#FF}succeeded{color}.{*}` [https://github.com/apache/flink/blob/5844092408d21023a738077d0922cc75f1e634d7/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java#L214] was: ```sql Flink SQL> CREATE CATALOG c_new WITH ('type'='generic_in_memory'); [INFO] Execute statement succeed. ``` `Execute statement succeed.` is grammatically incorrect, and should read `Execute statement succeeded.` https://github.com/apache/flink/blob/5844092408d21023a738077d0922cc75f1e634d7/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java#L214 > Typo in SQL Client - `s/succeed/succeeded` > -- > > Key: FLINK-34437 > URL: https://issues.apache.org/jira/browse/FLINK-34437 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.18.1 >Reporter: Robin Moffatt >Priority: Not a Priority > > > {code:java} > Flink SQL> CREATE CATALOG c_new WITH ('type'='generic_in_memory'); > [INFO] Execute statement succeed. {code} > `{*}Execute statement {color:#FF}succeed{color}.{*}` is grammatically > incorrect, and should read `{*}Execute statement > {color:#FF}succeeded{color}.{*}` > > [https://github.com/apache/flink/blob/5844092408d21023a738077d0922cc75f1e634d7/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java#L214] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34437) Typo in SQL Client - `s/succeed/succeeded`
Robin Moffatt created FLINK-34437: - Summary: Typo in SQL Client - `s/succeed/succeeded` Key: FLINK-34437 URL: https://issues.apache.org/jira/browse/FLINK-34437 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.18.1 Reporter: Robin Moffatt ```sql Flink SQL> CREATE CATALOG c_new WITH ('type'='generic_in_memory'); [INFO] Execute statement succeed. ``` `Execute statement succeed.` is grammatically incorrect, and should read `Execute statement succeeded.` https://github.com/apache/flink/blob/5844092408d21023a738077d0922cc75f1e634d7/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java#L214 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacek Wislicki updated FLINK-34436: --- Summary: Avro schema evolution and compatibility issues in Pulsar connector (was: Avro schema evolution and compatibility issues) > Avro schema evolution and compatibility issues in Pulsar connector > -- > > Key: FLINK-34436 > URL: https://issues.apache.org/jira/browse/FLINK-34436 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.17.2 >Reporter: Jacek Wislicki >Priority: Major > > We noticed a couple of critical issues in the Pulsar-Flink connector related > to schema evolution and compatibility. Please see the MRE available at > https://github.com/JacekWislicki/test11. More details are in the project's > README file; here is a summary: > Library versions: > * Pulsar 3.0.1 > * Flink 1.17.2 > * Pulsar-Flink connector 4.1.0-1.17 > Problems: > * Exception thrown when schema's fields are added/removed > * Avro's enum default value is ignored; the last known value is applied instead > I believe I observed the same behaviour in Pulsar itself, but for now > we are focusing on the connector, so I documented the problems as they > appear when using it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34436) Avro schema evolution and compatibility issues
Jacek Wislicki created FLINK-34436: -- Summary: Avro schema evolution and compatibility issues Key: FLINK-34436 URL: https://issues.apache.org/jira/browse/FLINK-34436 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.17.2 Reporter: Jacek Wislicki We noticed a couple of critical issues in the Pulsar-Flink connector related to schema evolution and compatibility. Please see the MRE available at https://github.com/JacekWislicki/test11. More details are in the project's README file; here is a summary: Library versions: * Pulsar 3.0.1 * Flink 1.17.2 * Pulsar-Flink connector 4.1.0-1.17 Problems: * Exception thrown when schema's fields are added/removed * Avro's enum default value is ignored; the last known value is applied instead I believe I observed the same behaviour in Pulsar itself, but for now we are focusing on the connector, so I documented the problems as they appear when using it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Bump org.yaml:snakeyaml from 1.31 to 2.0 in /flink-connector-kafka [flink-connector-kafka]
MartijnVisser commented on PR #85: URL: https://github.com/apache/flink-connector-kafka/pull/85#issuecomment-1941623489 Re-opening because https://bitbucket.org/snakeyaml/snakeyaml/wiki/Changes indicates that SnakeYAML should still run on Java 8 (but requires Java 11 to build) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-34435) Bump org.yaml:snakeyaml from 1.31 to 2.2 for flink-connector-elasticsearch
Martijn Visser created FLINK-34435: -- Summary: Bump org.yaml:snakeyaml from 1.31 to 2.2 for flink-connector-elasticsearch Key: FLINK-34435 URL: https://issues.apache.org/jira/browse/FLINK-34435 Project: Flink Issue Type: Technical Debt Components: Connectors / ElasticSearch Reporter: Martijn Visser Assignee: Martijn Visser https://github.com/apache/flink-connector-elasticsearch/pull/90 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Bump org.yaml:snakeyaml from 1.31 to 2.0 [flink-connector-elasticsearch]
MartijnVisser commented on PR #90: URL: https://github.com/apache/flink-connector-elasticsearch/pull/90#issuecomment-1941612550 > where did you get this info? Hmmm, I would have sworn that was the case, but apparently I was wrong, because https://bitbucket.org/snakeyaml/snakeyaml/wiki/Changes lists it as requiring Java 7 to run (and Java 11 to build) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] fix: flink account cannot get resource "services" in API group [flink-kubernetes-operator]
mxm commented on PR #596: URL: https://github.com/apache/flink-kubernetes-operator/pull/596#issuecomment-1941597015 Great! The request is approved
[jira] [Comment Edited] (FLINK-33817) Allow ReadDefaultValues = False for non primitive types on Proto3
[ https://issues.apache.org/jira/browse/FLINK-33817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817008#comment-17817008 ] Nathan Taylor Armstrong Lewis edited comment on FLINK-33817 at 2/13/24 1:56 PM: I can confirm that this issue affects Flink version 1.17.x as well. was (Author: JIRAUSER304121): I can confirm that this issue affects version 1.17.x as well. > Allow ReadDefaultValues = False for non primitive types on Proto3 > - > > Key: FLINK-33817 > URL: https://issues.apache.org/jira/browse/FLINK-33817 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.18.0 >Reporter: Sai Sharath Dandi >Priority: Major > Labels: pull-request-available > > *Background* > > The current Protobuf format > [implementation|https://github.com/apache/flink/blob/c3e2d163a637dca5f49522721109161bd7ebb723/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java] > always sets ReadDefaultValues=True when using the Proto3 version. This can > cause severe performance degradation for large Protobuf schemas with OneOf > fields, as the entire generated code needs to be executed during > deserialization even when certain fields are not present in the data to be > deserialized and all the subsequent nested fields could be skipped. Proto3 > supports hasXXX() methods for checking field presence for non-primitive types > since Proto version > [3.15|https://github.com/protocolbuffers/protobuf/releases/tag/v3.15.0]. In > our company's internal performance benchmarks, we've seen an almost 10x > difference in performance for one of our real production use cases when > allowing ReadDefaultValues=False with the proto3 version. The exact > difference in performance depends on the schema complexity and data payload, > but we should allow users to set ReadDefaultValues=False in general.
> > *Solution* > > Support using ReadDefaultValues=False with the Proto3 version. We need to > be careful to check for field presence only on non-primitive types if > ReadDefaultValues is false and the version used is Proto3.
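The presence rule proposed in the *Solution* can be sketched as a small decision function. This is a minimal illustration, not Flink's code: the class `PresenceCheckRule`, the `Syntax` enum and the `shouldCheckPresence` method are hypothetical names, and the proto3 branch encodes only what the issue states (guard non-primitive fields with hasXXX() when ReadDefaultValues is false).

```java
// Hypothetical sketch, not Flink's API: decide whether a field read should be guarded by
// hasXXX(), following the rule described in FLINK-33817.
public class PresenceCheckRule {

    /** Protobuf syntax of the .proto file the message was generated from. */
    enum Syntax { PROTO2, PROTO3 }

    /**
     * Returns true if the field should be guarded by a presence check (hasXXX()), so that an
     * absent field can be mapped to NULL and its nested conversion skipped.
     */
    static boolean shouldCheckPresence(Syntax syntax, boolean readDefaultValues, boolean isMessageField) {
        if (readDefaultValues) {
            // Absent fields are materialized with their default values: no presence check.
            return false;
        }
        if (syntax == Syntax.PROTO2) {
            // proto2 generates hasXXX() for every singular field.
            return true;
        }
        // proto3: only non-primitive (message-typed) fields are guarded, as the issue proposes;
        // primitive fields keep being read with their default values.
        return isMessageField;
    }

    public static void main(String[] args) {
        System.out.println(shouldCheckPresence(Syntax.PROTO3, false, true));  // true
        System.out.println(shouldCheckPresence(Syntax.PROTO3, false, false)); // false
        System.out.println(shouldCheckPresence(Syntax.PROTO3, true, true));   // false
    }
}
```

Keeping the rule in one pure function makes it easy to unit-test separately from code generation.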
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1487904426 ## example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java: ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.examples; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.connector.prometheus.sink.PrometheusSink; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector; +import org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner; +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.function.Supplier; + +/** Test application testing the Prometheus sink connector. 
*/ +public class DataStreamJob { +private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamJob.class); + +public static void main(String[] args) throws Exception { +StreamExecutionEnvironment env; + +// Conditionally return a local execution environment with +if (args.length > 0 && Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) { +Configuration conf = new Configuration(); +conf.set( + ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(), +true); +env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); +} else { +env = StreamExecutionEnvironment.getExecutionEnvironment(); +} + +env.setParallelism(2); + +ParameterTool applicationParameters = ParameterTool.fromArgs(args); + +// Prometheus remote-write URL +String prometheusRemoteWriteUrl = applicationParameters.get("prometheusRemoteWriteUrl"); +LOGGER.info("Prometheus URL:{}", prometheusRemoteWriteUrl); + +// Optionally configure Amazon Managed Prometheus request signer +PrometheusRequestSigner requestSigner = null; +String ampAWSRegion = applicationParameters.get("awsRegion"); +if (ampAWSRegion != null) { +requestSigner = +new AmazonManagedPrometheusWriteRequestSigner( +prometheusRemoteWriteUrl, ampAWSRegion); +LOGGER.info( +"Enable Amazon Managed Prometheus request-signer, region: {}", ampAWSRegion); +} + +// Configure data generator +int generatorMinSamplesPerTimeSeries = 1; +int generatorMaxSamplesPerTimeSeries = 10; +int generatorNumberOfDummyInstances = 5; +long generatorPauseBetweenTimeSeriesMs = 100; +LOGGER.info( +"Data Generator configuration:" ++ "\n\t\tMin samples per time series:{}\n\t\tMax samples per time series:{}\n\t\tPause between time series:{} ms" ++ "\n\t\tNumber of dummy instances:{}", +generatorMinSamplesPerTimeSeries, +generatorMaxSamplesPerTimeSeries, +generatorPauseBetweenTimeSeriesMs, +generatorNumberOfDummyInstances); + +Supplier eventGenerator = +new
[jira] [Commented] (FLINK-33817) Allow ReadDefaultValues = False for non primitive types on Proto3
[ https://issues.apache.org/jira/browse/FLINK-33817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817008#comment-17817008 ] Nathan Taylor Armstrong Lewis commented on FLINK-33817: --- I can confirm that this issue affects version 1.17.x as well.
Re: [PR] Chore: Change metric types, "numSplitsProcessed" in metrics.md [flink]
flinkbot commented on PR #24308: URL: https://github.com/apache/flink/pull/24308#issuecomment-1941566002 ## CI report: * a1d3ee92d6f82db8f91a767b7c262bed553988e2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] Chore: Change metric types, "numSplitsProcessed" in metrics.md [flink]
WarbriT opened a new pull request, #24308: URL: https://github.com/apache/flink/pull/24308 ## What is the purpose of the change Fixes a typo in metrics.md: changes the type of the "numSplitsProcessed" metric from Gauge to Counter in both the English and Chinese versions.
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1487883477 ## example-datastream-job/src/main/java/org/apache/flink/connector/prometheus/examples/DataStreamJob.java: ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.examples; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.prometheus.sink.PrometheusRequestSigner; +import org.apache.flink.connector.prometheus.sink.PrometheusSink; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries; +import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeriesLabelsAndMetricNameKeySelector; +import org.apache.flink.connector.prometheus.sink.aws.AmazonManagedPrometheusWriteRequestSigner; +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RetryConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.function.Supplier; + +/** Test application testing the Prometheus sink connector. 
*/ +public class DataStreamJob { +private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamJob.class); + +public static void main(String[] args) throws Exception { +StreamExecutionEnvironment env; + +// Conditionally return a local execution environment with +if (args.length > 0 && Arrays.stream(args).anyMatch("--webUI"::equalsIgnoreCase)) { +Configuration conf = new Configuration(); +conf.set( + ConfigOptions.key("rest.flamegraph.enabled").booleanType().noDefaultValue(), +true); +env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); +} else { +env = StreamExecutionEnvironment.getExecutionEnvironment(); +} + +env.setParallelism(2); + +ParameterTool applicationParameters = ParameterTool.fromArgs(args); + +// Prometheus remote-write URL +String prometheusRemoteWriteUrl = applicationParameters.get("prometheusRemoteWriteUrl"); +LOGGER.info("Prometheus URL:{}", prometheusRemoteWriteUrl); + +// Optionally configure Amazon Managed Prometheus request signer +PrometheusRequestSigner requestSigner = null; +String ampAWSRegion = applicationParameters.get("awsRegion"); +if (ampAWSRegion != null) { +requestSigner = +new AmazonManagedPrometheusWriteRequestSigner( +prometheusRemoteWriteUrl, ampAWSRegion); +LOGGER.info( +"Enable Amazon Managed Prometheus request-signer, region: {}", ampAWSRegion); +} + +// Configure data generator +int generatorMinSamplesPerTimeSeries = 1; +int generatorMaxSamplesPerTimeSeries = 10; +int generatorNumberOfDummyInstances = 5; Review Comment: Reworked the random data generator to generate a configurable random number of metrics (`M`, `M1`...) for a configurable random number of sources (`SourceID` = `S00`, `S01`...). Removed confusing references to "instances", "CPU" and "Memory"
Re: [PR] fix: flink account cannot get resource "services" in API group [flink-kubernetes-operator]
prakash-42 commented on PR #596: URL: https://github.com/apache/flink-kubernetes-operator/pull/596#issuecomment-1941522672 Hi @mxm! I have submitted the request for an account with the username `prakash.tiwari`.
Re: [PR] [FLINK-34386][state] Add RocksDB bloom filter metrics [flink]
hejufang commented on PR #24274: URL: https://github.com/apache/flink/pull/24274#issuecomment-1941505603 @flinkbot run azure
Re: [PR] [FLINK-29122][core] Improve robustness of FileUtils.expandDirectory() [flink]
flinkbot commented on PR #24307: URL: https://github.com/apache/flink/pull/24307#issuecomment-1941503032 ## CI report: * 0ab33ce4a51fbfefa0280f1123dee49d37275dcb UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-29122) Improve robustness of FileUtils.expandDirectory()
[ https://issues.apache.org/jira/browse/FLINK-29122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29122: --- Labels: pull-request-available (was: ) > Improve robustness of FileUtils.expandDirectory() > -- > > Key: FLINK-29122 > URL: https://issues.apache.org/jira/browse/FLINK-29122 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.16.0, 1.17.0 >Reporter: Robert Metzger >Assignee: Anupam Aggarwal >Priority: Major > Labels: pull-request-available > > `FileUtils.expandDirectory()` can potentially write to invalid locations if > the zip file is invalid (contains entry names with ../).
[PR] [FLINK-29122][core] Improve robustness of FileUtils.expandDirectory() [flink]
anupamaggarwal opened a new pull request, #24307: URL: https://github.com/apache/flink/pull/24307 ## What is the purpose of the change This pull request adds checks to improve the robustness of the FileUtils.expandDirectory() method. ## Brief change log - Adds checks in the expandDirectory() method to ensure that no expanded path refers to a location outside the target directory. The check is recursive and ensures the expanded path is fully contained inside the target directory. ## Verifying this change This change added tests and can be verified as follows: - Added a unit test for relative paths that stay within the target directory (these accesses should be allowed) - Added a unit test for access outside the target directory (these should fail with an IOException) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no
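A common way to enforce the containment described in the change log is to normalize the resolved entry path and check that it still starts with the normalized target directory. The sketch below uses only `java.nio.file.Path`; the class `ZipEntryPathCheck` and the method `resolveInside` are hypothetical, and this normalization-based check is an alternative illustration, not necessarily how the PR implements its recursive check.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not Flink's FileUtils API: resolve a zip entry name against the target
// directory and reject names that would escape it (the "zip slip" problem).
public class ZipEntryPathCheck {

    static Path resolveInside(Path targetDir, String entryName) throws IOException {
        Path normalizedTarget = targetDir.toAbsolutePath().normalize();
        // normalize() collapses "." and ".." segments; an absolute entry name replaces the base.
        Path resolved = normalizedTarget.resolve(entryName).normalize();
        // Path.startsWith compares whole name elements, so "/tmp/out2" does not match "/tmp/out".
        if (!resolved.startsWith(normalizedTarget)) {
            throw new IOException(
                    "Zip entry '" + entryName + "' resolves outside " + normalizedTarget);
        }
        return resolved;
    }

    public static void main(String[] args) throws IOException {
        Path target = Paths.get("/tmp/expand-target");
        // A relative segment that stays inside the target directory is allowed.
        System.out.println(resolveInside(target, "dir/../file.txt"));
        try {
            resolveInside(target, "../outside.txt");
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Comparing `Path` objects rather than raw strings avoids the classic prefix bug where a sibling directory such as `/tmp/expand-target2` would pass a naive `String.startsWith` check.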
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
dawidwys commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1487826357 ## flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java: ## @@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT()) .expectArgumentTypes( DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(), DataTypes.INT()), + TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE)) +.expectSignature("f(>)") + .calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW())) +.expectErrorMessage( +"Invalid input arguments. Expected signatures are:\n" ++ "f(>)"), +TestSpec.forStrategy( +"Strategy fails if input argument type is not ARRAY", + sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE)) +.calledWithArgumentTypes(DataTypes.INT()) +.expectErrorMessage( +"Invalid input arguments. Expected signatures are:\n" ++ "f(>)"), +TestSpec.forStrategy( +"Strategy fails if the number of input arguments are not one", + sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE)) +.calledWithArgumentTypes( +DataTypes.ARRAY(DataTypes.INT()), +DataTypes.ARRAY(DataTypes.STRING())) +.expectErrorMessage( +"Invalid input arguments. Expected signatures are:\n" ++ "f(>)"), Review Comment: Sorry, I was not clear enough. I meant that we don't need to test that `sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE)` takes a single argument. That is a property of `sequence`; whatever we use here instead of `SpecificInputTypeStrategies.ARRAY_COMPARABLE` does not really matter. The other two cases make total sense and you do it the right way!
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
dawidwys commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1487823032 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -231,6 +231,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_SORT = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SORT") +.kind(SCALAR) +.inputTypeStrategy( +or( +sequence(ARRAY_COMPARABLE), +sequence( +ARRAY_COMPARABLE, +InputTypeStrategies.explicit( Review Comment: You're right. Sorry, missed that.
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
dawidwys commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1487819172 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_SORT = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SORT") +.kind(SCALAR) +.inputTypeStrategy( +or( +sequence(new ArrayComparableElementArgumentTypeStrategy()), +sequence( +new ArrayComparableElementArgumentTypeStrategy(), +logical(LogicalTypeRoot.BOOLEAN Review Comment: > Ok, thank you, actually, I did this function last summer and followed the description in the JIRA ticket https://issues.apache.org/jira/browse/FLINK-26948. I understand that. If at some point we find out that we either made a mistake or did not put enough effort into something, it's better to fix it sooner rather than later, when we would have to live with the consequences. I admit I had not thoroughly checked the semantics before, which I should have. In my opinion, it's better to do something well than fast. I see, so traditional RDBMSs do not really support that function. It's also worth checking what the following do: * Snowflake: https://docs.snowflake.com/en/sql-reference/functions/array_sort (returns `null` when any argument is `null`), with null ordering handled separately * Spark: https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html: the docs do not say what the behaviour is for a `null` `ascendingOrder`; nulls first on asc, nulls last on desc * Presto: https://prestodb.io/docs/current/functions/array.html: has two separate functions for `ASC/DESC` To me, Snowflake's behaviour is the cleanest out there. WDYT? @MartijnVisser
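To make the Snowflake-style option concrete, here is a hedged sketch of the semantics in plain Java over `List`s rather than Flink's runtime types. Assumptions: `ArraySortSketch` and both `arraySort` overloads are hypothetical; a `null` array or `null` direction yields `null` (the behaviour described for Snowflake); `null` elements are treated as the smallest value, which gives the "nulls first on asc, nulls last on desc" ordering mentioned for Spark; and the one-argument form defaults to ascending.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of ARRAY_SORT semantics over java.util.List, not Flink's runtime code.
public class ArraySortSketch {

    /** One-argument form; defaulting to ascending order is an assumption. */
    static <T extends Comparable<? super T>> List<T> arraySort(List<T> array) {
        return arraySort(array, Boolean.TRUE);
    }

    /** Returns null if any argument is null; null elements sort as the smallest value. */
    static <T extends Comparable<? super T>> List<T> arraySort(List<T> array, Boolean ascending) {
        if (array == null || ascending == null) {
            return null;
        }
        Comparator<T> nullsSmallest = Comparator.nullsFirst(Comparator.<T>naturalOrder());
        List<T> sorted = new ArrayList<>(array); // leave the input untouched
        // reversed() flips both the element order and the null placement (nulls last).
        sorted.sort(ascending ? nullsSmallest : nullsSmallest.reversed());
        return sorted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(3, null, 1, 2);
        System.out.println(arraySort(input));        // [null, 1, 2, 3]
        System.out.println(arraySort(input, false)); // [3, 2, 1, null]
        System.out.println(arraySort(input, null));  // null
    }
}
```

Treating `null` as the smallest element keeps a single comparator for both directions; a separate nulls-first/nulls-last flag, as Snowflake offers, would need one more parameter.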
Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
dawidwys commented on code in PR #22951: URL: https://github.com/apache/flink/pull/22951#discussion_r1487819172 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_SORT = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_SORT") +.kind(SCALAR) +.inputTypeStrategy( +or( +sequence(new ArrayComparableElementArgumentTypeStrategy()), +sequence( +new ArrayComparableElementArgumentTypeStrategy(), +logical(LogicalTypeRoot.BOOLEAN Review Comment: > Ok, thank you, actually, I did this function last summer and followed the description in the JIRA ticket https://issues.apache.org/jira/browse/FLINK-26948. I understand that. If at some point we find out that we either made a mistake or did not put enough effort into something, it's better to fix it sooner rather than later, when we would have to live with the consequences. I admit I had not thoroughly checked the semantics before, which I should have. In my opinion, it's better to do something well than fast. I see, so traditional RDBMSs do not really support that function. It's also worth checking what the following do: * Snowflake: https://docs.snowflake.com/en/sql-reference/functions/array_sort (returns `null` when any argument is `null`), with null ordering handled separately * Spark: https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html: the docs do not say what the behaviour is for a `null` `ascendingOrder` * Presto: https://prestodb.io/docs/current/functions/array.html: has two separate functions for `ASC/DESC` To me, Snowflake's behaviour is the cleanest out there. WDYT? @MartijnVisser
[jira] [Updated] (FLINK-34266) Output ratios should be computed over the whole metric window instead of averaged
[ https://issues.apache.org/jira/browse/FLINK-34266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34266: --- Labels: pull-request-available (was: ) > Output ratios should be computed over the whole metric window instead of > averaged > - > > Key: FLINK-34266 > URL: https://issues.apache.org/jira/browse/FLINK-34266 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Critical > Labels: pull-request-available > > Currently, output ratios are computed during metric collection based on the > current in/out metrics and stored as part of the collected metrics. > During evaluation, the previously computed output ratios are then averaged > over the metric window. However, this leads to incorrect results, because > the average of per-observation ratios is not, in general, the ratio over the whole window. > Example: > Let's look at a window operator that simply sorts and re-emits events in > windows. During the window collection phase, the output ratio will be computed > and stored as 0. During the window computation, the output ratio will be > last_input_rate / window_size. Depending on the last input rate observation, > this can be off in either direction when averaged.
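A small worked example shows why averaging per-observation output ratios goes wrong for such an operator. The numbers are hypothetical and not taken from the issue: a sort-and-re-emit operator receives 100 records in each of three intervals and 10 in the last one, then emits all 310 records when the window fires. Every input record is emitted exactly once, so the true output ratio is 1.0. The class and method names are illustrative only.

```java
// Hypothetical numbers, not taken from the issue.
public class OutputRatioExample {

    /** Mean of per-interval output/input ratios (assumes non-zero input in every interval). */
    static double averagedRatio(long[] in, long[] out) {
        double sum = 0;
        for (int i = 0; i < in.length; i++) {
            sum += (double) out[i] / in[i];
        }
        return sum / in.length;
    }

    /** Output ratio over the whole metric window: total output divided by total input. */
    static double windowRatio(long[] in, long[] out) {
        long totalIn = 0;
        long totalOut = 0;
        for (int i = 0; i < in.length; i++) {
            totalIn += in[i];
            totalOut += out[i];
        }
        return (double) totalOut / totalIn;
    }

    public static void main(String[] args) {
        long[] in = {100, 100, 100, 10}; // input records per interval
        long[] out = {0, 0, 0, 310};     // the window fires in the last interval
        System.out.println(averagedRatio(in, out)); // 7.75
        System.out.println(windowRatio(in, out));   // 1.0
    }
}
```

The averaged value (7.75) is dominated by the small last input observation, while the ratio of totals recovers 1.0; with a larger last input observation the average would instead fall below 1.0.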
Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
gyfora commented on PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-1941448169 cc @mateczagany
[PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
gyfora opened a new pull request, #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774 ## What is the purpose of the change This PR aims to cover both [FLINK-34213](https://issues.apache.org/jira/browse/FLINK-34213) and [FLINK-34266](https://issues.apache.org/jira/browse/FLINK-34266). Currently, all metric tracking for input/output data rates, busyTime, etc. is based on perSecond metrics in Flink. Depending on the frequency of the autoscaler metric collection and the exact processing pattern of the application, perSecond metrics can result in completely erroneous autoscaler metric computations. The most important example of this is large windowed computations or other burst loads, where data rates spike after periods of inactivity. These use cases currently break down completely with the autoscaler: the jobs are generally scaled too low because incoming data is not measured correctly. To solve this, we move from perSecond in/out metrics to the accumulated counts, which allows us to measure correct input/output rates and output ratios over the entire metric window. To do this, we have to revise the metric collection logic, as some information is not exposed as a metric but is exposed directly through the job details query we already do to track topology changes. Summary of changes: - Collect the accumulated input/output record counts and accumulated busy time from the JobDetailsInfo REST request instead of metrics - Remove TrueProcessRate and OutputRatios from the collected metrics and move the computation to the evaluation phase (this also reduces the number of metrics kept in the store) - Require at least 2 observations for metric evaluation, as needed for rate computation from cumulative metrics - Improve and simplify the JobTopology structure to allow incorporating IO metrics - As TPR and some other metrics are now only evaluated for the entire metric window, current values will no longer be reported during evaluation (this is relevant for reported metrics)
The above logic changes required a redesign of many of the tests, especially the ones with somewhat complex logic. Test changes: - Move many tests from the collection to the evaluation phase and simplify them as much as possible into smaller unit tests - Introduce helper classes to generate metrics for complex integration tests (MetricsCollectionAndEvaluationTest / BacklogBasedScalingTest, etc.) - Remove some duplicated or non-functional tests ## Verifying this change A lot of tests and unit tests have changed; I tried to always preserve or extend the coverage. [TODO]: Extensive manual testing is still in progress ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: yes ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented?
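The switch from perSecond metrics to accumulated counts, and the new requirement of at least two observations, can be illustrated with a minimal sketch. The `Observation` type and the `ratePerSecond` method are hypothetical and do not reflect the operator's actual classes; the rate here is simply the counter delta between the first and last observation in the window divided by the elapsed time.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalDouble;

// Hypothetical sketch (not the autoscaler's code): derive a per-second rate from an
// accumulated counter observed at several points in the metric window.
public class CumulativeRateSketch {

    /** One sample of an accumulated counter (e.g. total records in) and when it was taken. */
    static final class Observation {
        final long timestampMs;
        final long accumulatedCount;

        Observation(long timestampMs, long accumulatedCount) {
            this.timestampMs = timestampMs;
            this.accumulatedCount = accumulatedCount;
        }
    }

    /** Rate over the whole window; empty if fewer than two observations or no elapsed time. */
    static OptionalDouble ratePerSecond(List<Observation> window) {
        if (window.size() < 2) {
            // A single cumulative sample says nothing about the rate.
            return OptionalDouble.empty();
        }
        Observation first = window.get(0);
        Observation last = window.get(window.size() - 1);
        long elapsedMs = last.timestampMs - first.timestampMs;
        if (elapsedMs <= 0) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of((last.accumulatedCount - first.accumulatedCount) * 1000.0 / elapsedMs);
    }

    public static void main(String[] args) {
        // Bursty input: the counter stays flat for 50 s, then 6,000 records arrive.
        List<Observation> window = Arrays.asList(
                new Observation(0, 1_000),
                new Observation(30_000, 1_000),
                new Observation(50_000, 1_000),
                new Observation(60_000, 7_000));
        System.out.println(ratePerSecond(window).getAsDouble()); // 100.0
    }
}
```

In this hypothetical burst, a perSecond snapshot taken at 30 s would report 0 records/s, while the cumulative delta over the window gives 100 records/s. FLINK-34213 proposes the same delta approach for accumulated busy time.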
Re: [PR] [FLINK-34432] Re-enable forkReuse for flink-table-planner [flink]
MartijnVisser commented on PR #24305: URL: https://github.com/apache/flink/pull/24305#issuecomment-1941430726 Personal CI Run 3: https://dev.azure.com/martijn0323/Flink/_build/results?buildId=4198&view=results ⌛ CI Run 3: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57507&view=results ⌛
[jira] [Assigned] (FLINK-34213) Consider using accumulated busy time instead of busyMsPerSecond
[ https://issues.apache.org/jira/browse/FLINK-34213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-34213: -- Assignee: Gyula Fora > Consider using accumulated busy time instead of busyMsPerSecond > --- > > Key: FLINK-34213 > URL: https://issues.apache.org/jira/browse/FLINK-34213 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Gyula Fora >Priority: Minor > > We might achieve much better accuracy if we used the accumulated busy time > metrics from Flink, instead of the momentarily collected ones. > We would use the diff between the last accumulated and the current > accumulated busy time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
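The idea in the issue description, using the difference between the last and the current accumulated busy time instead of momentary busyMsPerSecond samples, amounts to the calculation below. It is a minimal sketch: the method name and parameters are hypothetical, and the readings are assumed to come from a monotonically increasing accumulated busy-time counter (in ms) that is sampled together with a wall-clock timestamp.

```java
/** Sketch: average busy time over an interval from accumulated readings (names hypothetical). */
public class AccumulatedBusyTimeSketch {

    /**
     * @param prevBusyMs accumulated busy time (ms) at the previous collection
     * @param currBusyMs accumulated busy time (ms) at the current collection
     * @param prevTsMs   wall-clock timestamp (ms) of the previous collection
     * @param currTsMs   wall-clock timestamp (ms) of the current collection
     * @return average busy milliseconds per second over the interval, i.e. a
     *         window-level counterpart of busyMsPerSecond
     */
    static double avgBusyMsPerSecond(long prevBusyMs, long currBusyMs, long prevTsMs, long currTsMs) {
        long elapsedMs = currTsMs - prevTsMs;
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("timestamps must be strictly increasing");
        }
        long busyDeltaMs = currBusyMs - prevBusyMs;
        // busyDeltaMs / elapsedMs is the busy fraction; scale it to ms per second.
        return 1000.0 * busyDeltaMs / elapsedMs;
    }

    public static void main(String[] args) {
        // Busy for 15 s out of a 60 s interval: 250 ms busy per second on average,
        // no matter when during the interval the busy period happened.
        System.out.println(avgBusyMsPerSecond(100_000, 115_000, 0, 60_000));
    }
}
```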
Re: [PR] fix: flink account cannot get resource "services" in API group [flink-kubernetes-operator]
mxm commented on PR #596: URL: https://github.com/apache/flink-kubernetes-operator/pull/596#issuecomment-1941400242 @prakash-42 You can request a JIRA account here: https://selfserve.apache.org/jira-account.html Let me know the username once you have requested an account; I can then approve your request.
[jira] [Comment Edited] (FLINK-34364) Fix release utils mount point to match the release doc and scripts
[ https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816981#comment-17816981 ] Etienne Chauchot edited comment on FLINK-34364 at 2/13/24 12:21 PM: Notably, the source release script was excluding tools/releasing/shared but not tools/release/shared. This is why tools/release/shared ended up in the source release. By the way, I also noticed that all the connector source releases contained an empty tools/releasing directory, because only tools/releasing/shared is excluded in the source release script, not the whole tools/releasing directory. This seems a bit messy to me, so I think we should fix it in the release scripts later on for the next connector releases. was (Author: echauchot): Notably, the source release script was excluding tools/releasing/shared but not tools/release/shared. This is why tools/release/shared ended up in the source release. By the way, I also noticed that all the connector source releases contained an empty tools/releasing directory, because only tools/releasing/shared is excluded in the source release script. This seems a bit messy to me, so I think we should fix it in the release scripts later on for the next connector releases. > Fix release utils mount point to match the release doc and scripts > -- > > Key: FLINK-34364 > URL: https://issues.apache.org/jira/browse/FLINK-34364 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent, Release System >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > Labels: pull-request-available > > parent_pom branch refers to an incorrect mount point tools/*release*/shared > instead of tools/*releasing*/shared for the release_utils. > _tools/releasing_/shared is the one used in the release scripts and in the > release docs
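The reasoning in the comment (excluding a subdirectory removes neither a differently named sibling such as tools/release/shared nor the parent tools/releasing directory itself) can be illustrated with a simple prefix-based exclusion model. This is a hypothetical simplification for illustration only: the actual release scripts use their own exclusion mechanism, and the file names below are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: prefix-based directory exclusion (simplified model, not the release script). */
public class SourceReleaseExcludeSketch {

    /** True if the path equals an excluded directory or lies beneath it. */
    static boolean isExcluded(String path, List<String> excludedDirs) {
        for (String dir : excludedDirs) {
            // The "/" suffix prevents "tools/releasing" from matching "tools/releasingX".
            if (path.equals(dir) || path.startsWith(dir + "/")) {
                return true;
            }
        }
        return false;
    }

    /** Returns the paths that survive the exclusion, preserving their order. */
    static List<String> included(List<String> paths, List<String> excludedDirs) {
        List<String> result = new ArrayList<>();
        for (String p : paths) {
            if (!isExcluded(p, excludedDirs)) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
                "tools/releasing",                   // parent directory entry
                "tools/releasing/shared",
                "tools/releasing/shared/release.sh", // hypothetical file
                "tools/release",
                "tools/release/shared",              // the incorrect mount point
                "tools/release/shared/release.sh");  // hypothetical file
        // Excluding only the subdirectory keeps the (now empty) parent and the wrong mount point.
        System.out.println(included(paths, Arrays.asList("tools/releasing/shared")));
        // Excluding the whole tools/releasing directory also drops the empty parent.
        System.out.println(included(paths, Arrays.asList("tools/releasing")));
    }
}
```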
[jira] [Commented] (FLINK-34364) Fix release utils mount point to match the release doc and scripts
[ https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816981#comment-17816981 ] Etienne Chauchot commented on FLINK-34364: -- Notably, the source release script was excluding tools/releasing/shared but not tools/release/shared. This is why tools/release/shared ended up in the source release. By the way, I also noticed that all the connector source releases contained an empty tools/releasing directory, because only tools/releasing/shared is excluded in the source release script. This seems a bit messy to me, so I think we should fix it in the release scripts later on for the next connector releases. > Fix release utils mount point to match the release doc and scripts > -- > > Key: FLINK-34364 > URL: https://issues.apache.org/jira/browse/FLINK-34364 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent, Release System >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > Labels: pull-request-available > > parent_pom branch refers to an incorrect mount point tools/*release*/shared > instead of tools/*releasing*/shared for the release_utils. > _tools/releasing_/shared is the one used in the release scripts and in the > release docs
[PR] [FLINK-34364] Replace incorrect release_utils mount point tools/release/shared by tools/releasing/shared to match the release doc and scripts [flink-connector-shared-utils]
echauchot opened a new pull request, #36: URL: https://github.com/apache/flink-connector-shared-utils/pull/36 I think I can self-merge this one. FYI: @zentol
[jira] [Updated] (FLINK-34364) Fix release utils mount point to match the release doc and scripts
[ https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34364: --- Labels: pull-request-available (was: ) > Fix release utils mount point to match the release doc and scripts > -- > > Key: FLINK-34364 > URL: https://issues.apache.org/jira/browse/FLINK-34364 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent, Release System >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > Labels: pull-request-available > > parent_pom branch refers to an incorrect mount point tools/*release*/shared > instead of tools/*releasing*/shared for the release_utils. > _tools/releasing_/shared is the one used in the release scripts and in the > release docs
[jira] [Updated] (FLINK-34364) Fix release utils mount point to match the release doc and scripts
[ https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Etienne Chauchot updated FLINK-34364: - Description: parent_pom branch refers to an incorrect mount point tools/*release*/shared instead of tools/*releasing*/shared for the release_utils. _tools/releasing_/shared is the one used in the release scripts and in the release docs (was: parent_pom branch refers to an incorrect mount point tools/*release*/shared instead of tools/*releasing*/shared. _tools/releasing_/shared is the one used in the release scripts and in the release docs) > Fix release utils mount point to match the release doc and scripts > -- > > Key: FLINK-34364 > URL: https://issues.apache.org/jira/browse/FLINK-34364 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent, Release System >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > > parent_pom branch refers to an incorrect mount point tools/*release*/shared > instead of tools/*releasing*/shared for the release_utils. > _tools/releasing_/shared is the one used in the release scripts and in the > release docs
[jira] [Updated] (FLINK-34364) Fix release utils mount point to match the release doc and scripts
[ https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Etienne Chauchot updated FLINK-34364: - Description: parent_pom branch refers to an incorrect mount point tools/*release*/shared instead of tools/*releasing*/shared. _tools/releasing_/shared is the one used in the release scripts and in the release docs (was: This directory is the mount point of the release utils repository and should be excluded from the source release.) > Fix release utils mount point to match the release doc and scripts > -- > > Key: FLINK-34364 > URL: https://issues.apache.org/jira/browse/FLINK-34364 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent, Release System >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > > parent_pom branch refers to an incorrect mount point tools/*release*/shared > instead of tools/*releasing*/shared. _tools/releasing_/shared is the one used > in the release scripts and in the release docs
[jira] (FLINK-34364) Fix release utils mount point to match the release doc and scripts
[ https://issues.apache.org/jira/browse/FLINK-34364 ] Etienne Chauchot deleted comment on FLINK-34364: -- was (Author: echauchot): parent_pom branch refers to an incorrect mount point tools/*release*/shared instead of tools/*releasing*/shared. _tools/releasing_/shared is the one used in the release scripts and in the release docs > Fix release utils mount point to match the release doc and scripts > -- > > Key: FLINK-34364 > URL: https://issues.apache.org/jira/browse/FLINK-34364 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent, Release System >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > > This directory is the mount point of the release utils repository and should > be excluded from the source release.
[jira] [Comment Edited] (FLINK-34364) Fix release utils mount point to match the release doc and scripts
[ https://issues.apache.org/jira/browse/FLINK-34364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816560#comment-17816560 ] Etienne Chauchot edited comment on FLINK-34364 at 2/13/24 12:11 PM: parent_pom branch refers to an incorrect mount point tools/*release*/shared instead of tools/*releasing*/shared. _tools/releasing_/shared is the one used in the release scripts and in the release docs was (Author: echauchot): This was an incorrect mount point tools/*release* instead of tools/*releasing*. _tools/releasing_ is already excluded in the source release script, and _tools/releasing_ is the path referred to in the docs > Fix release utils mount point to match the release doc and scripts > -- > > Key: FLINK-34364 > URL: https://issues.apache.org/jira/browse/FLINK-34364 > Project: Flink > Issue Type: Bug > Components: Connectors / Parent, Release System >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > > This directory is the mount point of the release utils repository and should > be excluded from the source release.
[jira] [Assigned] (FLINK-34425) TaskManagerRunnerITCase#testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure times out
[ https://issues.apache.org/jira/browse/FLINK-34425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reassigned FLINK-34425: - Assignee: (was: Matthias Pohl) > TaskManagerRunnerITCase#testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure > times out > --- > > Key: FLINK-34425 > URL: https://issues.apache.org/jira/browse/FLINK-34425 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > Labels: test-stability > > https://github.com/apache/flink/actions/runs/7851900616/job/21429757962#step:10:8844 > {code} > Feb 10 03:21:45 "main" #1 [498632] prio=5 os_prio=0 cpu=619.91ms > elapsed=1653.40s tid=0x7fbd29695000 nid=498632 waiting on condition > [0x7fbd2b9f3000] > Feb 10 03:21:45java.lang.Thread.State: WAITING (parking) > Feb 10 03:21:45 at > jdk.internal.misc.Unsafe.park(java.base@21.0.1/Native Method) > Feb 10 03:21:45 - parking to wait for <0xae6199f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > Feb 10 03:21:45 at > java.util.concurrent.locks.LockSupport.park(java.base@21.0.1/LockSupport.java:371) > Feb 10 03:21:45 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@21.0.1/AbstractQueuedSynchronizer.java:519) > Feb 10 03:21:45 at > java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.1/ForkJoinPool.java:3780) > Feb 10 03:21:45 at > java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.1/ForkJoinPool.java:3725) > Feb 10 03:21:45 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@21.0.1/AbstractQueuedSynchronizer.java:1707) > Feb 10 03:21:45 at > java.lang.ProcessImpl.waitFor(java.base@21.0.1/ProcessImpl.java:425) > Feb 10 03:21:45 at > org.apache.flink.test.recovery.TaskManagerRunnerITCase.testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure(TaskManagerRunnerITCase.java:126) > Feb 10 03:21:45 at > 
java.lang.invoke.LambdaForm$DMH/0x7fbccb1b8000.invokeVirtual(java.base@21.0.1/LambdaForm$DMH) > Feb 10 03:21:45 at > java.lang.invoke.LambdaForm$MH/0x7fbccb1b8800.invoke(java.base@21.0.1/LambdaForm$MH) > [...] > {code}
[jira] [Commented] (FLINK-34427) FineGrainedSlotManagerTest fails fatally (exit code 239)
[ https://issues.apache.org/jira/browse/FLINK-34427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816973#comment-17816973 ] Matthias Pohl commented on FLINK-34427: --- Thanks for the clarification. This is an issue that also exists in 1.18. I won't raise the priority to Blocker for 1.19 because of that, but we should fix this. > FineGrainedSlotManagerTest fails fatally (exit code 239) > > > Key: FLINK-34427 > URL: https://issues.apache.org/jira/browse/FLINK-34427 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://github.com/apache/flink/actions/runs/7866453350/job/21460921911#step:10:8959 > {code} > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: > ExecutionException The forked VM terminated without properly saying goodbye. > VM crash or System.exit called? 
> Error: 02:28:53 02:28:53.220 [ERROR] Command was /bin/sh -c cd > '/root/flink/flink-runtime' && > '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' '-XX:+UseG1GC' '-Xms256m' > '-XX:+IgnoreUnrecognizedVMOptions' > '--add-opens=java.base/java.util=ALL-UNNAMED' > '--add-opens=java.base/java.lang=ALL-UNNAMED' > '--add-opens=java.base/java.net=ALL-UNNAMED' > '--add-opens=java.base/java.io=ALL-UNNAMED' > '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED' '-Xmx768m' '-jar' > '/root/flink/flink-runtime/target/surefire/surefirebooter-20240212022332296_94.jar' > '/root/flink/flink-runtime/target/surefire' > '2024-02-12T02-21-39_495-jvmRun3' 'surefire-20240212022332296_88tmp' > 'surefire_26-20240212022332296_91tmp' > Error: 02:28:53 02:28:53.220 [ERROR] Error occurred in starting fork, check > output in log > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.221 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.221 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456) > [...] > {code} > The fatal error is triggered most likely within the > {{FineGrainedSlotManagerTest}}: > {code} > 02:26:39,362 [ pool-643-thread-1] ERROR > org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: > Thread 'pool-643-thread-1' produced an uncaught exception. Stopping the > process... 
> java.util.concurrent.CompletionException: > java.util.concurrent.RejectedExecutionException: Task > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4bbc0b10 > rejected from > java.util.concurrent.ScheduledThreadPoolExecutor@7a45cd9a[Shutting down, pool > size = 1, active threads = 1, queued tasks = 1, completed tasks = 194] > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:838) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:851) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2178) > ~[?:1.8.0_392] > at > org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$null$12(FineGrainedSlotManager.java:603) > ~[classes/:?] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_392] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > [?:1.8.0_392] > at >
[jira] [Updated] (FLINK-34427) FineGrainedSlotManagerTest fails fatally (exit code 239)
[ https://issues.apache.org/jira/browse/FLINK-34427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-34427: -- Affects Version/s: 1.18.1 > FineGrainedSlotManagerTest fails fatally (exit code 239) > > > Key: FLINK-34427 > URL: https://issues.apache.org/jira/browse/FLINK-34427 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://github.com/apache/flink/actions/runs/7866453350/job/21460921911#step:10:8959 > {code} > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: > ExecutionException The forked VM terminated without properly saying goodbye. > VM crash or System.exit called? 
> Error: 02:28:53 02:28:53.220 [ERROR] Command was /bin/sh -c cd > '/root/flink/flink-runtime' && > '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' '-XX:+UseG1GC' '-Xms256m' > '-XX:+IgnoreUnrecognizedVMOptions' > '--add-opens=java.base/java.util=ALL-UNNAMED' > '--add-opens=java.base/java.lang=ALL-UNNAMED' > '--add-opens=java.base/java.net=ALL-UNNAMED' > '--add-opens=java.base/java.io=ALL-UNNAMED' > '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED' '-Xmx768m' '-jar' > '/root/flink/flink-runtime/target/surefire/surefirebooter-20240212022332296_94.jar' > '/root/flink/flink-runtime/target/surefire' > '2024-02-12T02-21-39_495-jvmRun3' 'surefire-20240212022332296_88tmp' > 'surefire_26-20240212022332296_91tmp' > Error: 02:28:53 02:28:53.220 [ERROR] Error occurred in starting fork, check > output in log > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.221 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.221 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456) > [...] > {code} > The fatal error is triggered most likely within the > {{FineGrainedSlotManagerTest}}: > {code} > 02:26:39,362 [ pool-643-thread-1] ERROR > org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: > Thread 'pool-643-thread-1' produced an uncaught exception. Stopping the > process... 
> java.util.concurrent.CompletionException: > java.util.concurrent.RejectedExecutionException: Task > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4bbc0b10 > rejected from > java.util.concurrent.ScheduledThreadPoolExecutor@7a45cd9a[Shutting down, pool > size = 1, active threads = 1, queued tasks = 1, completed tasks = 194] > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:838) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:851) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2178) > ~[?:1.8.0_392] > at > org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$null$12(FineGrainedSlotManager.java:603) > ~[classes/:?] 
> at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_392] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > [?:1.8.0_392] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_392] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >
[jira] [Updated] (FLINK-34425) TaskManagerRunnerITCase#testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure times out
[ https://issues.apache.org/jira/browse/FLINK-34425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-34425: -- Priority: Major (was: Critical) > TaskManagerRunnerITCase#testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure > times out > --- > > Key: FLINK-34425 > URL: https://issues.apache.org/jira/browse/FLINK-34425 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > Labels: test-stability > > https://github.com/apache/flink/actions/runs/7851900616/job/21429757962#step:10:8844 > {code} > Feb 10 03:21:45 "main" #1 [498632] prio=5 os_prio=0 cpu=619.91ms > elapsed=1653.40s tid=0x7fbd29695000 nid=498632 waiting on condition > [0x7fbd2b9f3000] > Feb 10 03:21:45java.lang.Thread.State: WAITING (parking) > Feb 10 03:21:45 at > jdk.internal.misc.Unsafe.park(java.base@21.0.1/Native Method) > Feb 10 03:21:45 - parking to wait for <0xae6199f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > Feb 10 03:21:45 at > java.util.concurrent.locks.LockSupport.park(java.base@21.0.1/LockSupport.java:371) > Feb 10 03:21:45 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@21.0.1/AbstractQueuedSynchronizer.java:519) > Feb 10 03:21:45 at > java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.1/ForkJoinPool.java:3780) > Feb 10 03:21:45 at > java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.1/ForkJoinPool.java:3725) > Feb 10 03:21:45 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@21.0.1/AbstractQueuedSynchronizer.java:1707) > Feb 10 03:21:45 at > java.lang.ProcessImpl.waitFor(java.base@21.0.1/ProcessImpl.java:425) > Feb 10 03:21:45 at > org.apache.flink.test.recovery.TaskManagerRunnerITCase.testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure(TaskManagerRunnerITCase.java:126) 
> Feb 10 03:21:45 at > java.lang.invoke.LambdaForm$DMH/0x7fbccb1b8000.invokeVirtual(java.base@21.0.1/LambdaForm$DMH) > Feb 10 03:21:45 at > java.lang.invoke.LambdaForm$MH/0x7fbccb1b8800.invoke(java.base@21.0.1/LambdaForm$MH) > [...] > {code}
[jira] [Comment Edited] (FLINK-34425) TaskManagerRunnerITCase#testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure times out
[ https://issues.apache.org/jira/browse/FLINK-34425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816971#comment-17816971 ] Matthias Pohl edited comment on FLINK-34425 at 2/13/24 11:48 AM: - This looks like a test issue. The TaskManager process is destroyed in [TaskManagerRunnerITCase:124|https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java#L124] but doesn't terminate properly, causing the timeout in {{#waitFor()}} in [TaskManagerRunnerITCase:126|https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java#L126]. I'm going to lower this issue's priority to {{Major}}. I don't consider it problematic in any way for the upcoming 1.19 release. was (Author: mapohl): This looks like a test issue. The TaskManager process is destroyed in [TaskManagerRunnerITCase:124|https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java#L124] but doesn't terminate properly, causing the timeout in {{#waitFor()}} in [TaskManagerRunnerITCase:126|https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java#L126]. 
> TaskManagerRunnerITCase#testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure > times out > --- > > Key: FLINK-34425 > URL: https://issues.apache.org/jira/browse/FLINK-34425 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://github.com/apache/flink/actions/runs/7851900616/job/21429757962#step:10:8844 > {code} > Feb 10 03:21:45 "main" #1 [498632] prio=5 os_prio=0 cpu=619.91ms > elapsed=1653.40s tid=0x7fbd29695000 nid=498632 waiting on condition > [0x7fbd2b9f3000] > Feb 10 03:21:45java.lang.Thread.State: WAITING (parking) > Feb 10 03:21:45 at > jdk.internal.misc.Unsafe.park(java.base@21.0.1/Native Method) > Feb 10 03:21:45 - parking to wait for <0xae6199f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > Feb 10 03:21:45 at > java.util.concurrent.locks.LockSupport.park(java.base@21.0.1/LockSupport.java:371) > Feb 10 03:21:45 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@21.0.1/AbstractQueuedSynchronizer.java:519) > Feb 10 03:21:45 at > java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.1/ForkJoinPool.java:3780) > Feb 10 03:21:45 at > java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.1/ForkJoinPool.java:3725) > Feb 10 03:21:45 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@21.0.1/AbstractQueuedSynchronizer.java:1707) > Feb 10 03:21:45 at > java.lang.ProcessImpl.waitFor(java.base@21.0.1/ProcessImpl.java:425) > Feb 10 03:21:45 at > org.apache.flink.test.recovery.TaskManagerRunnerITCase.testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure(TaskManagerRunnerITCase.java:126) > Feb 10 03:21:45 at > java.lang.invoke.LambdaForm$DMH/0x7fbccb1b8000.invokeVirtual(java.base@21.0.1/LambdaForm$DMH) > Feb 10 03:21:45 at > 
java.lang.invoke.LambdaForm$MH/0x7fbccb1b8800.invoke(java.base@21.0.1/LambdaForm$MH) > [...] > {code}
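According to the comment, the destroyed TaskManager process never exits, so the unbounded Process#waitFor() call blocks until the CI job times out. A common defensive pattern (a sketch, not the fix applied in Flink) is to bound the wait and escalate from destroy() to destroyForcibly(). This does not explain why the process fails to exit; it only makes the failure fast and explicit. The helper name destroyAndAwait and the grace period are hypothetical, and the usage assumes a Unix-like system with a sleep binary.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Sketch: terminate a child process without an unbounded wait (names hypothetical). */
public class BoundedProcessShutdownSketch {

    /**
     * Requests termination, waits up to gracePeriod, then escalates to
     * destroyForcibly() and waits up to gracePeriod again.
     *
     * @return true if the process has exited when this method returns
     */
    static boolean destroyAndAwait(Process process, Duration gracePeriod) throws InterruptedException {
        process.destroy();
        if (process.waitFor(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
            return true;
        }
        // Whether destroy() is forcible is implementation dependent, so escalate explicitly.
        process.destroyForcibly();
        return process.waitFor(gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Assumes a Unix-like system where the "sleep" command is available.
        Process p = new ProcessBuilder("sleep", "60").start();
        boolean exited = destroyAndAwait(p, Duration.ofSeconds(5));
        System.out.println("exited=" + exited + ", alive=" + p.isAlive());
    }
}
```

In a test, a false return value can be turned into an assertion failure with a clear message instead of a silent hang.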
[jira] [Commented] (FLINK-34425) TaskManagerRunnerITCase#testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure times out
[ https://issues.apache.org/jira/browse/FLINK-34425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816971#comment-17816971 ] Matthias Pohl commented on FLINK-34425: --- This looks like a test issue. The TaskManager process is destroyed in [TaskManagerRunnerITCase:124|https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java#L124] but doesn't terminate properly, causing the timeout in {{#waitFor()}} in [TaskManagerRunnerITCase:126|https://github.com/apache/flink/blob/d6c7eee8243b4fe3e593698f250643534dc79cb5/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerRunnerITCase.java#L126]. > TaskManagerRunnerITCase#testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure > times out > --- > > Key: FLINK-34425 > URL: https://issues.apache.org/jira/browse/FLINK-34425 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://github.com/apache/flink/actions/runs/7851900616/job/21429757962#step:10:8844 > {code} > Feb 10 03:21:45 "main" #1 [498632] prio=5 os_prio=0 cpu=619.91ms > elapsed=1653.40s tid=0x7fbd29695000 nid=498632 waiting on condition > [0x7fbd2b9f3000] > Feb 10 03:21:45java.lang.Thread.State: WAITING (parking) > Feb 10 03:21:45 at > jdk.internal.misc.Unsafe.park(java.base@21.0.1/Native Method) > Feb 10 03:21:45 - parking to wait for <0xae6199f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > Feb 10 03:21:45 at > java.util.concurrent.locks.LockSupport.park(java.base@21.0.1/LockSupport.java:371) > Feb 10 03:21:45 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@21.0.1/AbstractQueuedSynchronizer.java:519) > Feb 10 03:21:45 at > 
java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@21.0.1/ForkJoinPool.java:3780) > Feb 10 03:21:45 at > java.util.concurrent.ForkJoinPool.managedBlock(java.base@21.0.1/ForkJoinPool.java:3725) > Feb 10 03:21:45 at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@21.0.1/AbstractQueuedSynchronizer.java:1707) > Feb 10 03:21:45 at > java.lang.ProcessImpl.waitFor(java.base@21.0.1/ProcessImpl.java:425) > Feb 10 03:21:45 at > org.apache.flink.test.recovery.TaskManagerRunnerITCase.testNondeterministicWorkingDirIsDeletedInCaseOfProcessFailure(TaskManagerRunnerITCase.java:126) > Feb 10 03:21:45 at > java.lang.invoke.LambdaForm$DMH/0x7fbccb1b8000.invokeVirtual(java.base@21.0.1/LambdaForm$DMH) > Feb 10 03:21:45 at > java.lang.invoke.LambdaForm$MH/0x7fbccb1b8800.invoke(java.base@21.0.1/LambdaForm$MH) > [...] > {code}
[jira] [Commented] (FLINK-34427) FineGrainedSlotManagerTest fails fatally (exit code 239)
[ https://issues.apache.org/jira/browse/FLINK-34427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816969#comment-17816969 ] Chesnay Schepler commented on FLINK-34427: -- The problem is the use of scheduled executors in the FineGrainedSlotManager. It periodically tries to schedule actions unconditionally into the main thread, and this periodic action is also never cancelled. If the RPC endpoint shuts down during the periodic delay, the scheduled action can fire again before the RPC service (and thus the scheduled executor) is shut down, running into this error. This code is plainly broken as it makes assumptions about the lifecycle of the scheduled executor. The loop should be cancelled when the FGSM is shut down, and as a safety rail any scheduled action should validate that the FGSM is not shut down yet before scheduling anything into the main thread. > FineGrainedSlotManagerTest fails fatally (exit code 239) > > > Key: FLINK-34427 > URL: https://issues.apache.org/jira/browse/FLINK-34427 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://github.com/apache/flink/actions/runs/7866453350/job/21460921911#step:10:8959 > {code} > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: > ExecutionException The forked VM terminated without properly saying goodbye. > VM crash or System.exit called? 
> Error: 02:28:53 02:28:53.220 [ERROR] Command was /bin/sh -c cd > '/root/flink/flink-runtime' && > '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' '-XX:+UseG1GC' '-Xms256m' > '-XX:+IgnoreUnrecognizedVMOptions' > '--add-opens=java.base/java.util=ALL-UNNAMED' > '--add-opens=java.base/java.lang=ALL-UNNAMED' > '--add-opens=java.base/java.net=ALL-UNNAMED' > '--add-opens=java.base/java.io=ALL-UNNAMED' > '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED' '-Xmx768m' '-jar' > '/root/flink/flink-runtime/target/surefire/surefirebooter-20240212022332296_94.jar' > '/root/flink/flink-runtime/target/surefire' > '2024-02-12T02-21-39_495-jvmRun3' 'surefire-20240212022332296_88tmp' > 'surefire_26-20240212022332296_91tmp' > Error: 02:28:53 02:28:53.220 [ERROR] Error occurred in starting fork, check > output in log > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.221 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.221 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456) > [...] > {code} > The fatal error is triggered most likely within the > {{FineGrainedSlotManagerTest}}: > {code} > 02:26:39,362 [ pool-643-thread-1] ERROR > org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: > Thread 'pool-643-thread-1' produced an uncaught exception. Stopping the > process... 
> java.util.concurrent.CompletionException: > java.util.concurrent.RejectedExecutionException: Task > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4bbc0b10 > rejected from > java.util.concurrent.ScheduledThreadPoolExecutor@7a45cd9a[Shutting down, pool > size = 1, active threads = 1, queued tasks = 1, completed tasks = 194] > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:838) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:851) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2178) > ~[?:1.8.0_392] > at > org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722) > ~[classes/:?] > at >
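The two measures suggested in the comment above (cancel the periodic loop on shutdown, and check the shutdown state before handing work to the main thread) can be sketched as follows. This is a hypothetical component written for illustration, not the actual FineGrainedSlotManager; the class, fields, and methods are assumptions. Note that the flag check alone only narrows the race window; cancelling the loop and shutting down in dependency order (component first, then its executors) is what keeps the callback from hitting a dead executor.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical component (not the actual FineGrainedSlotManager) that runs a periodic check via a
 * scheduled executor and hands the actual work to a "main thread" executor.
 */
public class PeriodicCheckComponent implements AutoCloseable {
    private final ScheduledExecutorService scheduler;
    private final Executor mainThreadExecutor;
    private final AtomicInteger completedChecks = new AtomicInteger();
    private volatile boolean closed;
    private volatile ScheduledFuture<?> periodicCheck;

    PeriodicCheckComponent(ScheduledExecutorService scheduler, Executor mainThreadExecutor) {
        this.scheduler = scheduler;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    void start(long periodMillis) {
        // Keep the handle so the loop can be cancelled on shutdown.
        periodicCheck = scheduler.scheduleAtFixedRate(
                this::triggerCheck, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    private void triggerCheck() {
        // Safety rail: do not schedule anything into the main thread once closed.
        if (closed) {
            return;
        }
        mainThreadExecutor.execute(() -> {
            // Re-check on the main thread, since close() may have run in between.
            if (!closed) {
                completedChecks.incrementAndGet();
            }
        });
    }

    int completedChecks() {
        return completedChecks.get();
    }

    boolean isLoopCancelled() {
        ScheduledFuture<?> f = periodicCheck;
        return f != null && f.isCancelled();
    }

    @Override
    public void close() {
        closed = true;
        // Primary fix: cancel the periodic loop so it cannot fire after shutdown.
        ScheduledFuture<?> f = periodicCheck;
        if (f != null) {
            f.cancel(false);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        PeriodicCheckComponent component = new PeriodicCheckComponent(scheduler, mainThread);
        component.start(10);
        Thread.sleep(200);

        // Shut down in dependency order: the component first, then the executors it uses.
        component.close();
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        mainThread.shutdown();
        mainThread.awaitTermination(5, TimeUnit.SECONDS);

        System.out.println("checks=" + component.completedChecks()
                + ", loopCancelled=" + component.isLoopCancelled());
    }
}
```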
[jira] [Updated] (FLINK-34000) Implement restore tests for IncrementalGroupAggregate node
[ https://issues.apache.org/jira/browse/FLINK-34000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz updated FLINK-34000: - Fix Version/s: 1.20.0 (was: 1.19.0) > Implement restore tests for IncrementalGroupAggregate node > -- > > Key: FLINK-34000 > URL: https://issues.apache.org/jira/browse/FLINK-34000 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Bonnie Varghese >Assignee: Bonnie Varghese >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > >
[jira] [Closed] (FLINK-34000) Implement restore tests for IncrementalGroupAggregate node
[ https://issues.apache.org/jira/browse/FLINK-34000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-34000. Resolution: Implemented > Implement restore tests for IncrementalGroupAggregate node > -- > > Key: FLINK-34000 > URL: https://issues.apache.org/jira/browse/FLINK-34000 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Bonnie Varghese >Assignee: Bonnie Varghese >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > >
[jira] [Commented] (FLINK-34000) Implement restore tests for IncrementalGroupAggregate node
[ https://issues.apache.org/jira/browse/FLINK-34000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816966#comment-17816966 ] Dawid Wysakowicz commented on FLINK-34000: -- Merged an improved version in 14d5dbc4c53b2e200dc57e3f4c053583f2419b14..5844092408d21023a738077d0922cc75f1e634d7 > Implement restore tests for IncrementalGroupAggregate node > -- > > Key: FLINK-34000 > URL: https://issues.apache.org/jira/browse/FLINK-34000 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Bonnie Varghese >Assignee: Bonnie Varghese >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > >
Re: [PR] [FLINK-34000] Implement restore tests for IncrementalGroupAgg node [flink]
dawidwys merged PR #24154: URL: https://github.com/apache/flink/pull/24154
[jira] [Commented] (FLINK-34427) FineGrainedSlotManagerTest fails fatally (exit code 239)
[ https://issues.apache.org/jira/browse/FLINK-34427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816963#comment-17816963 ] Matthias Pohl commented on FLINK-34427: --- [~chesnay] the upstream future {{requestFuture}} is coming from the {{TaskManagerGateway#requestSlot}} RPC call. I would conclude that the RPCEndpoint (considering that the [handleAsync callback|https://github.com/apache/flink/blob/15fe1653acec45d7c7bac17071e9773a4aa690a4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java#L138] should be chained up and run wherever the RPC call is executed) is shut down while there's still a scheduled task queued up, causing the {{RejectedExecutionException}}. WDYT? > FineGrainedSlotManagerTest fails fatally (exit code 239) > > > Key: FLINK-34427 > URL: https://issues.apache.org/jira/browse/FLINK-34427 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://github.com/apache/flink/actions/runs/7866453350/job/21460921911#step:10:8959 > {code} > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.220 [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: > ExecutionException The forked VM terminated without properly saying goodbye. > VM crash or System.exit called? 
> Error: 02:28:53 02:28:53.220 [ERROR] Command was /bin/sh -c cd > '/root/flink/flink-runtime' && > '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' '-XX:+UseG1GC' '-Xms256m' > '-XX:+IgnoreUnrecognizedVMOptions' > '--add-opens=java.base/java.util=ALL-UNNAMED' > '--add-opens=java.base/java.lang=ALL-UNNAMED' > '--add-opens=java.base/java.net=ALL-UNNAMED' > '--add-opens=java.base/java.io=ALL-UNNAMED' > '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED' '-Xmx768m' '-jar' > '/root/flink/flink-runtime/target/surefire/surefirebooter-20240212022332296_94.jar' > '/root/flink/flink-runtime/target/surefire' > '2024-02-12T02-21-39_495-jvmRun3' 'surefire-20240212022332296_88tmp' > 'surefire_26-20240212022332296_91tmp' > Error: 02:28:53 02:28:53.220 [ERROR] Error occurred in starting fork, check > output in log > Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 > Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: > Error: 02:28:53 02:28:53.221 [ERROR] > org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest > Error: 02:28:53 02:28:53.221 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456) > [...] > {code} > The fatal error is triggered most likely within the > {{FineGrainedSlotManagerTest}}: > {code} > 02:26:39,362 [ pool-643-thread-1] ERROR > org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: > Thread 'pool-643-thread-1' produced an uncaught exception. Stopping the > process... 
> java.util.concurrent.CompletionException: > java.util.concurrent.RejectedExecutionException: Task > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4bbc0b10 > rejected from > java.util.concurrent.ScheduledThreadPoolExecutor@7a45cd9a[Shutting down, pool > size = 1, active threads = 1, queued tasks = 1, completed tasks = 194] > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:838) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:851) > ~[?:1.8.0_392] > at > java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2178) > ~[?:1.8.0_392] > at > org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722) > ~[classes/:?] > at > org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645) > ~[classes/:?] > at >
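The mechanism discussed above, where a {{handleAsync}} callback is handed to an executor that is already shutting down, can be reproduced in isolation with plain JDK classes. This is a minimal sketch with hypothetical names, not Flink code: the rejected submission does not throw from {{handleAsync}} itself; the returned stage instead completes exceptionally with a {{CompletionException}} wrapping the {{RejectedExecutionException}}, which matches the {{encodeThrowable}}/{{completeThrowable}} frames in the trace. How that failed stage then reaches the {{FatalExitExceptionHandler}} in the test is not covered by the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Minimal sketch: an async callback (handleAsync) that targets an executor which has already been
 * shut down. The stand-in names below are hypothetical, not Flink code.
 */
public class HandleAsyncOnShutdownExecutor {

    /** Returns the cause with which the handleAsync stage fails, or null if it succeeds. */
    static Throwable failureCause() {
        ExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Simulate the endpoint's executor being shut down while work is still being chained.
        executor.shutdown();

        // Stand-in for the already-completed RPC response future.
        CompletableFuture<String> requestFuture = CompletableFuture.completedFuture("ack");
        CompletableFuture<String> handled =
                requestFuture.handleAsync((ack, failure) -> ack, executor);

        try {
            handled.join();
            return null;
        } catch (CompletionException e) {
            // The executor rejected the callback; the rejection is wrapped in a CompletionException.
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        // Prints the RejectedExecutionException (its message names the task and executor).
        System.out.println(failureCause());
    }
}
```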
[jira] [Commented] (FLINK-34434) DefaultSlotStatusSyncer doesn't complete the returned future
[ https://issues.apache.org/jira/browse/FLINK-34434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816961#comment-17816961 ] Matthias Pohl commented on FLINK-34434: --- [~guoyangze] can you have a look at this? > DefaultSlotStatusSyncer doesn't complete the returned future > > > Key: FLINK-34434 > URL: https://issues.apache.org/jira/browse/FLINK-34434 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.2, 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > > When looking into FLINK-34427 (unrelated), I noticed an odd line in > [DefaultSlotStatusSyncer:155|https://github.com/apache/flink/blob/15fe1653acec45d7c7bac17071e9773a4aa690a4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java#L155] > where we complete a future that should already be completed (because the > callback is triggered after the {{requestFuture}} is already completed in > some way). Shouldn't we complete the {{returnedFuture}} instead? > I'm keeping the priority at {{Major}} because it doesn't seem to have been an > issue in the past.
[jira] [Comment Edited] (FLINK-34434) DefaultSlotStatusSyncer doesn't complete the returned future
[ https://issues.apache.org/jira/browse/FLINK-34434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816961#comment-17816961 ] Matthias Pohl edited comment on FLINK-34434 at 2/13/24 11:11 AM: - [~guoyangze] can you have a look at this? Maybe I'm missing something here. was (Author: mapohl): [~guoyangze] can you have a look at this? > DefaultSlotStatusSyncer doesn't complete the returned future > > > Key: FLINK-34434 > URL: https://issues.apache.org/jira/browse/FLINK-34434 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.2, 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > > When looking into FLINK-34427 (unrelated), I noticed an odd line in > [DefaultSlotStatusSyncer:155|https://github.com/apache/flink/blob/15fe1653acec45d7c7bac17071e9773a4aa690a4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java#L155] > where we complete a future that should already be completed (because the > callback is triggered after the {{requestFuture}} is already completed in > some way). Shouldn't we complete the {{returnedFuture}} instead? > I'm keeping the priority at {{Major}} because it doesn't seem to have been an > issue in the past.
[jira] [Created] (FLINK-34434) DefaultSlotStatusSyncer doesn't complete the returned future
Matthias Pohl created FLINK-34434: - Summary: DefaultSlotStatusSyncer doesn't complete the returned future Key: FLINK-34434 URL: https://issues.apache.org/jira/browse/FLINK-34434 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.18.1, 1.17.2, 1.19.0, 1.20.0 Reporter: Matthias Pohl When looking into FLINK-34427 (unrelated), I noticed an odd line in [DefaultSlotStatusSyncer:155|https://github.com/apache/flink/blob/15fe1653acec45d7c7bac17071e9773a4aa690a4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java#L155] where we complete a future that should already be completed (because the callback is triggered after the {{requestFuture}} is already completed in some way). Shouldn't we complete the {{returnedFuture}} instead? I'm keeping the priority at {{Major}} because it doesn't seem to have been an issue in the past.
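The concern in the issue rests on a property of {{CompletableFuture}}: {{complete}}/{{completeExceptionally}} on a future that is already done is a no-op (it returns {{false}}), so a callback that "completes" {{requestFuture}} leaves {{returnedFuture}} pending forever. The sketch below is a hypothetical illustration of that pattern, not the actual {{DefaultSlotStatusSyncer}} code. Which branch of the real callback contains line 155 isn't stated here; the sketch uses the failure path purely for illustration.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical illustration of the pattern described in FLINK-34434 (not the actual
 * DefaultSlotStatusSyncer code): a callback on requestFuture should complete returnedFuture.
 */
public class CompleteReturnedFuture {

    /** Questionable variant: completes the upstream future, which is already done at this point. */
    static CompletableFuture<Void> completesRequestFuture(CompletableFuture<String> requestFuture) {
        CompletableFuture<Void> returnedFuture = new CompletableFuture<>();
        requestFuture.whenComplete((ack, failure) -> {
            if (failure != null) {
                // No effect: completeExceptionally returns false on an already-completed future.
                requestFuture.completeExceptionally(failure);
            } else {
                returnedFuture.complete(null);
            }
        });
        return returnedFuture;
    }

    /** Variant suggested in the issue: propagate the outcome to returnedFuture instead. */
    static CompletableFuture<Void> completesReturnedFuture(CompletableFuture<String> requestFuture) {
        CompletableFuture<Void> returnedFuture = new CompletableFuture<>();
        requestFuture.whenComplete((ack, failure) -> {
            if (failure != null) {
                returnedFuture.completeExceptionally(failure);
            } else {
                returnedFuture.complete(null);
            }
        });
        return returnedFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failedRequest = new CompletableFuture<>();
        failedRequest.completeExceptionally(new RuntimeException("slot request failed"));

        // Prints "questionable variant done: false"
        System.out.println("questionable variant done: "
                + completesRequestFuture(failedRequest).isDone());
        // Prints "suggested variant failed: true"
        System.out.println("suggested variant failed: "
                + completesReturnedFuture(failedRequest).isCompletedExceptionally());
    }
}
```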
[jira] [Updated] (FLINK-34427) ResourceManagerTaskExecutorTest fails fatally (exit code 239)
[ https://issues.apache.org/jira/browse/FLINK-34427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-34427: -- Description: https://github.com/apache/flink/actions/runs/7866453350/job/21460921911#step:10:8959 {code} Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: Error: 02:28:53 02:28:53.220 [ERROR] org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest Error: 02:28:53 02:28:53.220 [ERROR] org.apache.maven.surefire.booter.SurefireBooterForkException: ExecutionException The forked VM terminated without properly saying goodbye. VM crash or System.exit called? Error: 02:28:53 02:28:53.220 [ERROR] Command was /bin/sh -c cd '/root/flink/flink-runtime' && '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' '-XX:+UseG1GC' '-Xms256m' '-XX:+IgnoreUnrecognizedVMOptions' '--add-opens=java.base/java.util=ALL-UNNAMED' '--add-opens=java.base/java.lang=ALL-UNNAMED' '--add-opens=java.base/java.net=ALL-UNNAMED' '--add-opens=java.base/java.io=ALL-UNNAMED' '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED' '-Xmx768m' '-jar' '/root/flink/flink-runtime/target/surefire/surefirebooter-20240212022332296_94.jar' '/root/flink/flink-runtime/target/surefire' '2024-02-12T02-21-39_495-jvmRun3' 'surefire-20240212022332296_88tmp' 'surefire_26-20240212022332296_91tmp' Error: 02:28:53 02:28:53.220 [ERROR] Error occurred in starting fork, check output in log Error: 02:28:53 02:28:53.220 [ERROR] Process Exit Code: 239 Error: 02:28:53 02:28:53.220 [ERROR] Crashed tests: Error: 02:28:53 02:28:53.221 [ERROR] org.apache.flink.runtime.resourcemanager.ResourceManagerTaskExecutorTest Error: 02:28:53 02:28:53.221 [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456) [...] 
{code} The fatal error is triggered most likely within the {{FineGrainedSlotManagerTest}}: {code} 02:26:39,362 [ pool-643-thread-1] ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'pool-643-thread-1' produced an uncaught exception. Stopping the process... java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4bbc0b10 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@7a45cd9a[Shutting down, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 194] at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_392] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_392] at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:838) ~[?:1.8.0_392] at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_392] at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:851) ~[?:1.8.0_392] at java.util.concurrent.CompletableFuture.handleAsync(CompletableFuture.java:2178) ~[?:1.8.0_392] at org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer.allocateSlot(DefaultSlotStatusSyncer.java:138) ~[classes/:?] at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.allocateSlotsAccordingTo(FineGrainedSlotManager.java:722) ~[classes/:?] at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.checkResourceRequirements(FineGrainedSlotManager.java:645) ~[classes/:?] at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager.lambda$null$12(FineGrainedSlotManager.java:603) ~[classes/:?] 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_392] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_392] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_392] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_392] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_392] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_392] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_392] Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4bbc0b10 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@7a45cd9a[Shutting down, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 194] at