[flink] branch master updated (a5a741876ab -> b9c5dc3d731)
This is an automated email from the ASF dual-hosted git repository. shengkai pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from a5a741876ab [FLINK-32617][jdbc-driver] Return true or false for most methods in database meta data (#23019) add b9c5dc3d731 [FLINK-32616][jdbc-driver] Close result for non-query in executeQuery (#23027) No new revisions were added by this update. Summary of changes: .../apache/flink/table/jdbc/FlinkConnection.java | 7 ++ .../apache/flink/table/jdbc/FlinkStatement.java| 1 + .../flink/table/jdbc/FlinkStatementTest.java | 80 ++ 3 files changed, 88 insertions(+)
[flink] branch master updated (05191071638 -> a5a741876ab)
This is an automated email from the ASF dual-hosted git repository. shengkai pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 05191071638 [FLINK-32456][table-planner] Support JSON_OBJECTAGG & JSON_ARRAYAGG use with other aggregate functions add a5a741876ab [FLINK-32617][jdbc-driver] Return true or false for most methods in database meta data (#23019) No new revisions were added by this update. Summary of changes: .../flink/table/jdbc/BaseDatabaseMetaData.java | 408 - .../flink/table/jdbc/FlinkDatabaseMetaData.java| 347 ++ .../table/jdbc/FlinkDatabaseMetaDataTest.java | 100 + 3 files changed, 447 insertions(+), 408 deletions(-)
[flink] branch master updated (d6967dd7301 -> 05191071638)
This is an automated email from the ASF dual-hosted git repository. lincoln pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from d6967dd7301 [FLINK-32610][json] Add document for Json option decode.json-parser.enabled add 05191071638 [FLINK-32456][table-planner] Support JSON_OBJECTAGG & JSON_ARRAYAGG use with other aggregate functions No new revisions were added by this update. Summary of changes: docs/data/sql_functions.yml| 4 +- docs/data/sql_functions_zh.yml | 4 +- .../logical/WrapJsonAggFunctionArgumentsRule.java | 113 + .../functions/JsonAggregationFunctionsITCase.java | 90 ++- .../WrapJsonAggFunctionArgumentsRuleTest.java | 21 +- .../WrapJsonAggFunctionArgumentsRuleTest.xml | 268 - .../runtime/batch/sql/agg/SortAggITCase.scala | 36 +++ .../runtime/stream/sql/AggregateITCase.scala | 43 8 files changed, 504 insertions(+), 75 deletions(-)
[flink] 01/03: [FLINK-32610][json] Introduce JsonParser based JsonParserToRowDataConverter with better performance
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 58f7ceb45013bca847b39901f62ce746680e4f0c Author: fengli AuthorDate: Wed Jul 19 12:21:00 2023 +0800 [FLINK-32610][json] Introduce JsonParser based JsonParserToRowDataConverter with better performance --- .../flink/formats/json/JsonParseException.java | 32 ++ .../json/JsonParserToRowDataConverters.java| 626 + .../formats/json/JsonToRowDataConverters.java | 13 - 3 files changed, 658 insertions(+), 13 deletions(-) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java new file mode 100644 index 000..6c75a37cfd8 --- /dev/null +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json; + +/** Exception which refers to parse errors in converters. */ +public class JsonParseException extends RuntimeException { +private static final long serialVersionUID = 1L; + +public JsonParseException(String message) { +super(message); +} + +public JsonParseException(String message, Throwable cause) { +super(message, cause); +} +} diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java new file mode 100644 index 000..6556502f0d4 --- /dev/null +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.json; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; + +import org.apache.commons.lang3.ArrayUtils; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import
[flink] 03/03: [FLINK-32610][json] Add document for Json option decode.json-parser.enabled
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit d6967dd7301e82fa102f756e16635dabce1c550d Author: fengli AuthorDate: Thu Jul 20 20:29:45 2023 +0800 [FLINK-32610][json] Add document for Json option decode.json-parser.enabled --- docs/content.zh/docs/connectors/table/formats/json.md | 7 +++ docs/content/docs/connectors/table/formats/json.md| 8 2 files changed, 15 insertions(+) diff --git a/docs/content.zh/docs/connectors/table/formats/json.md b/docs/content.zh/docs/connectors/table/formats/json.md index f7e3040de1f..f1acdd7a001 100644 --- a/docs/content.zh/docs/connectors/table/formats/json.md +++ b/docs/content.zh/docs/connectors/table/formats/json.md @@ -135,6 +135,13 @@ Format 参数 Boolean 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.00027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.00027。 + + decode.json-parser.enabled + 选填 + true + Boolean + JsonParser 是 Jackson 提供的流式读取 JSON 数据的 API。与 JsonNode 方式相比,这种方式读取速度更快,内存消耗更少。同时,JsonParser 在读取数据时还支持嵌套字段的投影下推。该参数默认启用。如果遇到任何不兼容性问题,可以禁用并回退到 JsonNode 方式。 + diff --git a/docs/content/docs/connectors/table/formats/json.md b/docs/content/docs/connectors/table/formats/json.md index d9c5e5cfa41..52345a42ea1 100644 --- a/docs/content/docs/connectors/table/formats/json.md +++ b/docs/content/docs/connectors/table/formats/json.md @@ -146,6 +146,14 @@ Format Options Boolean Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.00027 is encoded as 2.7E-8 by default, and will be written as 0.00027 if set this option to true. + + decode.json-parser.enabled + optional + + true + Boolean + Whether to use the Jackson JsonParser to decode json. JsonParser is the Jackson JSON streaming API to read JSON data. This is much faster and consumes less memory compared to the previous JsonNode approach. Meanwhile, JsonParser also supports nested projection pushdown when reading data. This option is enabled by default. You can disable and fallback to the previous JsonNode approach when encountering any incompat [...] +
[flink] 02/03: [FLINK-32610][json] Introduce JsonParserToRowDataConverter based JsonParserToRowDataConverter deserialization schema which supports projection push-down of nested fields
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c6cd5b68058bfab658482ce4ca5774153fb79828 Author: fengli AuthorDate: Wed Jul 19 12:24:51 2023 +0800 [FLINK-32610][json] Introduce JsonParserToRowDataConverter based JsonParserToRowDataConverter deserialization schema which supports projection push-down of nested fields --- ...java => AbstractJsonDeserializationSchema.java} | 58 +--- .../flink/formats/json/JsonFormatFactory.java | 50 ++- .../flink/formats/json/JsonFormatOptions.java | 7 + .../JsonParserRowDataDeserializationSchema.java| 100 ++ .../json/JsonRowDataDeserializationSchema.java | 79 + .../json/JsonRowDataSerializationSchema.java | 2 +- .../flink/formats/json/JsonFormatFactoryTest.java | 4 +- .../json/JsonParserRowDataDeSerSchemaTest.java | 346 + .../formats/json/JsonRowDataSerDeSchemaTest.java | 183 ++- 9 files changed, 611 insertions(+), 218 deletions(-) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java similarity index 70% copy from flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java copy to flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java index 9a57bac203b..aa62e0d5f87 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java @@ -18,7 +18,6 @@ package org.apache.flink.formats.json; -import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.common.TimestampFormat; @@ -30,52 +29,40 @@ import org.apache.flink.util.jackson.JacksonMapperFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import javax.annotation.Nullable; - -import java.io.IOException; import java.util.Objects; -import static java.lang.String.format; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Deserialization schema from JSON to Flink Table/SQL internal data structure {@link RowData}. - * - * Deserializes a byte[] message as a JSON object and reads the specified fields. + * Deserialization schema from JSON to Flink Table/SQL internal data structure {@link RowData}. This + * is the abstract base class which has different implementation. * * Failures during deserialization are forwarded as wrapped IOExceptions. */ -@Internal -public class JsonRowDataDeserializationSchema implements DeserializationSchema { +public abstract class AbstractJsonDeserializationSchema implements DeserializationSchema { + private static final long serialVersionUID = 1L; /** Flag indicating whether to fail if a field is missing. */ -private final boolean failOnMissingField; +protected final boolean failOnMissingField; /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ -private final boolean ignoreParseErrors; +protected final boolean ignoreParseErrors; /** TypeInformation of the produced {@link RowData}. */ private final TypeInformation resultTypeInfo; -/** - * Runtime converter that converts {@link JsonNode}s into objects of Flink SQL internal data - * structures. - */ -private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter; - /** Object mapper for parsing the JSON. */ -private transient ObjectMapper objectMapper; +protected transient ObjectMapper objectMapper; /** Timestamp format specification which is used to parse timestamp. */ private final TimestampFormat timestampFormat; private final boolean hasDecimalType; -public JsonRowDataDeserializationSchema( +public AbstractJsonDeserializationSchema( RowType rowType, TypeInformation resultTypeInfo, boolean failOnMissingField, @@ -88,9 +75,6 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema t instanceof DecimalType); } @@ -107,30 +91,6 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema>() { @@ -81,21 +83,57 @@ public class
[flink] branch master updated (288a4982c34 -> d6967dd7301)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 288a4982c34 [FLINK-32428][table] Introduce base interfaces for CatalogStore new 58f7ceb4501 [FLINK-32610][json] Introduce JsonParser based JsonParserToRowDataConverter with better performance new c6cd5b68058 [FLINK-32610][json] Introduce JsonParserToRowDataConverter based JsonParserToRowDataConverter deserialization schema which supports projection push-down of nested fields new d6967dd7301 [FLINK-32610][json] Add document for Json option decode.json-parser.enabled The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../docs/connectors/table/formats/json.md | 7 + docs/content/docs/connectors/table/formats/json.md | 8 + ...java => AbstractJsonDeserializationSchema.java} | 58 +- .../flink/formats/json/JsonFormatFactory.java | 50 +- .../flink/formats/json/JsonFormatOptions.java | 7 + .../flink/formats/json/JsonParseException.java | 11 +- .../JsonParserRowDataDeserializationSchema.java| 100 .../json/JsonParserToRowDataConverters.java| 626 + .../json/JsonRowDataDeserializationSchema.java | 79 +-- .../json/JsonRowDataSerializationSchema.java | 2 +- .../formats/json/JsonToRowDataConverters.java | 13 - .../flink/formats/json/JsonFormatFactoryTest.java | 4 +- .../json/JsonParserRowDataDeSerSchemaTest.java | 346 .../formats/json/JsonRowDataSerDeSchemaTest.java | 183 +++--- 14 files changed, 1257 insertions(+), 237 deletions(-) copy flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/{JsonRowDataDeserializationSchema.java => AbstractJsonDeserializationSchema.java} (70%) copy flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java => flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java (77%) create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java create mode 100644 flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java
[flink-connector-shared-utils] branch spotless-jdk17 deleted (was 40cef7c)
This is an automated email from the ASF dual-hosted git repository. tison pushed a change to branch spotless-jdk17 in repository https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git was 40cef7c Update pom.xml The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[flink-connector-shared-utils] branch parent_pom updated: [FLINK-29436] Enable spotless on Java 17
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch parent_pom in repository https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git The following commit(s) were added to refs/heads/parent_pom by this push: new 813ab45 [FLINK-29436] Enable spotless on Java 17 813ab45 is described below commit 813ab45461ef29dd841d63995e97b40125a0d7e4 Author: tison AuthorDate: Thu Jul 20 21:28:45 2023 +0800 [FLINK-29436] Enable spotless on Java 17 --- pom.xml | 24 1 file changed, 24 deletions(-) diff --git a/pom.xml b/pom.xml index 70402b9..00c9a9b 100644 --- a/pom.xml +++ b/pom.xml @@ -908,30 +908,6 @@ under the License. - -java17 - -[17,) - - - - - - -com.diffplug.spotless -spotless-maven-plugin - - -true - - - - - - - java11-target
[flink-kubernetes-operator] branch decoupling-autoscaler-k8s-v2 deleted (was 2fd674cb)
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a change to branch decoupling-autoscaler-k8s-v2 in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git was 2fd674cb [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module This change permanently discards the following revisions: discard 2fd674cb [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module
[flink-kubernetes-operator] 01/01: [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch decoupling-autoscaler-k8s-v2 in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 2fd674cb7c64ded9f43dd60319cc03b59350dc84 Author: 1996fanrui <1996fan...@gmail.com> AuthorDate: Thu Jul 20 18:58:53 2023 +0800 [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module --- flink-autoscaler/pom.xml | 52 + .../flink}/autoscaler/AutoscalerFlinkMetrics.java | 24 +++--- .../flink}/autoscaler/ScalingMetricEvaluator.java | 53 +++--- .../apache/flink}/autoscaler/ScalingSummary.java | 8 ++-- .../autoscaler/config/AutoScalerOptions.java | 10 ++-- .../autoscaler/metrics/CollectedMetricHistory.java | 4 +- .../autoscaler/metrics/CollectedMetrics.java | 2 +- .../org/apache/flink}/autoscaler/metrics/Edge.java | 2 +- .../autoscaler/metrics/EvaluatedScalingMetric.java | 2 +- .../flink}/autoscaler/metrics/FlinkMetric.java | 2 +- .../autoscaler/metrics/MetricAggregator.java | 2 +- .../flink}/autoscaler/metrics/ScalingMetric.java | 2 +- .../flink}/autoscaler/metrics/ScalingMetrics.java | 8 ++-- .../flink}/autoscaler/topology/JobTopology.java| 2 +- .../flink}/autoscaler/topology/VertexInfo.java | 2 +- .../autoscaler/utils/AutoScalerSerDeModule.java| 4 +- .../flink}/autoscaler/utils/AutoScalerUtils.java | 17 --- .../autoscaler/ScalingMetricEvaluatorTest.java | 44 +- .../autoscaler/metrics/ScalingMetricsTest.java | 8 ++-- .../autoscaler/utils/AutoScalerUtilsTest.java | 4 +- .../src/test/resources/log4j2-test.properties | 26 +++ flink-kubernetes-operator-autoscaler/pom.xml | 6 +++ .../operator/autoscaler/AutoScalerInfo.java| 7 +-- .../operator/autoscaler/JobAutoScalerImpl.java | 18 .../autoscaler/JobAutoscalerFactoryImpl.java | 1 + .../operator/autoscaler/JobVertexScaler.java | 29 ++-- .../autoscaler/RestApiMetricsCollector.java| 2 +- .../operator/autoscaler/ScalingExecutor.java | 19 .../autoscaler/ScalingMetricCollector.java | 16 +++ .../operator/autoscaler/AutoScalerInfoTest.java| 9 ++-- .../autoscaler/BacklogBasedScalingTest.java| 14 +++--- .../operator/autoscaler/JobAutoScalerImplTest.java | 12 +++-- .../operator/autoscaler/JobTopologyTest.java | 2 +- .../operator/autoscaler/JobVertexScalerTest.java | 8 ++-- .../MetricsCollectionAndEvaluationTest.java| 15 +++--- .../autoscaler/RecommendedParallelismTest.java | 15 +++--- .../autoscaler/RestApiMetricsCollectorTest.java| 2 +- .../operator/autoscaler/ScalingExecutorTest.java | 8 ++-- .../autoscaler/ScalingMetricCollectorTest.java | 6 +-- .../autoscaler/TestingMetricsCollector.java| 4 +- pom.xml| 1 + 41 files changed, 293 insertions(+), 179 deletions(-) diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml new file mode 100644 index ..458ba878 --- /dev/null +++ b/flink-autoscaler/pom.xml @@ -0,0 +1,52 @@ + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.flink +flink-kubernetes-operator-parent +1.6-SNAPSHOT +.. + + +flink-autoscaler +Flink Autoscaler +jar + + +2.15.0 + + + + + +org.apache.flink +flink-runtime +${flink.version} +provided + + + +jackson-dataformat-yaml +com.fasterxml.jackson.dataformat +${jackson.version} +provided + + + +org.projectlombok +lombok +${lombok.version} +provided + + + +org.junit.jupiter +junit-jupiter-params +test + + + + \ No newline at end of file diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java similarity index 90% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java index fab3cb32..1a0f0671 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java +++
[flink-kubernetes-operator] branch decoupling-autoscaler-k8s-v2 created (now 2fd674cb)
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a change to branch decoupling-autoscaler-k8s-v2 in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git at 2fd674cb [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module This branch includes the following new commits: new 2fd674cb [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[flink-connector-pulsar] branch main updated: [FLINK-24302] Extend offheap memory for JDK11 test coverage (#55)
This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git The following commit(s) were added to refs/heads/main by this push: new 3b6c3af [FLINK-24302] Extend offheap memory for JDK11 test coverage (#55) 3b6c3af is described below commit 3b6c3aff8aeca3cc17673bbc84b90b70c1c680a9 Author: tison AuthorDate: Thu Jul 20 18:23:14 2023 +0800 [FLINK-24302] Extend offheap memory for JDK11 test coverage (#55) Signed-off-by: tison Co-authored-by: Yufan Sheng --- .../docs/connectors/datastream/pulsar.md | 18 +- docs/content/docs/connectors/datastream/pulsar.md | 17 -- flink-connector-pulsar-e2e-tests/pom.xml | 10 -- .../flink/tests/util/pulsar/PulsarSinkE2ECase.java | 3 - .../tests/util/pulsar/PulsarSourceE2ECase.java | 3 - .../util/pulsar/common/FlinkContainerUtils.java| 7 +- .../source/config/PulsarSourceConfigUtils.java | 13 +- .../pulsar/common/MiniClusterTestEnvironment.java | 184 + .../connector/pulsar/sink/PulsarSinkITCase.java| 4 +- .../pulsar/source/PulsarSourceITCase.java | 4 +- 10 files changed, 202 insertions(+), 61 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md index 28c993c..4625324 100644 --- a/docs/content.zh/docs/connectors/datastream/pulsar.md +++ b/docs/content.zh/docs/connectors/datastream/pulsar.md @@ -28,7 +28,7 @@ Flink 当前提供 [Apache Pulsar](https://pulsar.apache.org) Source 和 Sink ## 添加依赖 -当前支持 Pulsar 2.10.0 及其之后的版本,建议在总是将 Pulsar 升级至最新版。如果想要了解更多关于 Pulsar API 兼容性设计,可以阅读文档 [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。 +当前支持 Pulsar 2.10.0 及其之后的版本,建议总是将 Pulsar 升级至最新版。如果想要了解更多对于 Pulsar API 兼容性设计,可以阅读文档 [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。 {{< connector_artifact flink-connector-pulsar pulsar >}} @@ -1088,22 +1088,6 @@ PulsarSink sink = PulsarSink.builder() 用户遇到的问题可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本,或者修改 Pulsar 的配置、Pulsar 连接器的配置来尝试解决问题。 -## 已知问题 - -本节介绍有关 Pulsar 连接器的一些已知问题。 - -### 在 Java 11 上使用不稳定 - -Pulsar connector 在 Java 11 中有一些尚未修复的问题。我们当前推荐在 Java 8 环境中运行Pulsar connector. - -### 不自动重连,而是抛出TransactionCoordinatorNotFound异常 - -Pulsar 事务机制仍在积极发展中,当前版本并不稳定。 Pulsar 2.9.2 -引入了这个问题 [a break change](https://github.com/apache/pulsar/pull/13135)。 -如果您使用 Pulsar 2.9.2或更高版本与较旧的 Pulsar 客户端一起使用,您可能会收到一个“TransactionCoordinatorNotFound”异常。 - -您可以使用最新的`pulsar-client-all`分支来解决这个问题。 - {{< top >}} [schema-evolution]: https://pulsar.apache.org/docs/2.11.x/schema-evolution-compatibility/#schema-evolution diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md index 98b701e..5f991fa 100644 --- a/docs/content/docs/connectors/datastream/pulsar.md +++ b/docs/content/docs/connectors/datastream/pulsar.md @@ -1276,23 +1276,6 @@ If you have a problem with Pulsar when using Flink, keep in mind that Flink only and your problem might be independent of Flink and sometimes can be solved by upgrading Pulsar brokers, reconfiguring Pulsar brokers or reconfiguring Pulsar connector in Flink. -## Known Issues - -This section describes some known issues about the Pulsar connectors. - -### Unstable on Java 11 - -Pulsar connector has some known issues on Java 11. It is recommended to run Pulsar connector -on Java 8. - -### No TransactionCoordinatorNotFound, but automatic reconnect - -Pulsar transactions are still in active development and are not stable. Pulsar 2.9.2 -introduces [a break change](https://github.com/apache/pulsar/pull/13135) in transactions. -If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a `TransactionCoordinatorNotFound` exception. - -You can use the latest `pulsar-client-all` release to resolve this issue. - {{< top >}} [schema-evolution]: https://pulsar.apache.org/docs/2.11.x/schema-evolution-compatibility/#schema-evolution diff --git a/flink-connector-pulsar-e2e-tests/pom.xml b/flink-connector-pulsar-e2e-tests/pom.xml index c88e268..b730be7 100644 --- a/flink-connector-pulsar-e2e-tests/pom.xml +++ b/flink-connector-pulsar-e2e-tests/pom.xml @@ -75,7 +75,6 @@ under the License. **/*.* - ${excludeE2E} ${project.basedir}
[flink] branch master updated: [FLINK-32428][table] Introduce base interfaces for CatalogStore
This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 288a4982c34 [FLINK-32428][table] Introduce base interfaces for CatalogStore 288a4982c34 is described below commit 288a4982c3473fcc08be52a6641d77c4ed2cdb5b Author: Feng Jin AuthorDate: Thu Jul 20 18:11:36 2023 +0800 [FLINK-32428][table] Introduce base interfaces for CatalogStore This closes #22937. --- .../table/catalog/GenericInMemoryCatalogStore.java | 79 +++ .../GenericInMemoryCatalogStoreFactory.java| 54 .../GenericInMemoryCatalogStoreFactoryOptions.java | 28 .../org.apache.flink.table.factories.Factory | 1 + .../GenericInMemoryCatalogStoreFactoryTest.java| 52 +++ .../catalog/GenericInMemoryCatalogStoreTest.java | 71 ++ .../flink/table/catalog/AbstractCatalogStore.java | 49 +++ .../flink/table/catalog/CatalogDescriptor.java | 65 + .../apache/flink/table/catalog/CatalogStore.java | 95 + .../flink/table/factories/CatalogStoreFactory.java | 152 + .../apache/flink/table/factories/FactoryUtil.java | 35 + .../flink/table/factories/FactoryUtilTest.java | 16 +++ .../table/factories/TestCatalogStoreFactory.java | 107 +++ .../org.apache.flink.table.factories.Factory | 1 + 14 files changed, 805 insertions(+) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStore.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStore.java new file mode 100644 index 000..db5bad70804 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStore.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.lang.String.format; + +/** A generic catalog store implementation that store all catalog configuration in memory. */ +public class GenericInMemoryCatalogStore extends AbstractCatalogStore { + +private final Map descriptors; + +public GenericInMemoryCatalogStore() { +descriptors = new HashMap<>(); +} + +@Override +public void storeCatalog(String catalogName, CatalogDescriptor catalog) +throws CatalogException { +checkOpenState(); +if (descriptors.containsKey(catalogName)) { +throw new CatalogException( +format("Catalog %s already exists in the catalog store.", catalogName)); +} +descriptors.put(catalogName, catalog); +} + +@Override +public void removeCatalog(String catalogName, boolean ignoreIfNotExists) +throws CatalogException { +checkOpenState(); +if (descriptors.containsKey(catalogName)) { +descriptors.remove(catalogName); +} else if (!ignoreIfNotExists) { +throw new CatalogException( +format("Catalog %s does not exist in the catalog store.", catalogName)); +} +} + +@Override +public Optional getCatalog(String catalogName) { +checkOpenState(); +return Optional.ofNullable(descriptors.get(catalogName)); +} + +@Override +public Set listCatalogs() { +checkOpenState(); +return descriptors.keySet(); +} + +@Override +public boolean contains(String catalogName) { +checkOpenState(); +return descriptors.containsKey(catalogName); +} +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStoreFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStoreFactory.java new file mode 100644 index
[flink] branch master updated: [FLINK-32169][ui] Show slot allocations on TM page
This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new acd34941e34 [FLINK-32169][ui] Show slot allocations on TM page acd34941e34 is described below commit acd34941e349cdb513fc41669a298cc1c33cc3ec Author: Chesnay Schepler AuthorDate: Tue May 23 16:26:05 2023 +0200 [FLINK-32169][ui] Show slot allocations on TM page --- .../src/app/interfaces/task-manager.ts | 6 .../metrics/task-manager-metrics.component.html| 36 +- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts index 9c574bb71fc..c0865d2d0a6 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts @@ -33,6 +33,12 @@ export interface TaskManagerDetail { blocked?: boolean; freeResource: Resources; totalResource: Resources; + allocatedSlots: AllocatedSlot[]; +} + +export interface AllocatedSlot { + jobId: string; + resource: Resources; } export interface Resources { diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html index 99a36df3f95..b4a41b521f2 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html @@ -318,7 +318,7 @@ - + + + + +# +Job ID +CPU (cores) +Task Heap memory (MB) +Task Off-Heap memory (MB) +Managed memory (MB) +Network memory (MB) + + + + + + {{ i | number }} + +{{ slot.jobId }} +{{ slot.resource.cpuCores | number }} +{{ slot.resource.taskHeapMemory | number }} +{{ slot.resource.taskOffHeapMemory | number }} +{{ slot.resource.managedMemory | number }} +{{ slot.resource.networkMemory | number }} + + +
[flink] branch master updated (fd96076b1b4 -> 86207724d5b)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from fd96076b1b4 [FLINK-32404][table] Add catalog modification listener interface and create listener for catalog manager (#22924) add 86207724d5b [FLINK-19010][metric] Introduce subtask level restore metric No new revisions were added by this update. Summary of changes: docs/content.zh/docs/ops/metrics.md| 5 ++ docs/content/docs/ops/metrics.md | 7 +- .../apache/flink/runtime/metrics/MetricNames.java | 1 + .../runtime/metrics/groups/TaskIOMetricGroup.java | 91 ++ .../metrics/groups/TaskIOMetricGroupTest.java | 63 +-- .../flink/streaming/runtime/tasks/StreamTask.java | 1 + 6 files changed, 131 insertions(+), 37 deletions(-)
[flink] branch master updated (d8e77674a88 -> fd96076b1b4)
This is an automated email from the ASF dual-hosted git repository. renqs pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from d8e77674a88 [FLINK-32403][table] Add database related operations in CatalogManager (#22869) add fd96076b1b4 [FLINK-32404][table] Add catalog modification listener interface and create listener for catalog manager (#22924) No new revisions were added by this update. Summary of changes: .../generated/table_config_configuration.html | 14 +++-- .../gateway/service/context/SessionContext.java| 6 +- .../service/context/SessionContextTest.java| 69 + .../java/internal/StreamTableEnvironmentImpl.java | 4 ++ .../flink/table/api/config/TableConfigOptions.java | 13 .../table/api/internal/TableEnvironmentImpl.java | 3 + .../apache/flink/table/catalog/CatalogManager.java | 25 +++- .../table/catalog/listener/CatalogContext.java | 45 ++ .../catalog/listener/CatalogModificationEvent.java | 31 ++ .../listener/CatalogModificationListener.java | 34 ++ .../CatalogModificationListenerFactory.java| 57 + .../flink/table/factories/TableFactoryUtil.java| 32 ++ .../table/catalog/listener/CatalogFactory1.java| 34 ++ .../table/catalog/listener/CatalogFactory2.java| 34 ++ .../table/catalog/listener/CatalogListener1.java | 28 + .../table/catalog/listener/CatalogListener2.java | 28 + .../listener/CatalogListenerFactoryTest.java | 59 ++ .../org.apache.flink.table.factories.Factory | 17 + .../internal/StreamTableEnvironmentImpl.scala | 6 +- .../apache/flink/table/api/EnvironmentTest.java| 72 -- .../runtime/catalog/CatalogListenerTest.scala | 61 ++ 21 files changed, 659 insertions(+), 13 deletions(-) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogModificationEvent.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogModificationListener.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogModificationListenerFactory.java create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogFactory1.java create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogFactory2.java create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogListener1.java create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogListener2.java create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogListenerFactoryTest.java create mode 100644 flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/catalog/CatalogListenerTest.scala
[flink] branch master updated: [FLINK-32403][table] Add database related operations in CatalogManager (#22869)
This is an automated email from the ASF dual-hosted git repository. renqs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d8e77674a88 [FLINK-32403][table] Add database related operations in CatalogManager (#22869) d8e77674a88 is described below commit d8e77674a885feba22dd079656e4b39f33fa5da1 Author: Shammon FY AuthorDate: Thu Jul 20 14:46:43 2023 +0800 [FLINK-32403][table] Add database related operations in CatalogManager (#22869) --- .../apache/flink/table/catalog/CatalogManager.java | 67 ++ .../operations/ddl/AlterDatabaseOperation.java | 6 +- .../operations/ddl/CreateDatabaseOperation.java| 5 +- .../operations/ddl/DropDatabaseOperation.java | 5 +- .../operations/SqlDdlToOperationConverterTest.java | 10 ++-- 5 files changed, 78 insertions(+), 15 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 2ec64890e93..4be17444b9a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -28,6 +28,8 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; @@ -1066,4 +1068,69 @@ public final class CatalogManager implements CatalogRegistry { final ResolvedSchema resolvedSchema = view.getUnresolvedSchema().resolve(schemaResolver); return new ResolvedCatalogView(view, resolvedSchema); } + +/** + * Create a database. + * + * @param catalogName Name of the catalog for database + * @param databaseName Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. + * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ +public void createDatabase( +String catalogName, +String databaseName, +CatalogDatabase database, +boolean ignoreIfExists) +throws DatabaseAlreadyExistException, CatalogException { +Catalog catalog = getCatalogOrThrowException(catalogName); +catalog.createDatabase(databaseName, database, ignoreIfExists); +} + +/** + * Drop a database. + * + * @param catalogName Name of the catalog for database. + * @param databaseName Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. + * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ +public void dropDatabase( +String catalogName, String databaseName, boolean ignoreIfNotExists, boolean cascade) +throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { +Catalog catalog = getCatalogOrError(catalogName); +catalog.dropDatabase(databaseName, ignoreIfNotExists, cascade); +} + +/** + * Modify an existing database. + * + * @param catalogName Name of the catalog for database + * @param databaseName Name of the database to be dropped + * @param newDatabase The new database definition + * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if + * set to false, throw an exception, if set to true, do nothing. + * @throws