[flink] branch master updated (a5a741876ab -> b9c5dc3d731)

2023-07-20 Thread shengkai
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from a5a741876ab [FLINK-32617][jdbc-driver] Return true or false for most 
methods in database meta data (#23019)
 add b9c5dc3d731 [FLINK-32616][jdbc-driver] Close result for non-query in 
executeQuery (#23027)

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/jdbc/FlinkConnection.java   |  7 ++
 .../apache/flink/table/jdbc/FlinkStatement.java|  1 +
 .../flink/table/jdbc/FlinkStatementTest.java   | 80 ++
 3 files changed, 88 insertions(+)
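
For context, "close result for non-query in executeQuery" concerns the JDBC contract that executeQuery() must not leave a dangling result when the submitted statement produces no rows. The sketch below shows standard java.sql usage against the Flink JDBC driver; the gateway address, table definition, and connector options are illustrative assumptions, not taken from this commit.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class FlinkJdbcSketch {
        public static void main(String[] args) throws Exception {
            // Assumes a Flink SQL Gateway reachable at localhost:8083.
            try (Connection conn = DriverManager.getConnection("jdbc:flink://localhost:8083");
                    Statement stmt = conn.createStatement()) {
                // Non-query statement: execute() reports whether a result set was produced.
                stmt.execute(
                        "CREATE TABLE t (id INT) WITH ("
                                + "'connector' = 'datagen', 'number-of-rows' = '3')");
                // Query statement: executeQuery() hands back a ResultSet the caller closes.
                try (ResultSet rs = stmt.executeQuery("SELECT id FROM t")) {
                    while (rs.next()) {
                        System.out.println(rs.getInt("id"));
                    }
                }
            }
        }
    }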



[flink] branch master updated (05191071638 -> a5a741876ab)

2023-07-20 Thread shengkai
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 05191071638 [FLINK-32456][table-planner] Support JSON_OBJECTAGG & 
JSON_ARRAYAGG use with other aggregate functions
 add a5a741876ab [FLINK-32617][jdbc-driver] Return true or false for most 
methods in database meta data (#23019)

No new revisions were added by this update.

Summary of changes:
 .../flink/table/jdbc/BaseDatabaseMetaData.java | 408 -
 .../flink/table/jdbc/FlinkDatabaseMetaData.java| 347 ++
 .../table/jdbc/FlinkDatabaseMetaDataTest.java  | 100 +
 3 files changed, 447 insertions(+), 408 deletions(-)
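
The metadata change concerns the boolean capability probes on java.sql.DatabaseMetaData, which SQL tools call eagerly when inspecting a connection. A minimal sketch of how a client exercises them; the gateway address is an assumption, and which flags the Flink driver answers true or false is defined by the commit, not by this sketch.

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;

    public class MetaDataProbe {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:flink://localhost:8083")) {
                DatabaseMetaData meta = conn.getMetaData();
                // Capability flags such as these are what the commit switches to
                // plain true/false answers.
                System.out.println("read only:             " + meta.isReadOnly());
                System.out.println("supports GROUP BY:     " + meta.supportsGroupBy());
                System.out.println("supports transactions: " + meta.supportsTransactions());
            }
        }
    }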



[flink] branch master updated (d6967dd7301 -> 05191071638)

2023-07-20 Thread lincoln
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from d6967dd7301 [FLINK-32610][json] Add document for Json option 
decode.json-parser.enabled
 add 05191071638 [FLINK-32456][table-planner] Support JSON_OBJECTAGG & 
JSON_ARRAYAGG use with other aggregate functions

No new revisions were added by this update.

Summary of changes:
 docs/data/sql_functions.yml|   4 +-
 docs/data/sql_functions_zh.yml |   4 +-
 .../logical/WrapJsonAggFunctionArgumentsRule.java  | 113 +
 .../functions/JsonAggregationFunctionsITCase.java  |  90 ++-
 .../WrapJsonAggFunctionArgumentsRuleTest.java  |  21 +-
 .../WrapJsonAggFunctionArgumentsRuleTest.xml   | 268 -
 .../runtime/batch/sql/agg/SortAggITCase.scala  |  36 +++
 .../runtime/stream/sql/AggregateITCase.scala   |  43 
 8 files changed, 504 insertions(+), 75 deletions(-)
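
The listed change lets JSON_OBJECTAGG / JSON_ARRAYAGG share an aggregation with other aggregate functions. A minimal, self-contained sketch of such a query; the table name, datagen source, and column names are illustrative assumptions.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JsonAggMixSketch {
        public static void main(String[] args) {
            TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            env.executeSql(
                    "CREATE TABLE orders (name STRING, price DOUBLE) "
                            + "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
            // JSON aggregates combined with COUNT/SUM in one aggregation:
            // the combination this change enables.
            env.executeSql(
                    "SELECT COUNT(*) AS cnt,"
                            + " SUM(price) AS total,"
                            + " JSON_ARRAYAGG(price) AS prices,"
                            + " JSON_OBJECTAGG(KEY name VALUE price) AS price_by_name"
                            + " FROM orders")
                    .print();
        }
    }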



[flink] 01/03: [FLINK-32610][json] Introduce JsonParser based JsonParserToRowDataConverter with better performance

2023-07-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58f7ceb45013bca847b39901f62ce746680e4f0c
Author: fengli 
AuthorDate: Wed Jul 19 12:21:00 2023 +0800

[FLINK-32610][json] Introduce JsonParser based JsonParserToRowDataConverter 
with better performance
---
 .../flink/formats/json/JsonParseException.java |  32 ++
 .../json/JsonParserToRowDataConverters.java| 626 +
 .../formats/json/JsonToRowDataConverters.java  |  13 -
 3 files changed, 658 insertions(+), 13 deletions(-)
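
For orientation before the diff: the point of a JsonParser based converter is to read JSON as a token stream instead of first materializing a JsonNode tree. Below is a rough sketch of the streaming style, using plain (unshaded) Jackson for brevity; it illustrates the technique only and is not the converter added by this commit.

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.core.JsonToken;

    public class StreamingReadSketch {
        public static void main(String[] args) throws Exception {
            byte[] message = "{\"id\": 42, \"name\": \"flink\", \"price\": 3.14}".getBytes();
            try (JsonParser parser = new JsonFactory().createParser(message)) {
                // Walk the tokens once and pick out only the fields we care about;
                // no intermediate tree of JsonNode objects is allocated.
                while (parser.nextToken() != JsonToken.END_OBJECT) {
                    if (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
                        String field = parser.getCurrentName();
                        parser.nextToken(); // advance to the value token
                        if ("id".equals(field)) {
                            System.out.println("id = " + parser.getIntValue());
                        } else if ("price".equals(field)) {
                            System.out.println("price = " + parser.getDoubleValue());
                        } else {
                            parser.skipChildren(); // ignore fields that are not projected
                        }
                    }
                }
            }
        }
    }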

diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java
new file mode 100644
index 000..6c75a37cfd8
--- /dev/null
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+/** Exception which refers to parse errors in converters. */
+public class JsonParseException extends RuntimeException {
+private static final long serialVersionUID = 1L;
+
+public JsonParseException(String message) {
+super(message);
+}
+
+public JsonParseException(String message, Throwable cause) {
+super(message, cause);
+}
+}
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
new file mode 100644
index 000..6556502f0d4
--- /dev/null
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import 

[flink] 03/03: [FLINK-32610][json] Add document for Json option decode.json-parser.enabled

2023-07-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d6967dd7301e82fa102f756e16635dabce1c550d
Author: fengli 
AuthorDate: Thu Jul 20 20:29:45 2023 +0800

[FLINK-32610][json] Add document for Json option decode.json-parser.enabled
---
 docs/content.zh/docs/connectors/table/formats/json.md | 7 +++
 docs/content/docs/connectors/table/formats/json.md| 8 
 2 files changed, 15 insertions(+)

diff --git a/docs/content.zh/docs/connectors/table/formats/json.md 
b/docs/content.zh/docs/connectors/table/formats/json.md
index f7e3040de1f..f1acdd7a001 100644
--- a/docs/content.zh/docs/connectors/table/formats/json.md
+++ b/docs/content.zh/docs/connectors/table/formats/json.md
@@ -135,6 +135,13 @@ Format 参数
   Boolean
  将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。
 
+
+  decode.json-parser.enabled
+  选填
+  true
+  Boolean
+  JsonParser 是 Jackson 提供的流式读取 JSON 数据的 API。与 
JsonNode 方式相比,这种方式读取速度更快,内存消耗更少。同时,JsonParser 
在读取数据时还支持嵌套字段的投影下推。该参数默认启用。如果遇到任何不兼容性问题,可以禁用并回退到 JsonNode 方式。
+
 
 
 
diff --git a/docs/content/docs/connectors/table/formats/json.md 
b/docs/content/docs/connectors/table/formats/json.md
index d9c5e5cfa41..52345a42ea1 100644
--- a/docs/content/docs/connectors/table/formats/json.md
+++ b/docs/content/docs/connectors/table/formats/json.md
@@ -146,6 +146,14 @@ Format Options
   Boolean
  Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true.
 
+
+  decode.json-parser.enabled
+  optional
+  
+  true
+  Boolean
+  Whether to use the Jackson JsonParser to decode json. 
JsonParser is the Jackson JSON streaming API to read JSON data. 
This is much faster and consumes less memory compared to the previous 
JsonNode approach. Meanwhile, JsonParser also 
supports nested projection pushdown when reading data. This option is enabled 
by default. You can disable and fallback to the previous JsonNode 
approach when encountering any incompat [...]
+
 
 
 



[flink] 02/03: [FLINK-32610][json] Introduce JsonParserToRowDataConverter based JsonParserToRowDataConverter deserialization schema which supports projection push-down of nested fields

2023-07-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6cd5b68058bfab658482ce4ca5774153fb79828
Author: fengli 
AuthorDate: Wed Jul 19 12:24:51 2023 +0800

[FLINK-32610][json] Introduce JsonParserToRowDataConverter based 
JsonParserToRowDataConverter deserialization schema which supports projection 
push-down of nested fields
---
 ...java => AbstractJsonDeserializationSchema.java} |  58 +---
 .../flink/formats/json/JsonFormatFactory.java  |  50 ++-
 .../flink/formats/json/JsonFormatOptions.java  |   7 +
 .../JsonParserRowDataDeserializationSchema.java| 100 ++
 .../json/JsonRowDataDeserializationSchema.java |  79 +
 .../json/JsonRowDataSerializationSchema.java   |   2 +-
 .../flink/formats/json/JsonFormatFactoryTest.java  |   4 +-
 .../json/JsonParserRowDataDeSerSchemaTest.java | 346 +
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 183 ++-
 9 files changed, 611 insertions(+), 218 deletions(-)
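
"Projection push-down of nested fields" means a JSON source only has to decode the nested fields a query actually selects. The sketch below only illustrates such a query shape; the datagen table stands in for a real JSON-backed source and its schema is an assumption.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class NestedProjectionSketch {
        public static void main(String[] args) {
            TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            env.executeSql(
                    "CREATE TABLE events ("
                            + " customer ROW<id BIGINT, name STRING, city STRING>,"
                            + " payload STRING"
                            + ") WITH ('connector' = 'datagen', 'number-of-rows' = '5')");
            // Only customer.id and customer.city are projected; with a streaming
            // JSON decoder the remaining fields can be skipped while parsing.
            env.executeSql("SELECT events.customer.id, events.customer.city FROM events").print();
        }
    }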

diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
similarity index 70%
copy from 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
copy to 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
index 9a57bac203b..aa62e0d5f87 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/AbstractJsonDeserializationSchema.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.formats.json;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.common.TimestampFormat;
@@ -30,52 +29,40 @@ import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import javax.annotation.Nullable;
-
-import java.io.IOException;
 import java.util.Objects;
 
-import static java.lang.String.format;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Deserialization schema from JSON to Flink Table/SQL internal data structure 
{@link RowData}.
- *
- * Deserializes a byte[] message as a JSON object and reads 
the specified fields.
+ * Deserialization schema from JSON to Flink Table/SQL internal data structure 
{@link RowData}. This
+ * is the abstract base class which has different implementation.
  *
  * Failures during deserialization are forwarded as wrapped IOExceptions.
  */
-@Internal
-public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> {
+public abstract class AbstractJsonDeserializationSchema implements DeserializationSchema<RowData> {
+
 private static final long serialVersionUID = 1L;
 
 /** Flag indicating whether to fail if a field is missing. */
-private final boolean failOnMissingField;
+protected final boolean failOnMissingField;
 
 /** Flag indicating whether to ignore invalid fields/rows (default: throw 
an exception). */
-private final boolean ignoreParseErrors;
+protected final boolean ignoreParseErrors;
 
 /** TypeInformation of the produced {@link RowData}. */
 private final TypeInformation<RowData> resultTypeInfo;
 
-/**
- * Runtime converter that converts {@link JsonNode}s into objects of Flink 
SQL internal data
- * structures.
- */
-private final JsonToRowDataConverters.JsonToRowDataConverter 
runtimeConverter;
-
 /** Object mapper for parsing the JSON. */
-private transient ObjectMapper objectMapper;
+protected transient ObjectMapper objectMapper;
 
 /** Timestamp format specification which is used to parse timestamp. */
 private final TimestampFormat timestampFormat;
 
 private final boolean hasDecimalType;
 
-public JsonRowDataDeserializationSchema(
+public AbstractJsonDeserializationSchema(
 RowType rowType,
 TypeInformation<RowData> resultTypeInfo,
 boolean failOnMissingField,
@@ -88,9 +75,6 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema t 
instanceof DecimalType);
 }
@@ -107,30 +91,6 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema>() 
{
@@ -81,21 +83,57 @@ public class 

[flink] branch master updated (288a4982c34 -> d6967dd7301)

2023-07-20 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 288a4982c34 [FLINK-32428][table] Introduce base interfaces for 
CatalogStore
 new 58f7ceb4501 [FLINK-32610][json] Introduce JsonParser based 
JsonParserToRowDataConverter with better performance
 new c6cd5b68058 [FLINK-32610][json] Introduce JsonParserToRowDataConverter 
based JsonParserToRowDataConverter deserialization schema which supports 
projection push-down of nested fields
 new d6967dd7301 [FLINK-32610][json] Add document for Json option 
decode.json-parser.enabled

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/connectors/table/formats/json.md  |   7 +
 docs/content/docs/connectors/table/formats/json.md |   8 +
 ...java => AbstractJsonDeserializationSchema.java} |  58 +-
 .../flink/formats/json/JsonFormatFactory.java  |  50 +-
 .../flink/formats/json/JsonFormatOptions.java  |   7 +
 .../flink/formats/json/JsonParseException.java |  11 +-
 .../JsonParserRowDataDeserializationSchema.java| 100 
 .../json/JsonParserToRowDataConverters.java| 626 +
 .../json/JsonRowDataDeserializationSchema.java |  79 +--
 .../json/JsonRowDataSerializationSchema.java   |   2 +-
 .../formats/json/JsonToRowDataConverters.java  |  13 -
 .../flink/formats/json/JsonFormatFactoryTest.java  |   4 +-
 .../json/JsonParserRowDataDeSerSchemaTest.java | 346 
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 183 +++---
 14 files changed, 1257 insertions(+), 237 deletions(-)
 copy 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/{JsonRowDataDeserializationSchema.java
 => AbstractJsonDeserializationSchema.java} (70%)
 copy 
flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java 
=> 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParseException.java
 (77%)
 create mode 100644 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserRowDataDeserializationSchema.java
 create mode 100644 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
 create mode 100644 
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonParserRowDataDeSerSchemaTest.java
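
Tying the three commits together: the new decoder sits behind the decode.json-parser.enabled format option and is on by default. Below is a minimal DDL sketch for disabling it; the connector, topic, broker address, and the json.-prefixed option key follow the usual format-option spelling and are assumptions, not copied from the commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JsonParserOptionSketch {
        public static void main(String[] args) {
            TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            env.executeSql(
                    "CREATE TABLE events (id INT, payload STRING) WITH ("
                            + " 'connector' = 'kafka',"
                            + " 'topic' = 'events',"
                            + " 'properties.bootstrap.servers' = 'localhost:9092',"
                            + " 'format' = 'json',"
                            // fall back to the JsonNode-based decoder
                            + " 'json.decode.json-parser.enabled' = 'false'"
                            + ")");
        }
    }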



[flink-connector-shared-utils] branch spotless-jdk17 deleted (was 40cef7c)

2023-07-20 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a change to branch spotless-jdk17
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git


 was 40cef7c  Update pom.xml

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



[flink-connector-shared-utils] branch parent_pom updated: [FLINK-29436] Enable spotless on Java 17

2023-07-20 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch parent_pom
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git


The following commit(s) were added to refs/heads/parent_pom by this push:
 new 813ab45  [FLINK-29436] Enable spotless on Java 17
813ab45 is described below

commit 813ab45461ef29dd841d63995e97b40125a0d7e4
Author: tison 
AuthorDate: Thu Jul 20 21:28:45 2023 +0800

[FLINK-29436] Enable spotless on Java 17
---
 pom.xml | 24 
 1 file changed, 24 deletions(-)

diff --git a/pom.xml b/pom.xml
index 70402b9..00c9a9b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -908,30 +908,6 @@ under the License.
 
 
 
-
-java17
-
-[17,)
-
-
-
-
-
-
-com.diffplug.spotless
-spotless-maven-plugin
-
-
-true
-
-
-
-
-
-
-
 
 java11-target
 



[flink-kubernetes-operator] branch decoupling-autoscaler-k8s-v2 deleted (was 2fd674cb)

2023-07-20 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a change to branch decoupling-autoscaler-k8s-v2
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


 was 2fd674cb [FLIP-334] Move non-kubernetes related autoscaler classes to 
flink-autoscaler module

This change permanently discards the following revisions:

 discard 2fd674cb [FLIP-334] Move non-kubernetes related autoscaler classes to 
flink-autoscaler module



[flink-kubernetes-operator] 01/01: [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module

2023-07-20 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch decoupling-autoscaler-k8s-v2
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 2fd674cb7c64ded9f43dd60319cc03b59350dc84
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Thu Jul 20 18:58:53 2023 +0800

[FLIP-334] Move non-kubernetes related autoscaler classes to 
flink-autoscaler module
---
 flink-autoscaler/pom.xml   | 52 +
 .../flink}/autoscaler/AutoscalerFlinkMetrics.java  | 24 +++---
 .../flink}/autoscaler/ScalingMetricEvaluator.java  | 53 +++---
 .../apache/flink}/autoscaler/ScalingSummary.java   |  8 ++--
 .../autoscaler/config/AutoScalerOptions.java   | 10 ++--
 .../autoscaler/metrics/CollectedMetricHistory.java |  4 +-
 .../autoscaler/metrics/CollectedMetrics.java   |  2 +-
 .../org/apache/flink}/autoscaler/metrics/Edge.java |  2 +-
 .../autoscaler/metrics/EvaluatedScalingMetric.java |  2 +-
 .../flink}/autoscaler/metrics/FlinkMetric.java |  2 +-
 .../autoscaler/metrics/MetricAggregator.java   |  2 +-
 .../flink}/autoscaler/metrics/ScalingMetric.java   |  2 +-
 .../flink}/autoscaler/metrics/ScalingMetrics.java  |  8 ++--
 .../flink}/autoscaler/topology/JobTopology.java|  2 +-
 .../flink}/autoscaler/topology/VertexInfo.java |  2 +-
 .../autoscaler/utils/AutoScalerSerDeModule.java|  4 +-
 .../flink}/autoscaler/utils/AutoScalerUtils.java   | 17 ---
 .../autoscaler/ScalingMetricEvaluatorTest.java | 44 +-
 .../autoscaler/metrics/ScalingMetricsTest.java |  8 ++--
 .../autoscaler/utils/AutoScalerUtilsTest.java  |  4 +-
 .../src/test/resources/log4j2-test.properties  | 26 +++
 flink-kubernetes-operator-autoscaler/pom.xml   |  6 +++
 .../operator/autoscaler/AutoScalerInfo.java|  7 +--
 .../operator/autoscaler/JobAutoScalerImpl.java | 18 
 .../autoscaler/JobAutoscalerFactoryImpl.java   |  1 +
 .../operator/autoscaler/JobVertexScaler.java   | 29 ++--
 .../autoscaler/RestApiMetricsCollector.java|  2 +-
 .../operator/autoscaler/ScalingExecutor.java   | 19 
 .../autoscaler/ScalingMetricCollector.java | 16 +++
 .../operator/autoscaler/AutoScalerInfoTest.java|  9 ++--
 .../autoscaler/BacklogBasedScalingTest.java| 14 +++---
 .../operator/autoscaler/JobAutoScalerImplTest.java | 12 +++--
 .../operator/autoscaler/JobTopologyTest.java   |  2 +-
 .../operator/autoscaler/JobVertexScalerTest.java   |  8 ++--
 .../MetricsCollectionAndEvaluationTest.java| 15 +++---
 .../autoscaler/RecommendedParallelismTest.java | 15 +++---
 .../autoscaler/RestApiMetricsCollectorTest.java|  2 +-
 .../operator/autoscaler/ScalingExecutorTest.java   |  8 ++--
 .../autoscaler/ScalingMetricCollectorTest.java |  6 +--
 .../autoscaler/TestingMetricsCollector.java|  4 +-
 pom.xml|  1 +
 41 files changed, 293 insertions(+), 179 deletions(-)

diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml
new file mode 100644
index ..458ba878
--- /dev/null
+++ b/flink-autoscaler/pom.xml
@@ -0,0 +1,52 @@
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.flink
+flink-kubernetes-operator-parent
+1.6-SNAPSHOT
+..
+
+
+flink-autoscaler
+Flink Autoscaler
+jar
+
+
+2.15.0
+
+
+
+
+
+org.apache.flink
+flink-runtime
+${flink.version}
+provided
+
+
+
+jackson-dataformat-yaml
+com.fasterxml.jackson.dataformat
+${jackson.version}
+provided
+
+
+
+org.projectlombok
+lombok
+${lombok.version}
+provided
+
+
+
+org.junit.jupiter
+junit-jupiter-params
+test
+
+
+
+
\ No newline at end of file
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java
similarity index 90%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java
index fab3cb32..1a0f0671 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
+++ 

[flink-kubernetes-operator] branch decoupling-autoscaler-k8s-v2 created (now 2fd674cb)

2023-07-20 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a change to branch decoupling-autoscaler-k8s-v2
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


  at 2fd674cb [FLIP-334] Move non-kubernetes related autoscaler classes to 
flink-autoscaler module

This branch includes the following new commits:

 new 2fd674cb [FLIP-334] Move non-kubernetes related autoscaler classes to 
flink-autoscaler module

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink-connector-pulsar] branch main updated: [FLINK-24302] Extend offheap memory for JDK11 test coverage (#55)

2023-07-20 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
 new 3b6c3af  [FLINK-24302] Extend offheap memory for JDK11 test coverage 
(#55)
3b6c3af is described below

commit 3b6c3aff8aeca3cc17673bbc84b90b70c1c680a9
Author: tison 
AuthorDate: Thu Jul 20 18:23:14 2023 +0800

[FLINK-24302] Extend offheap memory for JDK11 test coverage (#55)

Signed-off-by: tison 
Co-authored-by: Yufan Sheng 
---
 .../docs/connectors/datastream/pulsar.md   |  18 +-
 docs/content/docs/connectors/datastream/pulsar.md  |  17 --
 flink-connector-pulsar-e2e-tests/pom.xml   |  10 --
 .../flink/tests/util/pulsar/PulsarSinkE2ECase.java |   3 -
 .../tests/util/pulsar/PulsarSourceE2ECase.java |   3 -
 .../util/pulsar/common/FlinkContainerUtils.java|   7 +-
 .../source/config/PulsarSourceConfigUtils.java |  13 +-
 .../pulsar/common/MiniClusterTestEnvironment.java  | 184 +
 .../connector/pulsar/sink/PulsarSinkITCase.java|   4 +-
 .../pulsar/source/PulsarSourceITCase.java  |   4 +-
 10 files changed, 202 insertions(+), 61 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md 
b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 28c993c..4625324 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -28,7 +28,7 @@ Flink 当前提供 [Apache Pulsar](https://pulsar.apache.org) Source 
和 Sink 
 
 ## 添加依赖
 
-当前支持 Pulsar 2.10.0 及其之后的版本,建议在总是将 Pulsar 升级至最新版。如果想要了解更多关于 Pulsar API 
兼容性设计,可以阅读文档 
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。
+当前支持 Pulsar 2.10.0 及其之后的版本,建议总是将 Pulsar 升级至最新版。如果想要了解更多对于 Pulsar API 
兼容性设计,可以阅读文档 
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。
 
 {{< connector_artifact flink-connector-pulsar pulsar >}}
 
@@ -1088,22 +1088,6 @@ PulsarSink sink = PulsarSink.builder()
 
 用户遇到的问题可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本,或者修改 Pulsar 的配置、Pulsar 
连接器的配置来尝试解决问题。
 
-## 已知问题
-
-本节介绍有关 Pulsar 连接器的一些已知问题。
-
-### 在 Java 11 上使用不稳定
-
-Pulsar connector 在 Java 11 中有一些尚未修复的问题。我们当前推荐在 Java 8 环境中运行Pulsar connector.
-
-### 不自动重连,而是抛出TransactionCoordinatorNotFound异常
-
-Pulsar 事务机制仍在积极发展中,当前版本并不稳定。 Pulsar 2.9.2
-引入了这个问题 [a break change](https://github.com/apache/pulsar/pull/13135)。
-如果您使用 Pulsar 2.9.2或更高版本与较旧的 Pulsar 
客户端一起使用,您可能会收到一个“TransactionCoordinatorNotFound”异常。
-
-您可以使用最新的`pulsar-client-all`分支来解决这个问题。
-
 {{< top >}}
 
 [schema-evolution]: 
https://pulsar.apache.org/docs/2.11.x/schema-evolution-compatibility/#schema-evolution
diff --git a/docs/content/docs/connectors/datastream/pulsar.md 
b/docs/content/docs/connectors/datastream/pulsar.md
index 98b701e..5f991fa 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -1276,23 +1276,6 @@ If you have a problem with Pulsar when using Flink, keep 
in mind that Flink only
 and your problem might be independent of Flink and sometimes can be solved by 
upgrading Pulsar brokers,
 reconfiguring Pulsar brokers or reconfiguring Pulsar connector in Flink.
 
-## Known Issues
-
-This section describes some known issues about the Pulsar connectors.
-
-### Unstable on Java 11
-
-Pulsar connector has some known issues on Java 11. It is recommended to run 
Pulsar connector
-on Java 8.
-
-### No TransactionCoordinatorNotFound, but automatic reconnect
-
-Pulsar transactions are still in active development and are not stable. Pulsar 
2.9.2
-introduces [a break change](https://github.com/apache/pulsar/pull/13135) in 
transactions.
-If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a 
`TransactionCoordinatorNotFound` exception.
-
-You can use the latest `pulsar-client-all` release to resolve this issue.
-
 {{< top >}}
 
 [schema-evolution]: 
https://pulsar.apache.org/docs/2.11.x/schema-evolution-compatibility/#schema-evolution
diff --git a/flink-connector-pulsar-e2e-tests/pom.xml 
b/flink-connector-pulsar-e2e-tests/pom.xml
index c88e268..b730be7 100644
--- a/flink-connector-pulsar-e2e-tests/pom.xml
+++ b/flink-connector-pulsar-e2e-tests/pom.xml
@@ -75,7 +75,6 @@ under the License.



**/*.*


-   
${excludeE2E}



${project.basedir}
   

[flink] branch master updated: [FLINK-32428][table] Introduce base interfaces for CatalogStore

2023-07-20 Thread leonard
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 288a4982c34 [FLINK-32428][table] Introduce base interfaces for 
CatalogStore
288a4982c34 is described below

commit 288a4982c3473fcc08be52a6641d77c4ed2cdb5b
Author: Feng Jin 
AuthorDate: Thu Jul 20 18:11:36 2023 +0800

[FLINK-32428][table] Introduce base interfaces for CatalogStore

This closes #22937.
---
 .../table/catalog/GenericInMemoryCatalogStore.java |  79 +++
 .../GenericInMemoryCatalogStoreFactory.java|  54 
 .../GenericInMemoryCatalogStoreFactoryOptions.java |  28 
 .../org.apache.flink.table.factories.Factory   |   1 +
 .../GenericInMemoryCatalogStoreFactoryTest.java|  52 +++
 .../catalog/GenericInMemoryCatalogStoreTest.java   |  71 ++
 .../flink/table/catalog/AbstractCatalogStore.java  |  49 +++
 .../flink/table/catalog/CatalogDescriptor.java |  65 +
 .../apache/flink/table/catalog/CatalogStore.java   |  95 +
 .../flink/table/factories/CatalogStoreFactory.java | 152 +
 .../apache/flink/table/factories/FactoryUtil.java  |  35 +
 .../flink/table/factories/FactoryUtilTest.java |  16 +++
 .../table/factories/TestCatalogStoreFactory.java   | 107 +++
 .../org.apache.flink.table.factories.Factory   |   1 +
 14 files changed, 805 insertions(+)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStore.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStore.java
new file mode 100644
index 000..db5bad70804
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStore.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.lang.String.format;
+
+/** A generic catalog store implementation that store all catalog 
configuration in memory. */
+public class GenericInMemoryCatalogStore extends AbstractCatalogStore {
+
+private final Map<String, CatalogDescriptor> descriptors;
+
+public GenericInMemoryCatalogStore() {
+descriptors = new HashMap<>();
+}
+
+@Override
+public void storeCatalog(String catalogName, CatalogDescriptor catalog)
+throws CatalogException {
+checkOpenState();
+if (descriptors.containsKey(catalogName)) {
+throw new CatalogException(
+format("Catalog %s already exists in the catalog store.", 
catalogName));
+}
+descriptors.put(catalogName, catalog);
+}
+
+@Override
+public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
+throws CatalogException {
+checkOpenState();
+if (descriptors.containsKey(catalogName)) {
+descriptors.remove(catalogName);
+} else if (!ignoreIfNotExists) {
+throw new CatalogException(
+format("Catalog %s does not exist in the catalog store.", 
catalogName));
+}
+}
+
+@Override
+public Optional<CatalogDescriptor> getCatalog(String catalogName) {
+checkOpenState();
+return Optional.ofNullable(descriptors.get(catalogName));
+}
+
+@Override
+public Set<String> listCatalogs() {
+checkOpenState();
+return descriptors.keySet();
+}
+
+@Override
+public boolean contains(String catalogName) {
+checkOpenState();
+return descriptors.containsKey(catalogName);
+}
+}
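
A rough usage sketch of the in-memory store shown above. The open()/close() lifecycle and the CatalogDescriptor.of(name, configuration) factory are assumptions based on the surrounding interfaces, which are not fully shown in this mail.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.catalog.CatalogDescriptor;
    import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;

    public class CatalogStoreSketch {
        public static void main(String[] args) {
            GenericInMemoryCatalogStore store = new GenericInMemoryCatalogStore();
            store.open(); // checkOpenState() above requires an opened store

            Configuration conf = new Configuration();
            conf.setString("type", "generic_in_memory");
            store.storeCatalog("my_catalog", CatalogDescriptor.of("my_catalog", conf));

            System.out.println(store.contains("my_catalog"));               // true
            System.out.println(store.listCatalogs());                       // [my_catalog]
            store.removeCatalog("my_catalog", false);
            System.out.println(store.getCatalog("my_catalog").isPresent()); // false

            store.close();
        }
    }
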
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStoreFactory.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogStoreFactory.java
new file mode 100644
index 

[flink] branch master updated: [FLINK-32169][ui] Show slot allocations on TM page

2023-07-20 Thread dmvk
This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new acd34941e34 [FLINK-32169][ui] Show slot allocations on TM page
acd34941e34 is described below

commit acd34941e349cdb513fc41669a298cc1c33cc3ec
Author: Chesnay Schepler 
AuthorDate: Tue May 23 16:26:05 2023 +0200

[FLINK-32169][ui] Show slot allocations on TM page
---
 .../src/app/interfaces/task-manager.ts |  6 
 .../metrics/task-manager-metrics.component.html| 36 +-
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts 
b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
index 9c574bb71fc..c0865d2d0a6 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts
@@ -33,6 +33,12 @@ export interface TaskManagerDetail {
   blocked?: boolean;
   freeResource: Resources;
   totalResource: Resources;
+  allocatedSlots: AllocatedSlot[];
+}
+
+export interface AllocatedSlot {
+  jobId: string;
+  resource: Resources;
 }
 
 export interface Resources {
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
index 99a36df3f95..b4a41b521f2 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/metrics/task-manager-metrics.component.html
@@ -318,7 +318,7 @@
   
 
 
-
+
   
 
   
+  
+
+  
+#
+Job ID
+CPU (cores)
+Task Heap memory (MB)
+Task Off-Heap memory (MB)
+Managed memory (MB)
+Network memory (MB)
+  
+
+
+  
+
+  {{ i | number }}
+
+{{ slot.jobId }}
+{{ slot.resource.cpuCores | number }}
+{{ slot.resource.taskHeapMemory | number }}
+{{ slot.resource.taskOffHeapMemory | number }}
+{{ slot.resource.managedMemory | number }}
+{{ slot.resource.networkMemory | number }}
+  
+
+  
 



[flink] branch master updated (fd96076b1b4 -> 86207724d5b)

2023-07-20 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from fd96076b1b4 [FLINK-32404][table] Add catalog modification listener 
interface and create listener for catalog manager (#22924)
 add 86207724d5b [FLINK-19010][metric] Introduce subtask level restore 
metric

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/ops/metrics.md|  5 ++
 docs/content/docs/ops/metrics.md   |  7 +-
 .../apache/flink/runtime/metrics/MetricNames.java  |  1 +
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 91 ++
 .../metrics/groups/TaskIOMetricGroupTest.java  | 63 +--
 .../flink/streaming/runtime/tasks/StreamTask.java  |  1 +
 6 files changed, 131 insertions(+), 37 deletions(-)



[flink] branch master updated (d8e77674a88 -> fd96076b1b4)

2023-07-20 Thread renqs
This is an automated email from the ASF dual-hosted git repository.

renqs pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from d8e77674a88 [FLINK-32403][table] Add database related operations in 
CatalogManager (#22869)
 add fd96076b1b4 [FLINK-32404][table] Add catalog modification listener 
interface and create listener for catalog manager (#22924)

No new revisions were added by this update.

Summary of changes:
 .../generated/table_config_configuration.html  | 14 +++--
 .../gateway/service/context/SessionContext.java|  6 +-
 .../service/context/SessionContextTest.java| 69 +
 .../java/internal/StreamTableEnvironmentImpl.java  |  4 ++
 .../flink/table/api/config/TableConfigOptions.java | 13 
 .../table/api/internal/TableEnvironmentImpl.java   |  3 +
 .../apache/flink/table/catalog/CatalogManager.java | 25 +++-
 .../table/catalog/listener/CatalogContext.java | 45 ++
 .../catalog/listener/CatalogModificationEvent.java | 31 ++
 .../listener/CatalogModificationListener.java  | 34 ++
 .../CatalogModificationListenerFactory.java| 57 +
 .../flink/table/factories/TableFactoryUtil.java| 32 ++
 .../table/catalog/listener/CatalogFactory1.java| 34 ++
 .../table/catalog/listener/CatalogFactory2.java| 34 ++
 .../table/catalog/listener/CatalogListener1.java   | 28 +
 .../table/catalog/listener/CatalogListener2.java   | 28 +
 .../listener/CatalogListenerFactoryTest.java   | 59 ++
 .../org.apache.flink.table.factories.Factory   | 17 +
 .../internal/StreamTableEnvironmentImpl.scala  |  6 +-
 .../apache/flink/table/api/EnvironmentTest.java| 72 --
 .../runtime/catalog/CatalogListenerTest.scala  | 61 ++
 21 files changed, 659 insertions(+), 13 deletions(-)
 create mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogContext.java
 create mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogModificationEvent.java
 create mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogModificationListener.java
 create mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogModificationListenerFactory.java
 create mode 100644 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogFactory1.java
 create mode 100644 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogFactory2.java
 create mode 100644 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogListener1.java
 create mode 100644 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogListener2.java
 create mode 100644 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/listener/CatalogListenerFactoryTest.java
 create mode 100644 
flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 create mode 100644 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/catalog/CatalogListenerTest.scala
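
The listener interface itself is only listed in the summary above. As a sketch of what an implementation might look like, assuming a single onEvent(CatalogModificationEvent) callback (the method name is an assumption, not shown in this mail); the factory listed above would create and register such listeners.

    import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
    import org.apache.flink.table.catalog.listener.CatalogModificationListener;

    public class LoggingCatalogListener implements CatalogModificationListener {
        @Override
        public void onEvent(CatalogModificationEvent event) {
            // Forward catalog changes (e.g. database or table DDL) to an external
            // metadata service, audit log, etc.
            System.out.println("catalog modification event: " + event);
        }
    }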



[flink] branch master updated: [FLINK-32403][table] Add database related operations in CatalogManager (#22869)

2023-07-20 Thread renqs
This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d8e77674a88 [FLINK-32403][table] Add database related operations in 
CatalogManager (#22869)
d8e77674a88 is described below

commit d8e77674a885feba22dd079656e4b39f33fa5da1
Author: Shammon FY 
AuthorDate: Thu Jul 20 14:46:43 2023 +0800

[FLINK-32403][table] Add database related operations in CatalogManager 
(#22869)
---
 .../apache/flink/table/catalog/CatalogManager.java | 67 ++
 .../operations/ddl/AlterDatabaseOperation.java |  6 +-
 .../operations/ddl/CreateDatabaseOperation.java|  5 +-
 .../operations/ddl/DropDatabaseOperation.java  |  5 +-
 .../operations/SqlDdlToOperationConverterTest.java | 10 ++--
 5 files changed, 78 insertions(+), 15 deletions(-)
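
The new CatalogManager methods serve the database DDL operations listed in the summary above. A minimal sketch of the SQL that goes through that path; the database name and properties are illustrative.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DatabaseDdlSketch {
        public static void main(String[] args) {
            TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            env.executeSql("CREATE DATABASE IF NOT EXISTS sales");
            env.executeSql("ALTER DATABASE sales SET ('owner' = 'team-analytics')");
            env.executeSql("DROP DATABASE IF EXISTS sales CASCADE");
        }
    }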

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 2ec64890e93..4be17444b9a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
@@ -1066,4 +1068,69 @@ public final class CatalogManager implements 
CatalogRegistry {
 final ResolvedSchema resolvedSchema = 
view.getUnresolvedSchema().resolve(schemaResolver);
 return new ResolvedCatalogView(view, resolvedSchema);
 }
+
+/**
+ * Create a database.
+ *
+ * @param catalogName Name of the catalog for database
+ * @param databaseName Name of the database to be created
+ * @param database The database definition
+ * @param ignoreIfExists Flag to specify behavior when a database with the 
given name already
+ * exists: if set to false, throw a DatabaseAlreadyExistException, if 
set to true, do
+ * nothing.
+ * @throws DatabaseAlreadyExistException if the given database already 
exists and ignoreIfExists
+ * is false
+ * @throws CatalogException in case of any runtime exception
+ */
+public void createDatabase(
+String catalogName,
+String databaseName,
+CatalogDatabase database,
+boolean ignoreIfExists)
+throws DatabaseAlreadyExistException, CatalogException {
+Catalog catalog = getCatalogOrThrowException(catalogName);
+catalog.createDatabase(databaseName, database, ignoreIfExists);
+}
+
+/**
+ * Drop a database.
+ *
+ * @param catalogName Name of the catalog for database.
+ * @param databaseName Name of the database to be dropped.
+ * @param ignoreIfNotExists Flag to specify behavior when the database 
does not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @param cascade Flag to specify behavior when the database contains 
table or function: if set
+ * to true, delete all tables and functions in the database and then 
delete the database, if
+ * set to false, throw an exception.
+ * @throws DatabaseNotExistException if the given database does not exist
+ * @throws DatabaseNotEmptyException if the given database is not empty 
and isRestrict is true
+ * @throws CatalogException in case of any runtime exception
+ */
+public void dropDatabase(
+String catalogName, String databaseName, boolean 
ignoreIfNotExists, boolean cascade)
+throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+Catalog catalog = getCatalogOrError(catalogName);
+catalog.dropDatabase(databaseName, ignoreIfNotExists, cascade);
+}
+
+/**
+ * Modify an existing database.
+ *
+ * @param catalogName Name of the catalog for database
+ * @param databaseName Name of the database to be dropped
+ * @param newDatabase The new database definition
+ * @param ignoreIfNotExists Flag to specify behavior when the given 
database does not exist: if
+ * set to false, throw an exception, if set to true, do nothing.
+ * @throws