(flink) branch master updated: [FLINK-16627][json] Support ignore null fields when serializing into JSON

2024-03-05 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 93555a7d55d [FLINK-16627][json] Support ignore null fields when 
serializing into JSON
93555a7d55d is described below

commit 93555a7d55dc27571122cccb7c4d8af2c5db54cb
Author: zhouyisha 
AuthorDate: Fri Mar 1 17:11:32 2024 +0800

[FLINK-16627][json] Support ignore null fields when serializing into JSON

Close apache/flink#24430
---
 .../docs/connectors/table/formats/debezium.md  |  7 ++
 .../docs/connectors/table/formats/json.md  |  7 ++
 .../docs/connectors/table/formats/maxwell.md   |  7 ++
 .../docs/connectors/table/formats/ogg.md   |  9 ++-
 .../docs/connectors/table/formats/debezium.md  |  7 ++
 docs/content/docs/connectors/table/formats/json.md |  8 +++
 .../docs/connectors/table/formats/maxwell.md   |  7 ++
 docs/content/docs/connectors/table/formats/ogg.md  |  7 ++
 .../hive/util/ThriftObjectConversions.java |  2 +-
 .../flink/formats/json/JsonFormatFactory.java  |  7 +-
 .../flink/formats/json/JsonFormatOptions.java  |  7 ++
 .../json/JsonRowDataSerializationSchema.java   | 21 --
 .../formats/json/RowDataToJsonConverters.java  | 15 ++--
 .../formats/json/canal/CanalJsonFormatFactory.java |  7 +-
 .../json/canal/CanalJsonSerializationSchema.java   |  6 +-
 .../json/debezium/DebeziumJsonFormatFactory.java   |  6 +-
 .../debezium/DebeziumJsonSerializationSchema.java  |  6 +-
 .../json/maxwell/MaxwellJsonFormatFactory.java |  7 +-
 .../maxwell/MaxwellJsonSerializationSchema.java|  6 +-
 .../formats/json/ogg/OggJsonFormatFactory.java |  7 +-
 .../json/ogg/OggJsonSerializationSchema.java   |  6 +-
 .../flink/formats/json/JsonFormatFactoryTest.java  |  2 +
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 84 +++---
 .../json/canal/CanalJsonFormatFactoryTest.java |  3 +
 .../json/canal/CanalJsonSerDeSchemaTest.java   |  3 +-
 .../debezium/DebeziumJsonFormatFactoryTest.java|  2 +
 .../json/debezium/DebeziumJsonSerDeSchemaTest.java |  3 +-
 .../json/maxwell/MaxwellJsonFormatFactoryTest.java |  2 +
 .../json/maxwell/MaxwellJsonSerDerTest.java|  3 +-
 .../formats/json/ogg/OggJsonFormatFactoryTest.java |  2 +
 .../formats/json/ogg/OggJsonSerDeSchemaTest.java   |  3 +-
 .../gateway/rest/serde/ResultInfoSerializer.java   |  5 +-
 32 files changed, 236 insertions(+), 38 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/formats/debezium.md 
b/docs/content.zh/docs/connectors/table/formats/debezium.md
index a6ac486f0f0..7ba62dd408d 100644
--- a/docs/content.zh/docs/connectors/table/formats/debezium.md
+++ b/docs/content.zh/docs/connectors/table/formats/debezium.md
@@ -424,6 +424,13 @@ Flink provides the two formats `debezium-avro-confluent` and `debezium-json` to
  Boolean
  Encode all DECIMAL data as plain numbers instead of scientific notation. For example, 0.000000027 is written as 2.7E-8 by default, and as 0.000000027 when this option is set to true.
 
+
+  debezium-json.encode.ignore-null-fields
+  optional
+  false
+  Boolean
+  Serialize only non-null fields. By default, all fields are serialized regardless of whether they are null.
+
 
 
 
diff --git a/docs/content.zh/docs/connectors/table/formats/json.md 
b/docs/content.zh/docs/connectors/table/formats/json.md
index f1acdd7a001..005485a7a0a 100644
--- a/docs/content.zh/docs/connectors/table/formats/json.md
+++ b/docs/content.zh/docs/connectors/table/formats/json.md
@@ -135,6 +135,13 @@ Format Options
  Boolean
  Encode all DECIMAL data as plain numbers instead of scientific notation. For example, 0.000000027 is written as 2.7E-8 by default, and as 0.000000027 when this option is set to true.
 
+
+  json.encode.ignore-null-fields
+  optional
+  false
+  Boolean
+  Serialize only non-null fields. By default, all fields are serialized regardless of whether they are null.
+
 
  decode.json-parser.enabled
  optional
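As a quick, hedged illustration of the new option (the filesystem connector, output path, and column names below are assumptions for the example, not part of this commit), it is set like any other json format option:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JsonIgnoreNullFieldsExample {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.executeSql(
                    "CREATE TABLE json_sink (id INT, note STRING) WITH ("
                            + " 'connector' = 'filesystem',"
                            + " 'path' = '/tmp/json-sink',"                // hypothetical output path
                            + " 'format' = 'json',"
                            + " 'json.encode.ignore-null-fields' = 'true'" // option introduced by this commit
                            + ")");
            // With the option enabled, the second row is serialized as {"id":2}
            // rather than {"id":2,"note":null}.
            tEnv.executeSql(
                    "INSERT INTO json_sink VALUES (1, 'hello'), (2, CAST(NULL AS STRING))")
                    .await();
        }
    }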
diff --git a/docs/content.zh/docs/connectors/table/formats/maxwell.md 
b/docs/content.zh/docs/connectors/table/formats/maxwell.md
index a3ac161f231..0bdedeac682 100644
--- a/docs/content.zh/docs/connectors/table/formats/maxwell.md
+++ b/docs/content.zh/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
   Boolean
  Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if this option is set to true.
 
+
+  maxwell-json.encode.ignore-null-fields
+  optional
+  false
+  Boolean
+  Encode only non-null fields. By default, all fields will be 
included.
+
 
 
 
diff --git a/docs/content.zh/docs/connectors/table/formats/ogg.md 
b/docs/content.zh/docs/connectors/table/formats/ogg.md
index c8e8a7a6c6d..61ec97b60fd 100644
--- a/docs/content.zh/docs/connectors/table/formats/ogg.md
+++ b/docs/content.zh/docs/connectors/table/formats/ogg.md
@@ -216,7 +216,7 @@ Format

(flink) branch master updated: [FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic.

2024-03-04 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e0b6c121eaf [FLINK-33263][bugfix][table-planner] Remove redundant 
transformation verification logic.
e0b6c121eaf is described below

commit e0b6c121eaf7aeb2974a45d199e452b022f07d29
Author: SuDewei 
AuthorDate: Mon Mar 4 17:42:38 2024 +0800

[FLINK-33263][bugfix][table-planner] Remove redundant transformation 
verification logic.
---
 .../org/apache/flink/api/dag/Transformation.java   | 15 
 .../table/planner/delegation/BatchPlanner.scala|  2 +-
 .../table/planner/delegation/PlannerBase.scala |  3 +-
 .../table/planner/delegation/StreamPlanner.scala   |  2 +-
 .../planner/plan/stream/sql/TableScanTest.xml  | 95 --
 .../planner/plan/stream/sql/TableScanTest.scala| 12 ++-
 .../flink/table/planner/utils/TableTestBase.scala  | 51 +---
 7 files changed, 81 insertions(+), 99 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java 
b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
index 6256f9624f6..a0448697dd1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.dag;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -603,20 +602,6 @@ public abstract class Transformation {
 + '}';
 }
 
-@VisibleForTesting
-public String toStringWithoutId() {
-return getClass().getSimpleName()
-+ "{"
-+ "name='"
-+ name
-+ '\''
-+ ", outputType="
-+ outputType
-+ ", parallelism="
-+ parallelism
-+ '}';
-}
-
 @Override
 public boolean equals(Object o) {
 if (this == o) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index cea10f7bb81..bb4c1b75a28 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -84,7 +84,7 @@ class BatchPlanner(
 processors
   }
 
-  override def translateToPlan(execGraph: ExecNodeGraph): 
util.List[Transformation[_]] = {
+  override protected def translateToPlan(execGraph: ExecNodeGraph): 
util.List[Transformation[_]] = {
 beforeTranslation()
 val planner = createDummyPlanner()
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index b36edaa21d7..45788e6278e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -367,8 +367,7 @@ abstract class PlannerBase(
* @return
*   The [[Transformation]] DAG that corresponds to the node DAG.
*/
-  @VisibleForTesting
-  def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
+  protected def translateToPlan(execGraph: ExecNodeGraph): 
util.List[Transformation[_]]
 
   def addExtraTransformation(transformation: Transformation[_]): Unit = {
 if (!extraTransformations.contains(transformation)) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 894a37c8cf9..fb32326f117 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -78,7 +78,7 @@ class StreamPlanner(
 
   override protected def getExecNodeGraphProcessors: 
Seq[ExecNodeGraphProcessor] = Seq()
 
-  override def translateToPlan(execGraph: ExecNodeGraph): 
util.List[Transformation[_]] = {
+  override protected def translateToPlan(execGraph: ExecNodeGra

(flink) branch master updated: [FLINK-33817][protobuf] Allow ReadDefaultValues = False for non primitive types on Proto3

2024-02-25 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new bf75870c474 [FLINK-33817][protobuf] Allow ReadDefaultValues = False 
for non primitive types on Proto3
bf75870c474 is described below

commit bf75870c4744c884860ee72bf464a301d18fb477
Author: dsaisharath 
AuthorDate: Fri Jan 5 16:03:52 2024 -0800

[FLINK-33817][protobuf] Allow ReadDefaultValues = False for non primitive 
types on Proto3

Close apache/flink#24035
---
 .../docs/connectors/table/formats/protobuf.md   |  7 +--
 .../flink/formats/protobuf/PbFormatContext.java |  9 -
 .../flink/formats/protobuf/PbFormatOptions.java |  5 -
 .../deserialize/PbCodegenRowDeserializer.java   | 14 ++
 .../protobuf/deserialize/ProtoToRowConverter.java   | 15 ++-
 .../protobuf/serialize/RowToProtoConverter.java |  4 ++--
 .../apache/flink/formats/protobuf/Pb3ToRowTest.java | 21 +++--
 7 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/docs/content/docs/connectors/table/formats/protobuf.md 
b/docs/content/docs/connectors/table/formats/protobuf.md
index b28cf49f9d3..72d4aae3c3d 100644
--- a/docs/content/docs/connectors/table/formats/protobuf.md
+++ b/docs/content/docs/connectors/table/formats/protobuf.md
@@ -149,9 +149,12 @@ Format Options
   false
   Boolean
   
-  This option only works if the generated class's version is proto2. 
If this value is set to true, the format will read empty values as the default 
values defined in the proto file.
+  If this value is set to true, the format will read empty values as 
the default values defined in the proto file.
   If the value is set to false, the format will generate null values 
if the data element does not exist in the binary protobuf message.
-  If the proto syntax is proto3, this value will forcibly be set to 
true, because proto3's standard is to use default values.
+  If proto syntax is proto3, users need to set this to true when using 
protobuf versions lower than 3.15 as older versions do not support 
+  checking for field presence, which can cause runtime compilation 
issues. Additionally, primitive types will be set to default values 
+  instead of null as field presence cannot be checked for them. Please 
be aware that setting this to true will cause the deserialization 
+  performance to be much slower depending on schema complexity and 
message size.
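To make the documented behaviour concrete, here is a hedged sketch (the Kafka connector properties and the generated message class are assumptions; only protobuf.read-default-values relates to this change):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ProtobufReadDefaultValuesExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE pb_source (id INT, name STRING) WITH ("
                            + " 'connector' = 'kafka',"                              // assumed source
                            + " 'topic' = 'pb-events',"
                            + " 'properties.bootstrap.servers' = 'localhost:9092',"
                            + " 'scan.startup.mode' = 'earliest-offset',"
                            + " 'format' = 'protobuf',"
                            + " 'protobuf.message-class-name' = 'com.example.PbEvent'," // assumed generated proto3 class
                            // Per the documentation change above: with proto3 and a protobuf
                            // runtime older than 3.15, set this to true; missing fields then
                            // decode to proto default values instead of null.
                            + " 'protobuf.read-default-values' = 'true'"
                            + ")");
            tEnv.executeSql("SELECT * FROM pb_source").print();
        }
    }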
   
 
 
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
index 9302cc274c3..fc2090e24de 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
@@ -28,9 +28,12 @@ import java.util.List;
 public class PbFormatContext {
 private final PbFormatConfig pbFormatConfig;
 private final List splitMethodStack = new ArrayList<>();
+private final boolean readDefaultValuesForPrimitiveTypes;
 
-public PbFormatContext(PbFormatConfig pbFormatConfig) {
+public PbFormatContext(
+PbFormatConfig pbFormatConfig, boolean 
readDefaultValuesForPrimitiveTypes) {
 this.pbFormatConfig = pbFormatConfig;
+this.readDefaultValuesForPrimitiveTypes = 
readDefaultValuesForPrimitiveTypes;
 }
 
 private String createSplitMethod(
@@ -73,4 +76,8 @@ public class PbFormatContext {
 public PbFormatConfig getPbFormatConfig() {
 return pbFormatConfig;
 }
+
+public boolean getReadDefaultValuesForPrimitiveTypes() {
+return readDefaultValuesForPrimitiveTypes;
+}
 }
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java
index 1cf884f8314..18b8fc9618f 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java
@@ -44,7 +44,10 @@ public class PbFormatOptions {
 .defaultValue(false)
 .withDescription(
 "Optional flag to read as default values instead 
of null when some field does not exist in deserialization; default to false."
-+ "If proto syntax is proto3, this value 
will be set true forcibly because proto3's standard is to use default values.");
+

(flink) branch master updated: [FLINK-33611][flink-protobuf] Split last segment only when size exceeds split threshold limit in deserializer

2024-02-06 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new df03ada10e2 [FLINK-33611][flink-protobuf] Split last segment only when 
size exceeds split threshold limit in deserializer
df03ada10e2 is described below

commit df03ada10e226053780cb2e5e9742add4536289c
Author: dsaisharath 
AuthorDate: Wed Dec 13 14:14:38 2023 -0800

[FLINK-33611][flink-protobuf] Split last segment only when size exceeds 
split threshold limit in deserializer

Close apache/flink#23937
---
 .../deserialize/PbCodegenRowDeserializer.java  |  5 +-
 .../protobuf/serialize/PbCodegenRowSerializer.java |  8 +-
 .../formats/protobuf/VeryBigPbProtoToRowTest.java  | 39 +
 .../formats/protobuf/VeryBigPbRowToProtoTest.java  | 39 +
 .../src/test/proto/test_very_big_pb.proto  | 98 ++
 5 files changed, 178 insertions(+), 11 deletions(-)

diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
index f3cf414f540..541b1f83839 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
@@ -109,10 +109,7 @@ public class PbCodegenRowDeserializer implements 
PbCodegenDeserializer {
 index += 1;
 }
 if (!splitAppender.code().isEmpty()) {
-String splitMethod =
-formatContext.splitDeserializerRowTypeMethod(
-flinkRowDataVar, pbMessageTypeStr, pbMessageVar, 
splitAppender.code());
-appender.appendSegment(splitMethod);
+appender.appendSegment(splitAppender.code());
 }
 appender.appendLine(resultVar + " = " + flinkRowDataVar);
 return appender.code();
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
index 342117cd272..6f72ab83b81 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
@@ -130,13 +130,7 @@ public class PbCodegenRowSerializer implements 
PbCodegenSerializer {
 index += 1;
 }
 if (!splitAppender.code().isEmpty()) {
-String splitMethod =
-formatContext.splitSerializerRowTypeMethod(
-flinkRowDataVar,
-pbMessageTypeStr + ".Builder",
-messageBuilderVar,
-splitAppender.code());
-appender.appendSegment(splitMethod);
+appender.appendSegment(splitAppender.code());
 }
 appender.appendLine(resultVar + " = " + messageBuilderVar + 
".build()");
 return appender.code();
diff --git 
a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java
 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java
new file mode 100644
index 000..a3e09ff34c8
--- /dev/null
+++ 
b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.VeryBigPbClass;
+
+import org.junit.Test;
+
+/**
+ * Test for very huge proto definition, which may trigger some special 
optimiza

(flink) branch master updated (8e11b6005c9 -> 98e1d079f6e)

2024-01-31 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 8e11b6005c9 [FLINK-34272][runtime] ExecutingState should never 
transition during its construction.
 add 3c8088b716a [hotfix][table-common] Mark SinkV2Provider as the 
recommended core interface for SinkRuntimeProvider
 add 98e1d079f6e [FLINK-34270][docs] Update doc for the new Table source & 
sink interfaces introduced in FLIP-146/367

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/dev/table/sourcesSinks.md | 12 ++--
 docs/content/docs/dev/table/sourcesSinks.md| 22 ++
 .../table/connector/sink/DynamicTableSink.java | 16 
 3 files changed, 36 insertions(+), 14 deletions(-)



(flink) branch master updated: [FLINK-33264][table] Support source parallelism setting for DataGen connector

2024-01-26 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 862a7129d27 [FLINK-33264][table] Support source parallelism setting 
for DataGen connector
862a7129d27 is described below

commit 862a7129d2730b4c70a21826a5b858fc541a4470
Author: Zhanghao Chen 
AuthorDate: Thu Jan 18 21:26:42 2024 +0800

[FLINK-33264][table] Support source parallelism setting for DataGen 
connector

Close apache/flink#24133
---
 docs/content.zh/docs/connectors/table/datagen.md   |  7 
 docs/content/docs/connectors/table/datagen.md  |  7 
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e   |  4 +-
 .../src/test/resources/sql/table.q |  1 +
 .../src/test/resources/sql/table.q |  1 +
 .../datagen/table/DataGenConnectorOptions.java |  3 ++
 .../datagen/table/DataGenTableSource.java  | 11 --
 .../datagen/table/DataGenTableSourceFactory.java   |  5 ++-
 .../factories/DataGenTableSourceFactoryTest.java   | 46 ++
 .../stream/table/DataGeneratorConnectorITCase.java | 25 
 10 files changed, 104 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/datagen.md 
b/docs/content.zh/docs/connectors/table/datagen.md
index 642382a7768..b90183fcc8c 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -266,6 +266,13 @@ CREATE TABLE Orders (
   Long
  The total number of rows to emit. By default, the table is unbounded.
 
+
+  scan.parallelism
+  optional
+  (none)
+  Integer
+  Defines the parallelism of the source. If not set, the global default parallelism is used.
+
 
  fields.#.kind
  optional
diff --git a/docs/content/docs/connectors/table/datagen.md 
b/docs/content/docs/connectors/table/datagen.md
index 70253786bff..1f105076897 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -270,6 +270,13 @@ Connector Options
   Long
   The total number of rows to emit. By default, the table is 
unbounded.
 
+
+  scan.parallelism
+  optional
+  (none)
+  Integer
+  Defines the parallelism of the source. If not set, the global 
default parallelism is used.
+
 
   fields.#.kind
   optional
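The tables above describe the new option; a minimal hedged sketch of its use follows (table and field names are invented, only 'scan.parallelism' comes from this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DataGenParallelismExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE orders (order_id BIGINT, amount DOUBLE) WITH ("
                            + " 'connector' = 'datagen',"
                            + " 'number-of-rows' = '100',"
                            + " 'scan.parallelism' = '2'" // new option: the source runs with parallelism 2
                            + ")");
            tEnv.executeSql("SELECT * FROM orders").print();
        }
    }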
diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
index 4e44bbf3505..0fd86b29430 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
@@ -76,8 +76,8 @@ Constructor 
(org.apache.flink.connector.datagen.source.GeneratorFunction,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> 
calls method 
 in 
(GeneratorSourceReaderFactory.java:54)
 Constructor 
(org.apache.flink.connector.datagen.source.GeneratorFunction,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> 
calls method 
 in 
(GeneratorSourceReaderFactory.java:55)
 Constructor 
(org.apache.flink.connector.datagen.source.GeneratorFunction,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> has 
parameter of type 
 in 
(GeneratorSourceReaderFactory.java:0)
-Constructor 
([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, 
java.lang.Long)> depends on component type 
 in 
(DataGenTableSource.java:0)
-Constructor 
([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, 
java.lang.Long)> has parameter of type 
<[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in 
(DataGenTableSource.java:0)
+Constructor 
([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long, 
java.lang.Integer)> depends on component type 
 in 
(DataGenTableSource.java:0)
+Constructor 
([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long, 
java.lang.Integer)> has parameter of type 
<[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in 
(DataGenTableSource.java:0)
 Constructor 
(java.lang.String,
 org.apache.flink.configuration.ReadableConfig)> calls constructor 
()> 
in (DataGenVisitorBase.java:49)
 Constructor 
(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator,
 jav

(flink) branch master updated: [FLINK-33263][table-planner] Implement ParallelismProvider for sources in the table planner

2024-01-25 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 42b7e74ab20 [FLINK-33263][table-planner] Implement ParallelismProvider 
for sources in the table planner
42b7e74ab20 is described below

commit 42b7e74ab20785289b62f5dd68d566995ba9dcfc
Author: SuDewei 
AuthorDate: Thu Jan 18 16:05:40 2024 +0800

[FLINK-33263][table-planner] Implement ParallelismProvider for sources in 
the table planner

Close apache/flink#24128
---
 .../org/apache/flink/api/dag/Transformation.java   |  15 ++
 .../streaming/api/graph/StreamGraphGenerator.java  |   3 +
 .../SourceTransformationWrapper.java   |  72 ++
 .../exec/common/CommonExecTableSourceScan.java | 154 ++---
 .../table/planner/delegation/BatchPlanner.scala|   2 +-
 .../table/planner/delegation/PlannerBase.scala |   3 +-
 .../table/planner/delegation/StreamPlanner.scala   |   2 +-
 .../planner/factories/TestValuesTableFactory.java  |  33 +++--
 .../planner/plan/stream/sql/TableScanTest.xml  |  42 ++
 .../planner/plan/stream/sql/TableScanTest.scala|  38 +
 .../runtime/stream/sql/TableSourceITCase.scala |  80 +++
 .../flink/table/planner/utils/TableTestBase.scala  |  51 ++-
 12 files changed, 463 insertions(+), 32 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java 
b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
index a0448697dd1..6256f9624f6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.dag;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -602,6 +603,20 @@ public abstract class Transformation {
 + '}';
 }
 
+@VisibleForTesting
+public String toStringWithoutId() {
+return getClass().getSimpleName()
++ "{"
++ "name='"
++ name
++ '\''
++ ", outputType="
++ outputType
++ ", parallelism="
++ parallelism
++ '}';
+}
+
 @Override
 public boolean equals(Object o) {
 if (this == o) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 5929a2a5e8e..8e267ff84d6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -65,6 +65,7 @@ import 
org.apache.flink.streaming.api.transformations.ReduceTransformation;
 import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import 
org.apache.flink.streaming.api.transformations.SourceTransformationWrapper;
 import 
org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
@@ -553,6 +554,8 @@ public class StreamGraphGenerator {
 transformedIds = transformFeedback((FeedbackTransformation) 
transform);
 } else if (transform instanceof CoFeedbackTransformation) {
 transformedIds = transformCoFeedback((CoFeedbackTransformation) 
transform);
+} else if (transform instanceof SourceTransformationWrapper) {
+transformedIds = transform(((SourceTransformationWrapper) 
transform).getInput());
 } else {
 throw new IllegalStateException("Unknown transformation: " + 
transform);
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformationWrapper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformationWrapper.java
new file mode 100644
index 000..d536000fde2
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformationWrapper.java
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Softwar

(flink) branch master updated: [hotfix][docs] Add protobuf format info to overview page

2024-01-09 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2003b70aceb [hotfix][docs] Add protobuf format info to overview page
2003b70aceb is described below

commit 2003b70aceb2cdddfbd3d05263e1dbd699bf6585
Author: sharath1709 
AuthorDate: Tue Jan 9 15:14:16 2024 -0800

[hotfix][docs] Add protobuf format info to overview page
---
 docs/content/docs/connectors/table/formats/overview.md | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/docs/content/docs/connectors/table/formats/overview.md 
b/docs/content/docs/connectors/table/formats/overview.md
index 987ba8d8820..4ae1c6987ba 100644
--- a/docs/content/docs/connectors/table/formats/overview.md
+++ b/docs/content/docs/connectors/table/formats/overview.md
@@ -26,7 +26,7 @@ under the License.
 
 # Formats
 
-Flink provides a set of table formats that can be used with table connectors. 
A table format is a storage format defines how to map binary data onto table 
columns.
+Flink provides a set of table formats that can be used with table connectors. 
A table format is a storage format that defines how to map binary data onto 
table columns.
 
 Flink supports the following formats:
 
@@ -68,6 +68,10 @@ Flink supports the following formats:
   }}">Apache 
Kafka,
}}">Upsert 
Kafka
 
+
+  }}">Protobuf
+  }}">Apache 
Kafka
+
 
  }}">Debezium CDC
   }}">Apache 
Kafka,



(flink) branch release-1.17 updated: [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable

2023-12-15 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 58f8162613d [FLINK-33541][table-planner] RAND and RAND_INTEGER should 
return type nullable if the arguments are nullable
58f8162613d is described below

commit 58f8162613d9f615e60fb0c9e23692d25469d6f0
Author: xuyang 
AuthorDate: Thu Nov 23 14:05:07 2023 +0800

[FLINK-33541][table-planner] RAND and RAND_INTEGER should return type 
nullable if the arguments are nullable

Close apache/flink#23779
---
 .../functions/BuiltInFunctionDefinitions.java  |  4 +-
 .../functions/sql/FlinkSqlOperatorTable.java   |  8 +-
 .../table/planner/codegen/calls/RandCallGen.scala  |  5 +-
 .../planner/functions/BuiltInFunctionTestBase.java | 89 ++--
 .../planner/functions/RandFunctionITCase.java  | 94 ++
 .../planner/expressions/ScalarFunctionsTest.scala  | 45 +++
 .../expressions/utils/ExpressionTestBase.scala | 14 +++-
 7 files changed, 226 insertions(+), 33 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 5bbdca9a054..b96422625a8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1397,7 +1397,7 @@ public final class BuiltInFunctionDefinitions {
 .kind(SCALAR)
 .notDeterministic()
 .inputTypeStrategy(or(NO_ARGS, 
sequence(logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(DataTypes.DOUBLE().notNull()))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.DOUBLE(
 .build();
 
 public static final BuiltInFunctionDefinition RAND_INTEGER =
@@ -1411,7 +1411,7 @@ public final class BuiltInFunctionDefinitions {
 sequence(
 logical(LogicalTypeRoot.INTEGER),
 logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(INT().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(INT(
 .build();
 
 public static final BuiltInFunctionDefinition BIN =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index cb2ff52c6e8..04cf1540bc8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -922,7 +922,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.DOUBLE,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.DOUBLE),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
@@ -940,7 +942,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND_INTEGER",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.INTEGER,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.INTEGER),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
index 1a7b4950586..e322e1854ed 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
@@ -39,11 +39,12 @@ class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) 
extends CallGener
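For reference, a small hedged sketch (the source table and column are assumptions) that shows the effect of the fix on the derived type of a seeded RAND / RAND_INTEGER call:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RandNullabilityExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE src (seed INT) WITH ("
                            + " 'connector' = 'datagen', 'number-of-rows' = '1')");
            // The argument column `seed` is nullable, so with this fix the planner derives
            // nullable DOUBLE / INT instead of DOUBLE NOT NULL / INT NOT NULL.
            System.out.println(
                    tEnv.sqlQuery("SELECT RAND(seed) AS r, RAND_INTEGER(seed, 10) AS ri FROM src")
                            .getResolvedSchema());
        }
    }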

(flink) branch release-1.18 updated: [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable

2023-12-15 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new d6081815000 [FLINK-33541][table-planner] RAND and RAND_INTEGER should 
return type nullable if the arguments are nullable
d6081815000 is described below

commit d60818150005661006a71e4155fc605d7543362b
Author: xuyang 
AuthorDate: Thu Nov 23 14:05:07 2023 +0800

[FLINK-33541][table-planner] RAND and RAND_INTEGER should return type 
nullable if the arguments are nullable

Close apache/flink#23779
---
 .../functions/BuiltInFunctionDefinitions.java  |  4 +-
 .../functions/sql/FlinkSqlOperatorTable.java   |  8 +-
 .../table/planner/codegen/calls/RandCallGen.scala  |  5 +-
 .../planner/functions/BuiltInFunctionTestBase.java | 89 ++--
 .../planner/functions/RandFunctionITCase.java  | 94 ++
 .../planner/expressions/ScalarFunctionsTest.scala  | 45 +++
 .../expressions/utils/ExpressionTestBase.scala | 14 +++-
 7 files changed, 226 insertions(+), 33 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e653d1d6463..82197822dc5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1576,7 +1576,7 @@ public final class BuiltInFunctionDefinitions {
 .kind(SCALAR)
 .notDeterministic()
 .inputTypeStrategy(or(NO_ARGS, 
sequence(logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(DataTypes.DOUBLE().notNull()))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.DOUBLE(
 .build();
 
 public static final BuiltInFunctionDefinition RAND_INTEGER =
@@ -1590,7 +1590,7 @@ public final class BuiltInFunctionDefinitions {
 sequence(
 logical(LogicalTypeRoot.INTEGER),
 logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(INT().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(INT(
 .build();
 
 public static final BuiltInFunctionDefinition BIN =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 9769613cd2d..1a98081dd79 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -940,7 +940,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.DOUBLE,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.DOUBLE),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
@@ -958,7 +960,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND_INTEGER",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.INTEGER,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.INTEGER),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
index 1a7b4950586..e322e1854ed 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
@@ -39,11 +39,12 @@ class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) 
extends CallGener

(flink) branch master updated: [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable

2023-12-15 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 45f966e8c3c [FLINK-33541][table-planner] RAND and RAND_INTEGER should 
return type nullable if the arguments are nullable
45f966e8c3c is described below

commit 45f966e8c3c5e903b3843391874f7d2478122d8c
Author: xuyang 
AuthorDate: Thu Nov 23 14:05:07 2023 +0800

[FLINK-33541][table-planner] RAND and RAND_INTEGER should return type 
nullable if the arguments are nullable

Close apache/flink#23779
---
 .../functions/BuiltInFunctionDefinitions.java  |  4 +-
 .../functions/sql/FlinkSqlOperatorTable.java   |  8 +-
 .../table/planner/codegen/calls/RandCallGen.scala  |  5 +-
 .../planner/functions/BuiltInFunctionTestBase.java | 89 ++--
 .../planner/functions/RandFunctionITCase.java  | 94 ++
 .../planner/expressions/ScalarFunctionsTest.scala  | 45 +++
 .../expressions/utils/ExpressionTestBase.scala | 14 +++-
 7 files changed, 226 insertions(+), 33 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 89ac66d18f6..b65afdc4284 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1644,7 +1644,7 @@ public final class BuiltInFunctionDefinitions {
 .kind(SCALAR)
 .notDeterministic()
 .inputTypeStrategy(or(NO_ARGS, 
sequence(logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(DataTypes.DOUBLE().notNull()))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.DOUBLE(
 .build();
 
 public static final BuiltInFunctionDefinition RAND_INTEGER =
@@ -1658,7 +1658,7 @@ public final class BuiltInFunctionDefinitions {
 sequence(
 logical(LogicalTypeRoot.INTEGER),
 logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(INT().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(INT(
 .build();
 
 public static final BuiltInFunctionDefinition BIN =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 9769613cd2d..1a98081dd79 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -940,7 +940,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.DOUBLE,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.DOUBLE),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
@@ -958,7 +960,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND_INTEGER",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.INTEGER,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.INTEGER),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
index 1a7b4950586..e322e1854ed 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
@@ -39,11 +39,12 @@ class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) 
extends CallGener

(flink) branch release-1.17 updated: [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal

2023-12-09 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new cfc7881ab5f [FLINK-33313][table] Fix 
RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle 
binary literal
cfc7881ab5f is described below

commit cfc7881ab5f1c27d7a9a7781b255fa94989201b9
Author: zoudan 
AuthorDate: Thu Oct 19 17:48:05 2023 +0800

[FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions 
throws an Exception when handle binary literal

Close apache/flink#23551
---
 .../table/planner/plan/utils/RexNodeExtractor.scala |  4 
 .../plan/utils/NestedProjectionUtilTest.scala   | 10 ++
 .../planner/plan/utils/RexNodeExtractorTest.scala   | 21 +
 .../planner/plan/utils/RexNodeRewriterTest.scala|  7 ---
 .../table/planner/plan/utils/RexNodeTestBase.scala  |  6 --
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index a7cbb4a9ffc..4b57796e792 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.YearMonthIntervalType
 import org.apache.flink.util.Preconditions
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlPostfixOperator}
@@ -489,6 +490,9 @@ class RexNodeToExpressionConverter(
 // convert to BigDecimal
 literal.getValueAs(classOf[java.math.BigDecimal])
 
+  case BINARY | VARBINARY =>
+literal.getValueAs(classOf[Array[Byte]])
+
   case _ =>
 literal.getValue
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
index ec8214f5b91..9cd44c9fea7 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
@@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
   "$2",
   "$3",
   "$4",
+  "$5",
   "*($t2, $t3)",
   "100",
-  "<($t5, $t6)",
+  "<($t6, $t7)",
   "6",
-  ">($t1, $t8)",
-  "AND($t7, $t9)")))
+  ">($t1, $t9)",
+  "AND($t8, $t10)")))
 
 val nestedField = NestedProjectionUtil.build(exprs, 
rexProgram.getInputRowType)
 val paths = NestedProjectionUtil.convertToIndexArray(nestedField)
@@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
   Array(1),
   Array(2),
   Array(3),
-  Array(4)
+  Array(4),
+  Array(5)
 )
 assertArray(paths, orderedPaths)
 val builder = new FlinkRexBuilder(typeFactory)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 2eb87e35cc8..bd5f15d3bed 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -33,6 +33,7 @@ import 
org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction
 import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.CatalogManagerMocks
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
@@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase {
 assertEquals(0, unconvertedRexNodes.length)
   }
 
+  @Test
+  def testExtractConditionWithBinaryLiteral(): Unit = {
+// blob
+val t0 = rexBuilder.makeInputRef(allF
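A hedged illustration of the query shape involved (table, column, and literal are invented; the datagen connector does not push down filters, so this merely shows a BINARY literal predicate planning cleanly, whereas push-down-capable sources previously hit the exception during condition extraction):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class BinaryLiteralFilterExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE blobs (id INT, payload BYTES) WITH ("
                            + " 'connector' = 'datagen', 'number-of-rows' = '10')");
            // X'0102' is the kind of BINARY literal that RexNodeExtractor now converts
            // to a byte[] instead of throwing.
            System.out.println(tEnv.explainSql("SELECT id FROM blobs WHERE payload = X'0102'"));
        }
    }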

(flink) branch release-1.18 updated: [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal

2023-12-08 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 265d8174165 [FLINK-33313][table] Fix 
RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle 
binary literal
265d8174165 is described below

commit 265d81741654ff70836485e8e8d31ce33a87e960
Author: zoudan 
AuthorDate: Thu Oct 19 17:48:05 2023 +0800

[FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions 
throws an Exception when handle binary literal

Close apache/flink#23551
---
 .../table/planner/plan/utils/RexNodeExtractor.scala |  4 
 .../plan/utils/NestedProjectionUtilTest.scala   | 10 ++
 .../planner/plan/utils/RexNodeExtractorTest.scala   | 21 +
 .../planner/plan/utils/RexNodeRewriterTest.scala|  7 ---
 .../table/planner/plan/utils/RexNodeTestBase.scala  |  6 --
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index a7cbb4a9ffc..4b57796e792 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.YearMonthIntervalType
 import org.apache.flink.util.Preconditions
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlPostfixOperator}
@@ -489,6 +490,9 @@ class RexNodeToExpressionConverter(
 // convert to BigDecimal
 literal.getValueAs(classOf[java.math.BigDecimal])
 
+  case BINARY | VARBINARY =>
+literal.getValueAs(classOf[Array[Byte]])
+
   case _ =>
 literal.getValue
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
index ec8214f5b91..9cd44c9fea7 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
@@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
   "$2",
   "$3",
   "$4",
+  "$5",
   "*($t2, $t3)",
   "100",
-  "<($t5, $t6)",
+  "<($t6, $t7)",
   "6",
-  ">($t1, $t8)",
-  "AND($t7, $t9)")))
+  ">($t1, $t9)",
+  "AND($t8, $t10)")))
 
 val nestedField = NestedProjectionUtil.build(exprs, 
rexProgram.getInputRowType)
 val paths = NestedProjectionUtil.convertToIndexArray(nestedField)
@@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
   Array(1),
   Array(2),
   Array(3),
-  Array(4)
+  Array(4),
+  Array(5)
 )
 assertArray(paths, orderedPaths)
 val builder = new FlinkRexBuilder(typeFactory)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 2eb87e35cc8..bd5f15d3bed 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -33,6 +33,7 @@ import 
org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction
 import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.CatalogManagerMocks
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
@@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase {
 assertEquals(0, unconvertedRexNodes.length)
   }
 
+  @Test
+  def testExtractConditionWithBinaryLiteral(): Unit = {
+// blob
+val t0 = rexBuilder.makeInputRef(allF

(flink) branch master updated: [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway

2023-12-06 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new b6380d54f11 [FLINK-33266][sql-gateway] Support plan cache for DQL in 
SQL Gateway
b6380d54f11 is described below

commit b6380d54f11ebb062397d775764f28f20fd35b69
Author: zoudan 
AuthorDate: Fri Dec 1 10:41:31 2023 +0800

[FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway

Close apache/flink#23849
---
 docs/content.zh/docs/dev/table/olap_quickstart.md  |   9 +-
 .../docs/dev/table/sql-gateway/overview.md |  18 ++
 docs/content/docs/dev/table/olap_quickstart.md |  11 +-
 .../content/docs/dev/table/sql-gateway/overview.md |  18 ++
 .../operators/collect/CollectResultIterator.java   |  20 ++
 .../api/config/SqlGatewayServiceConfigOptions.java |  24 ++
 .../gateway/service/context/SessionContext.java|  36 +++
 .../service/operation/OperationExecutor.java   |  49 +++-
 .../table/gateway/service/session/Session.java |   8 +
 .../service/SqlGatewayServiceStatementITCase.java  |  71 +-
 .../src/test/resources/sql/repeated_dql.q  | 261 +
 .../flink/table/api/internal/CachedPlan.java   |  29 +++
 .../flink/table/api/internal/DQLCachedPlan.java|  56 +
 .../table/api/internal/InsertResultProvider.java   |   6 +
 .../flink/table/api/internal/PlanCacheManager.java |  65 +
 .../flink/table/api/internal/ResultProvider.java   |   3 +
 .../table/api/internal/TableEnvironmentImpl.java   |  29 ++-
 .../api/internal/TableEnvironmentInternal.java |   8 +
 .../flink/table/api/internal/TableResultImpl.java  |  25 +-
 .../table/api/internal/TableResultInternal.java|   5 +
 .../planner/connectors/CollectDynamicSink.java |   7 +
 21 files changed, 730 insertions(+), 28 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/olap_quickstart.md 
b/docs/content.zh/docs/dev/table/olap_quickstart.md
index e109dfe71ee..32fae83b5bc 100644
--- a/docs/content.zh/docs/dev/table/olap_quickstart.md
+++ b/docs/content.zh/docs/dev/table/olap_quickstart.md
@@ -151,10 +151,11 @@ Session Cluster 和 SQL Gateway 都依赖连接器来获取表的元信息同时
 
 SQL & Table Options

-| Parameter | Default | Recommended |
-|:---|:--|:-|
-| [table.optimizer.join-reorder-enabled]({{}}) | false | true |
-| [pipeline.object-reuse]({{< ref "docs/deployment/config#pipeline-object-reuse" >}}) | false | true |
+| Parameter | Default | Recommended |
+|:--|:--|:-|
+| [table.optimizer.join-reorder-enabled]({{}}) | false | true |
+| [pipeline.object-reuse]({{< ref "docs/deployment/config#pipeline-object-reuse" >}}) | false | true |
+| [sql-gateway.session.plan-cache.enabled]({{}}) | false | true |

 Runtime Options
 
diff --git a/docs/content.zh/docs/dev/table/sql-gateway/overview.md 
b/docs/content.zh/docs/dev/table/sql-gateway/overview.md
index 81b1bea869f..4b91d6862ab 100644
--- a/docs/content.zh/docs/dev/table/sql-gateway/overview.md
+++ b/docs/content.zh/docs/dev/table/sql-gateway/overview.md
@@ -188,6 +188,24 @@ $ ./sql-gateway -Dkey=value
 Integer
 The maximum number of active sessions in the SQL Gateway service.
 
+
+sql-gateway.session.plan-cache.enabled
+false
+Boolean
+When set to true, SQL Gateway caches and reuses plans within a session.
+
+
+sql-gateway.session.plan-cache.size
+100
+Integer
+The size of the plan cache; it takes effect only when `table.optimizer.plan-cache.enabled` is true.
+
+
+sql-gateway.session.plan-cache.ttl
+1 hour
+Duration
+The TTL of the plan cache, i.e. how long after being written a cached plan expires; it takes effect only when `table.optimizer.plan-cache.enabled` is true.
+
 
 sql-gateway.worker.keepalive-time
 5 min
diff --git a/docs/content/docs/dev/table/olap_quickstart.md 
b/docs/content/docs/dev/table/olap_quickstart.md
index e5084e065c2..d630a80d313 100644
--- a/docs/content/docs/dev/table/olap_quickstart.md
+++ b/docs/content/docs/dev/table/olap_quickstart.md
@@ -151,10 +151,11 @@ In OLAP scenario, appropriate configurations that can 
greatly help users improve
 
  SQL&Table Options
 
-| Parameters   
 

(flink) branch master updated: [FLINK-27056][streaming] "pipeline.time-characteristic" should be deprecated and have EVENT_TIME as default value

2023-11-14 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3856e41f517 [FLINK-27056][streaming] "pipeline.time-characteristic" 
should be deprecated and have EVENT_TIME as default value
3856e41f517 is described below

commit 3856e41f5172dee070a4b1d5f15829203b6d580f
Author: Zhanghao Chen 
AuthorDate: Sat Jul 9 18:06:55 2022 +0800

[FLINK-27056][streaming] "pipeline.time-characteristic" should be 
deprecated and have EVENT_TIME as default value

Close apache/flink#20231
---
 docs/content.zh/docs/deployment/config.md  |  1 -
 docs/content/docs/deployment/config.md |  1 -
 .../generated/stream_pipeline_configuration.html   | 18 --
 .../api/environment/StreamPipelineOptions.java | 17 -
 4 files changed, 16 insertions(+), 21 deletions(-)
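
For readers migrating off the deprecated option, a small sketch of the replacement described in the deprecation note below: event time is already the default, and watermarks can be disabled by setting the auto-watermark interval to 0. Plain DataStream API; nothing here is introduced by this commit.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TimeCharacteristicMigration {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Equivalent of setting PipelineOptions#AUTO_WATERMARK_INTERVAL to 0:
            // no watermarks are generated, which replaces the old ProcessingTime mode
            // for pipelines that do not rely on event-time timers or windows.
            env.getConfig().setAutoWatermarkInterval(0);
            env.fromSequence(0, 9).print();
            env.execute("no-watermark example");
        }
    }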

diff --git a/docs/content.zh/docs/deployment/config.md 
b/docs/content.zh/docs/deployment/config.md
index bc2e406a220..ff8935923b6 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -314,7 +314,6 @@ See the [History Server Docs]({{< ref 
"docs/deployment/advanced/historyserver" >
 ### Pipeline
 
 {{< generated/pipeline_configuration >}}
-{{< generated/stream_pipeline_configuration >}}
 
 ### Checkpointing
 
diff --git a/docs/content/docs/deployment/config.md 
b/docs/content/docs/deployment/config.md
index 6071d674afe..3de3826c040 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -316,7 +316,6 @@ See the [History Server Docs]({{< ref 
"docs/deployment/advanced/historyserver" >
 ### Pipeline
 
 {{< generated/pipeline_configuration >}}
-{{< generated/stream_pipeline_configuration >}}
 
 ### Checkpointing
 
diff --git 
a/docs/layouts/shortcodes/generated/stream_pipeline_configuration.html 
b/docs/layouts/shortcodes/generated/stream_pipeline_configuration.html
deleted file mode 100644
index 8ddd9fb10c6..000
--- a/docs/layouts/shortcodes/generated/stream_pipeline_configuration.html
+++ /dev/null
@@ -1,18 +0,0 @@
-
-
-
-Key
-Default
-Type
-Description
-
-
-
-
-pipeline.time-characteristic
-ProcessingTime
-Enum
-The time characteristic for all created streams, e.g., processing time, event time, or ingestion time. If you set the characteristic to IngestionTime or EventTime this will set a default watermark update interval of 200 ms. If this is not applicable for your application you should change it using pipeline.auto-watermark-interval. Possible values: "ProcessingTime", "IngestionTime", "EventTime" [...]
-
-
-
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java
index 01aa5f9172a..e848479f3b6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.PipelineOptions;
@@ -32,10 +33,24 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
  */
 @PublicEvolving
 public class StreamPipelineOptions {
+
+/**
+ * @deprecated In Flink 1.12 the default stream time characteristic has 
been changed to {@link
+ * TimeCharacteristic#EventTime}, thus you don't need to set this 
option for enabling
+ * event-time support anymore. Explicitly using processing-time 
windows and timers works in
+ * event-time mode. If you need to disable watermarks, please set 
{@link
+ * PipelineOptions#AUTO_WATERMARK_INTERVAL} to 0. If you are using 
{@link
+ * TimeCharacteristic#IngestionTime}, please manually set an 
appropriate {@link
+ * WatermarkStrategy}. If you are using generic "time window" 
operations (for example {@link
+ * 
org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
+ * that change behaviour based on the time characteristic, please use 
equivalent operations
+ * that explicitly specify processing time or event time.
+ */
+

(flink) branch master updated: [FLINK-32650][protobuf] Split generated code in protobuf format to mitigate huge method problems

2023-11-13 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new a2ec4c3b8bd [FLINK-32650][protobuf] Split generated code in protobuf 
format to mitigate huge method problems
a2ec4c3b8bd is described below

commit a2ec4c3b8bd9f2009fd721ab4c51afedcd8574fb
Author: lijingwei.5018 
AuthorDate: Mon Nov 13 11:32:43 2023 +0800

[FLINK-32650][protobuf] Split generated code in protobuf format to mitigate 
huge method problems

Close apache/flink#23162
---
 .../apache/flink/formats/protobuf/PbConstant.java  |   6 +
 .../flink/formats/protobuf/PbFormatContext.java|  44 ++
 .../deserialize/PbCodegenRowDeserializer.java  |  30 +++-
 .../PbRowDataDeserializationSchema.java|   6 +
 .../protobuf/deserialize/ProtoToRowConverter.java  |  13 ++
 .../protobuf/serialize/PbCodegenRowSerializer.java |  37 -
 .../serialize/PbRowDataSerializationSchema.java|   6 +
 .../protobuf/serialize/RowToProtoConverter.java|  13 ++
 .../formats/protobuf/util/PbCodegenUtils.java  |   4 +
 .../formats/protobuf/BigPbProtoToRowTest.java  | 140 +
 .../formats/protobuf/BigPbRowToProtoTest.java  | 166 +
 .../src/test/proto/test_big_pb.proto   |  61 
 12 files changed, 512 insertions(+), 14 deletions(-)
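
The idea behind the change, sketched as standalone Java rather than the actual codegen classes: once an accumulated code segment exceeds the 4000-char CODEGEN_SPLIT_THRESHOLD, it is wrapped into its own generated method and only a call is emitted inline, keeping each generated method under the JIT's 8K limit.

    import java.util.ArrayList;
    import java.util.List;

    public class SplitCodegenSketch {
        private static final int CODEGEN_SPLIT_THRESHOLD = 4000; // value used by the commit
        private int uid = 0;
        private final List<String> splitMethods = new ArrayList<>();

        /** Returns the code itself, or a call to a freshly generated helper method. */
        public String maybeSplit(String rowDataVar, String messageVar, String code) {
            if (code.length() < CODEGEN_SPLIT_THRESHOLD) {
                return code;
            }
            String name = "split" + (uid++);
            splitMethods.add(
                    String.format(
                            "private static void %s(GenericRowData %s, Object %s) {\n%s\n}",
                            name, rowDataVar, messageVar, code));
            return String.format("%s(%s, %s);", name, rowDataVar, messageVar);
        }

        public List<String> generatedSplitMethods() {
            return splitMethods;
        }
    }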

diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
index ea7d6514c56..4a39794e68d 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
@@ -27,4 +27,10 @@ public class PbConstant {
 public static final String PB_MAP_KEY_NAME = "key";
 public static final String PB_MAP_VALUE_NAME = "value";
 public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
+/**
+ * The JIT optimizer threshold is 8K and unicode encodes one char as 1 byte, so 4K is used as
+ * the codegen split threshold (CODEGEN_SPLIT_THRESHOLD). A conservative threshold is chosen to
+ * prevent multiple element code segments in a RowType from being combined and exceeding 8K.
+ */
+public static final int CODEGEN_SPLIT_THRESHOLD = 4000;
 }
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
index 27ceb0fb49d..9302cc274c3 100644
--- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
@@ -18,14 +18,58 @@
 
 package org.apache.flink.formats.protobuf;
 
+import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenVarId;
+
+import java.util.ArrayList;
+import java.util.List;
+
 /** store config and common information. */
 public class PbFormatContext {
 private final PbFormatConfig pbFormatConfig;
+private final List splitMethodStack = new ArrayList<>();
 
 public PbFormatContext(PbFormatConfig pbFormatConfig) {
 this.pbFormatConfig = pbFormatConfig;
 }
 
+private String createSplitMethod(
+String rowDataType,
+String rowDataVar,
+String messageTypeStr,
+String messageTypeVar,
+String code) {
+int uid = PbCodegenVarId.getInstance().getAndIncrement();
+String splitMethodName = "split" + uid;
+PbCodegenAppender pbCodegenAppender = new PbCodegenAppender();
+pbCodegenAppender.appendSegment(
+String.format(
+"private static void %s (%s %s, %s %s) {\n %s \n}",
+splitMethodName,
+rowDataType,
+rowDataVar,
+messageTypeStr,
+messageTypeVar,
+code));
+splitMethodStack.add(pbCodegenAppender.code());
+return String.format("%s(%s, %s);", splitMethodName, rowDataVar, 
messageTypeVar);
+}
+
+public String splitDeserializerRowTypeMethod(
+String rowDataVar, String messageTypeStr, String messageTypeVar, 
String code) {
+return createSplitMethod(
+"GenericRowData", rowDataVar, messageTypeStr, messageTypeVar, 
code);
+}
+
+public String splitSerializerRowTypeMethod(
+String rowDataVar, String messageTypeStr, String messageTypeVar, 
String code) {
+return createSplitMethod("RowDat

(flink) branch master updated: [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface

2023-11-08 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 55162dcc5cc [FLINK-33262][table-api] Extend source provider interfaces 
with the new parallelism provider interface
55162dcc5cc is described below

commit 55162dcc5cca6db6aeedddb30d80dd9f9b8d5202
Author: Zhanghao Chen 
AuthorDate: Sat Nov 4 21:00:25 2023 +0800

[FLINK-33262][table-api] Extend source provider interfaces with the new 
parallelism provider interface

Close apache/flink#23663
---
 .../connector/source/DataStreamScanProvider.java|  4 +++-
 .../connector/source/SourceFunctionProvider.java| 21 -
 .../flink/table/connector/ParallelismProvider.java  | 12 ++--
 .../table/connector/source/InputFormatProvider.java | 21 -
 .../table/connector/source/SourceProvider.java  | 18 +-
 .../apache/flink/table/factories/FactoryUtil.java   |  9 +
 .../connectors/TransformationScanProvider.java  |  4 +++-
 7 files changed, 78 insertions(+), 11 deletions(-)
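
A short usage sketch of the new overload added below; the SourceFunction instance is assumed to come from the connector, everything else follows the added signatures.

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.connector.source.SourceFunctionProvider;
    import org.apache.flink.table.data.RowData;

    public class ParallelismProviderExample {
        public static SourceFunctionProvider boundedProviderWithParallelism(
                SourceFunction<RowData> sourceFunction) {
            // Bounded source with an explicit source parallelism of 2;
            // passing null for the parallelism keeps the planner's default.
            return SourceFunctionProvider.of(sourceFunction, true, 2);
        }
    }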

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
index 7fc4687363f..213e3806327 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.data.RowData;
 
@@ -35,7 +36,8 @@ import org.apache.flink.table.data.RowData;
  * or {@link InputFormatProvider}.
  */
 @PublicEvolving
-public interface DataStreamScanProvider extends 
ScanTableSource.ScanRuntimeProvider {
+public interface DataStreamScanProvider
+extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
 
 /**
  * Creates a scan Java {@link DataStream} from a {@link 
StreamExecutionEnvironment}.
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
index ff7238d8a2f..e5c35525e16 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
@@ -20,8 +20,13 @@ package org.apache.flink.table.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
 /**
  * Provider of a {@link SourceFunction} instance as a runtime implementation 
for {@link
  * ScanTableSource}.
@@ -32,10 +37,19 @@ import org.apache.flink.table.data.RowData;
  */
 @Deprecated
 @PublicEvolving
-public interface SourceFunctionProvider extends 
ScanTableSource.ScanRuntimeProvider {
+public interface SourceFunctionProvider
+extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
 
 /** Helper method for creating a static provider. */
 static SourceFunctionProvider of(SourceFunction sourceFunction, 
boolean isBounded) {
+return of(sourceFunction, isBounded, null);
+}
+
+/** Helper method for creating a Source provider with a provided source 
parallelism. */
+static SourceFunctionProvider of(
+SourceFunction sourceFunction,
+boolean isBounded,
+@Nullable Integer sourceParallelism) {
 return new SourceFunctionProvider() {
 @Override
 public SourceFunction createSourceFunction() {
@@ -46,6 +60,11 @@ public interface SourceFunctionProvider extends 
ScanTableSource.ScanRuntimeProvi
 public boolean isBounded() {
 return isBounded;
 }
+
+@Override
+public Optional getParallelism() {
+return Optional.ofNullable(sourceParallelism);
+}
 };
 }
 
diff --git 
a/flink-table

[flink] branch master updated: [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal

2023-10-20 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4d37b8c34ff [FLINK-33313][table] Fix 
RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle 
binary literal
4d37b8c34ff is described below

commit 4d37b8c34ff062b7505ab8c0ca8f2181768aab60
Author: zoudan 
AuthorDate: Thu Oct 19 17:48:05 2023 +0800

[FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions 
throws an Exception when handle binary literal

Close apache/flink#23551
---
 .../table/planner/plan/utils/RexNodeExtractor.scala |  4 
 .../plan/utils/NestedProjectionUtilTest.scala   | 10 ++
 .../planner/plan/utils/RexNodeExtractorTest.scala   | 21 +
 .../planner/plan/utils/RexNodeRewriterTest.scala|  7 ---
 .../table/planner/plan/utils/RexNodeTestBase.scala  |  6 --
 5 files changed, 39 insertions(+), 9 deletions(-)
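
Illustrative only: the kind of predicate this fix covers is a comparison against a SQL binary literal (X'...'), which becomes a BINARY/VARBINARY RexLiteral when conditions are extracted. The query itself is an assumption, not taken from the commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class BinaryLiteralFilterExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // X'0102' is a SQL binary string literal; comparisons like this produce the
            // BINARY/VARBINARY literals that RexNodeExtractor now converts correctly.
            tEnv.executeSql(
                            "SELECT id FROM (VALUES (1, X'0102'), (2, X'0A0B')) AS t(id, payload) "
                                    + "WHERE payload = X'0102'")
                    .print();
        }
    }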

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index 482ce56dc63..481cbda8b82 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -37,6 +37,7 @@ import 
org.apache.flink.table.types.logical.YearMonthIntervalType
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.util.Preconditions
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
@@ -502,6 +503,9 @@ class RexNodeToExpressionConverter(
 // convert to BigDecimal
 literal.getValueAs(classOf[java.math.BigDecimal])
 
+  case BINARY | VARBINARY =>
+literal.getValueAs(classOf[Array[Byte]])
+
   case _ =>
 literal.getValue
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
index ec8214f5b91..9cd44c9fea7 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
@@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
   "$2",
   "$3",
   "$4",
+  "$5",
   "*($t2, $t3)",
   "100",
-  "<($t5, $t6)",
+  "<($t6, $t7)",
   "6",
-  ">($t1, $t8)",
-  "AND($t7, $t9)")))
+  ">($t1, $t9)",
+  "AND($t8, $t10)")))
 
 val nestedField = NestedProjectionUtil.build(exprs, 
rexProgram.getInputRowType)
 val paths = NestedProjectionUtil.convertToIndexArray(nestedField)
@@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
   Array(1),
   Array(2),
   Array(3),
-  Array(4)
+  Array(4),
+  Array(5)
 )
 assertArray(paths, orderedPaths)
 val builder = new FlinkRexBuilder(typeFactory)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 2eb87e35cc8..bd5f15d3bed 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -33,6 +33,7 @@ import 
org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction
 import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.CatalogManagerMocks
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
@@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase {
 assertEquals(0, unconvertedRexNodes.length)
   }
 
+  @Test
+  def testExtractConditionWithBinaryLiteral(): Unit = {
+// blob
+val t0 = rexBuilder.makeInputRef(allFieldTypes.get(5), 5)
+
+

[flink] branch release-1.17 updated: [FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format

2023-07-07 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new c66bcb66fd7 [FLINK-32547][docs] Add missing doc for Timestamp support 
in ProtoBuf format
c66bcb66fd7 is described below

commit c66bcb66fd7c201ced53e86a685bc3c6208a5c64
Author: Benchao Li 
AuthorDate: Fri Jul 7 22:10:45 2023 +0800

[FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format
---
 docs/content/docs/connectors/table/formats/protobuf.md | 5 +
 1 file changed, 5 insertions(+)
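
A DDL sketch of the documented mapping: a google.protobuf.Timestamp field is declared in Flink SQL as ROW<seconds BIGINT, nanos INT>. Connector, topic and message class below are illustrative assumptions.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ProtobufTimestampMapping {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE events ("
                            + "  id BIGINT,"
                            + "  event_time ROW<seconds BIGINT, nanos INT>" // google.protobuf.Timestamp
                            + ") WITH ("
                            + "  'connector' = 'kafka',"
                            + "  'topic' = 'events',"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',"
                            + "  'format' = 'protobuf',"
                            + "  'protobuf.message-class-name' = 'com.example.Event'"
                            + ")");
        }
    }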

diff --git a/docs/content/docs/connectors/table/formats/protobuf.md 
b/docs/content/docs/connectors/table/formats/protobuf.md
index 5cbafc8d911..b28cf49f9d3 100644
--- a/docs/content/docs/connectors/table/formats/protobuf.md
+++ b/docs/content/docs/connectors/table/formats/protobuf.md
@@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type 
to Protobuf type.
   enum
   The enum value of protobuf can be mapped to string or number of 
flink row accordingly.
 
+
+  ROW<seconds BIGINT, nanos INT>
+  google.protobuf.timestamp
+  The google.protobuf.timestamp type can be mapped to seconds and 
fractions of seconds at nanosecond resolution in UTC epoch time using the row 
type as well as the protobuf definition.
+
 
 
 



[flink] branch master updated: [FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format

2023-07-07 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1aef1f13b99 [FLINK-32547][docs] Add missing doc for Timestamp support 
in ProtoBuf format
1aef1f13b99 is described below

commit 1aef1f13b9975f3603952d763d756e4f831a1e75
Author: Benchao Li 
AuthorDate: Fri Jul 7 22:10:45 2023 +0800

[FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format
---
 docs/content/docs/connectors/table/formats/protobuf.md | 5 +
 1 file changed, 5 insertions(+)

diff --git a/docs/content/docs/connectors/table/formats/protobuf.md 
b/docs/content/docs/connectors/table/formats/protobuf.md
index 5cbafc8d911..b28cf49f9d3 100644
--- a/docs/content/docs/connectors/table/formats/protobuf.md
+++ b/docs/content/docs/connectors/table/formats/protobuf.md
@@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type 
to Protobuf type.
   enum
   The enum value of protobuf can be mapped to string or number of 
flink row accordingly.
 
+
+  ROW<seconds BIGINT, nanos INT>
+  google.protobuf.timestamp
+  The google.protobuf.timestamp type can be mapped to seconds and 
fractions of seconds at nanosecond resolution in UTC epoch time using the row 
type as well as the protobuf definition.
+
 
 
 



[flink] branch master updated: [FLINK-32370][runtime] Change log level to debug for AbstractHandler when job has not been initialized or is finished

2023-06-25 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4248d05da09 [FLINK-32370][runtime] Change log level to debug for 
AbstractHandler when job has not been initialized or is finished
4248d05da09 is described below

commit 4248d05da092e3e580ac3238ea9af51151609e4f
Author: Shammon FY 
AuthorDate: Mon Jun 26 08:35:05 2023 +0800

[FLINK-32370][runtime] Change log level to debug for AbstractHandler when 
job has not been initialized or is finished

This mostly affects the e2e tests which will check the corresponding log 
file content

Close apache/flink#22838
---
 .../org/apache/flink/runtime/rest/handler/AbstractHandler.java   | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index b7d5c5bdca9..75bbdf495dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
+import 
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
 import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
 import org.apache.flink.runtime.rest.FileUploadHandler;
 import org.apache.flink.runtime.rest.FlinkHttpObjectAggregator;
@@ -54,6 +55,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
@@ -266,7 +268,12 @@ public abstract class AbstractHandler<
 HttpResponseStatus.SERVICE_UNAVAILABLE,
 responseHeaders);
 } else {
-log.error("Unhandled exception.", throwable);
+if (throwable instanceof UnavailableDispatcherOperationException
+|| throwable instanceof FileNotFoundException) {
+log.debug("Job is not initialized or is finished: {}", 
throwable.getMessage());
+} else {
+log.error("Unhandled exception.", throwable);
+}
 String stackTrace =
 String.format(
 "",



[flink] branch master updated (c4b8cafe912 -> 4006de97352)

2023-06-20 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from c4b8cafe912 [FLINK-24592][Table SQL/Client] FlinkSQL Client multiline 
parser improvements
 add 4006de97352 [FLINK-31549][jdbc-driver] Add jdbc driver docs

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/dev/table/jdbcDriver.md | 235 +++
 docs/content.zh/docs/dev/table/overview.md   |   1 +
 docs/content/docs/dev/table/jdbcDriver.md| 235 +++
 docs/content/docs/dev/table/overview.md  |   1 +
 4 files changed, 472 insertions(+)
 create mode 100644 docs/content.zh/docs/dev/table/jdbcDriver.md
 create mode 100644 docs/content/docs/dev/table/jdbcDriver.md



[flink] branch master updated: [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished

2023-06-19 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 8119411addd [FLINK-32370][streaming] Fix warn log in result fetcher 
when job is finished
8119411addd is described below

commit 8119411addd9c82c15bab8480e7b35b8e6394d43
Author: Shammon FY 
AuthorDate: Mon Jun 19 10:17:19 2023 +0800

[FLINK-32370][streaming] Fix warn log in result fetcher when job is finished

Close apache/flink#22819
---
 .../client/program/rest/RestClusterClientTest.java | 37 ++
 .../operators/collect/CollectResultFetcher.java|  7 ++--
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 2ff6dea2a98..740eb06a57b 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -122,6 +123,7 @@ import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -1136,6 +1138,32 @@ class RestClusterClientTest {
 }
 }
 
+@Test
+void testSendCoordinationRequestException() throws Exception {
+final TestClientCoordinationHandler handler =
+new TestClientCoordinationHandler(new 
FlinkJobNotFoundException(jobId));
+try (TestRestServerEndpoint restServerEndpoint = 
createRestServerEndpoint(handler)) {
+try (RestClusterClient restClusterClient =
+
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
+String payload = "testing payload";
+TestCoordinationRequest request = new 
TestCoordinationRequest<>(payload);
+
+assertThatThrownBy(
+() ->
+restClusterClient
+.sendCoordinationRequest(
+jobId, new 
OperatorID(), request)
+.get())
+.matches(
+e ->
+
ExceptionUtils.findThrowableWithMessage(
+e,
+
FlinkJobNotFoundException.class.getName())
+.isPresent());
+}
+}
+}
+
 /**
  * The SUSPENDED job status should never be returned by the client thus 
client retries until it
  * either receives a different job status or the cluster is not reachable.
@@ -1166,9 +1194,15 @@ class RestClusterClientTest {
 ClientCoordinationRequestBody,
 ClientCoordinationResponseBody,
 ClientCoordinationMessageParameters> {
+@Nullable private final FlinkJobNotFoundException exception;
 
 private TestClientCoordinationHandler() {
+this(null);
+}
+
+private TestClientCoordinationHandler(@Nullable 
FlinkJobNotFoundException exception) {
 super(ClientCoordinationHeaders.getInstance());
+this.exception = exception;
 }
 
 @Override
@@ -1178,6 +1212,9 @@ class RestClusterClientTest {
 @Nonnull DispatcherGateway gateway)
 throws RestHandlerException {
 try {
+if (exception != null) {
+throw exception;
+}
 TestCoordinationRequest req =
 (TestCoordinationRequest)
 request.getRequestBody()
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java
index 519b7d603f1..a7916502214 100644
--- 
a/flink-streami

[flink] branch master updated: [FLINK-32008][protobuf] Ensure bulk persistence is not supported

2023-06-17 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7d4ee28e85a [FLINK-32008][protobuf] Ensure bulk persistence is not 
supported
7d4ee28e85a is described below

commit 7d4ee28e85aad4abc8ad126c4d953d0e921ea07e
Author: Ryan Skraba 
AuthorDate: Fri Jun 16 19:32:09 2023 +0200

[FLINK-32008][protobuf] Ensure bulk persistence is not supported
---
 flink-formats/flink-protobuf/pom.xml   | 10 +++
 .../formats/protobuf/PbFileFormatFactory.java  | 83 ++
 .../org.apache.flink.table.factories.Factory   |  1 +
 .../formats/protobuf/ProtobufSQLITCaseTest.java| 32 +
 4 files changed, 126 insertions(+)
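
What the new factory enforces, as a sketch: a filesystem table declared with 'format' = 'protobuf' now fails with a ValidationException when it is read or written, instead of silently falling back to a line-based reader. Path and message class are illustrative.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ProtobufFilesystemRejected {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.executeSql(
                    "CREATE TABLE pb_files (id INT) WITH ("
                            + "  'connector' = 'filesystem',"
                            + "  'path' = 'file:///tmp/pb-data',"
                            + "  'format' = 'protobuf',"
                            + "  'protobuf.message-class-name' = 'com.example.Msg'"
                            + ")");
            // Expected to fail with a ValidationException from PbFileFormatFactory,
            // since bulk protobuf storage has no standard layout.
            // tEnv.executeSql("SELECT * FROM pb_files").print();
        }
    }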

diff --git a/flink-formats/flink-protobuf/pom.xml 
b/flink-formats/flink-protobuf/pom.xml
index 00e04bc385e..760d32f230a 100644
--- a/flink-formats/flink-protobuf/pom.xml
+++ b/flink-formats/flink-protobuf/pom.xml
@@ -56,6 +56,16 @@ under the License.
provided

 
+   
+
+   
+   org.apache.flink
+   flink-connector-files
+   ${project.version}
+   provided
+   true
+   
+

com.google.protobuf
protobuf-java
diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFileFormatFactory.java
 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFileFormatFactory.java
new file mode 100644
index 000..fc1cecf3c48
--- /dev/null
+++ 
b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFileFormatFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter.Factory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
+import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Throw a {@link ValidationException} when using Protobuf format factory for 
file system.
+ *
+ * In practice, there is <a
+ * href="https://protobuf.dev/programming-guides/techniques/#streaming">no standard</a> for
+ * storing bulk protobuf messages. This factory is present to prevent falling
+ * back to the {@link 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter}, a
+ * line-based format which could silently succeed but write unrecoverable data 
to disk.
+ *
+ * If your use case requires storing bulk protobuf messages on disk, the 
parquet file format
+ * might be the appropriate container and has an API for mapping records to 
protobuf messages.
+ */
+@Internal
+public class PbFileFormatFactory implements BulkReaderFormatFactory, 
BulkWriterFormatFactory {
+
+@Override
+public String factoryIdentifier() {
+return PbFormatFactory.IDENTIFIER;
+}
+
+@Override
+public Set> requiredOptions() {
+return Collections.emptySet();
+}
+
+@Override
+public Set> optionalOptions() {
+return Collections.emptySet();
+}
+
+@Override
+public Set> forwardOptions() {
+return Collections.emptySet();
+}
+
+@Override
+public BulkDecodingFormat createDecodingFormat(
+DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
+throw new ValidationException(
+"The 'protob

[flink] branch master updated (c0dab74002b -> a521f5a98a7)

2023-06-15 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from c0dab74002b [FLINK-32290][dist] Enable -XX:+IgnoreUnrecognizedVMOptions
 add a521f5a98a7 [FLINK-32343][jdbc-driver] Fix exception in jdbc driver 
for jdbc tools

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/jdbc/BaseConnection.java| 31 
 .../org/apache/flink/table/jdbc/BaseStatement.java | 13 -
 .../apache/flink/table/jdbc/FlinkConnection.java   | 33 ++
 .../apache/flink/table/jdbc/FlinkStatement.java| 13 +
 4 files changed, 46 insertions(+), 44 deletions(-)



[flink] branch master updated: [FLINK-31673][jdbc-driver] Add e2e test for jdbc driver

2023-06-15 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 089e9edcb6f [FLINK-31673][jdbc-driver] Add e2e test for jdbc driver
089e9edcb6f is described below

commit 089e9edcb6f250fbd18ae4cb0f285d47c629deb4
Author: Shammon FY 
AuthorDate: Wed Jun 14 18:19:55 2023 +0800

[FLINK-31673][jdbc-driver] Add e2e test for jdbc driver
---
 .../flink-end-to-end-tests-jdbc-driver/pom.xml |  74 ++
 .../jdbc/driver/tests/FlinkDriverExample.java  | 111 +
 flink-end-to-end-tests/pom.xml |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh|   1 +
 flink-end-to-end-tests/test-scripts/common.sh  |  14 +++
 .../test-scripts/test_sql_jdbc_driver.sh   |  44 
 .../org/apache/flink/table/gateway/SqlGateway.java |   2 -
 7 files changed, 245 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/pom.xml 
b/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/pom.xml
new file mode 100644
index 000..47b4eb3f43b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/pom.xml
@@ -0,0 +1,74 @@
+
+
+
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+   4.0.0
+
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.18-SNAPSHOT
+   
+
+   flink-end-to-end-tests-jdbc-driver
+   Flink : E2E Tests : JDBC Driver
+
+   jar
+
+   
+   
+   org.apache.flink
+   flink-sql-jdbc-driver-bundle
+   ${project.version}
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   shade-jdbc-driver
+   package
+   
+   shade
+   
+   
+   
JdbcDriverExample
+   
+   
+   
com.google.code.findbugs:jsr305
+   
+   
+   
org.apache.flink:flink-sql-jdbc-driver-bundle
+   
org.slf4j:slf4j-api
+   
+   
+   
+   
+   
+   
+   
+   
+
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/driver/tests/FlinkDriverExample.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/driver/tests/FlinkDriverExample.java
new file mode 100644
index 000..4feb88a1e74
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/driver/tests/FlinkDriverExample.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.driver.tests;
+
+import java.io.Buf

[flink] branch master updated (a6adbdda0cd -> c7afa323582)

2023-06-12 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from a6adbdda0cd [FLINK-31835][table-planner] Fix the array type that can't 
be converted from the external primitive array
 add c7afa323582 [FLINK-32300][jdbc-driver] Support getObject for 
FlinkResultSet

No new revisions were added by this update.

Summary of changes:
 flink-table/flink-sql-jdbc-driver-bundle/pom.xml   |  18 ++-
 .../org/apache/flink/table/jdbc/BaseStatement.java |   5 -
 .../apache/flink/table/jdbc/FlinkResultSet.java|  87 +-
 .../apache/flink/table/jdbc/FlinkStatement.java|  14 +++
 .../flink/table/jdbc/utils/ArrayFieldGetter.java   | 121 +++
 .../flink/table/jdbc/FlinkResultSetTest.java   | 129 -
 .../flink/table/jdbc/FlinkStatementTest.java   |   7 ++
 7 files changed, 318 insertions(+), 63 deletions(-)
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/ArrayFieldGetter.java



[flink-connector-jdbc] branch main updated: [FLINK-32235][docs] Translate CrateDB Docs to chinese

2023-06-06 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
 new 1a2186c  [FLINK-32235][docs] Translate CrateDB Docs to chinese
1a2186c is described below

commit 1a2186c71fb765b85cc8b9371e6b5158b9ef10b4
Author: codenohup 
AuthorDate: Tue Jun 6 17:33:55 2023 +0800

[FLINK-32235][docs] Translate CrateDB Docs to chinese

Signed-off-by: codenohup 
---
 docs/content.zh/docs/connectors/table/jdbc.md | 36 ++-
 1 file changed, 35 insertions(+), 1 deletion(-)

diff --git a/docs/content.zh/docs/connectors/table/jdbc.md 
b/docs/content.zh/docs/connectors/table/jdbc.md
index 063e0a3..ad2f1da 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -55,7 +55,7 @@ JDBC 连接器不是二进制发行版的一部分,请查阅[这里]({{< ref "
 | PostgreSQL | `org.postgresql`   | `postgresql`   | 
[下载](https://jdbc.postgresql.org/download/) |
 | Derby  | `org.apache.derby` | `derby`| 
[下载](http://db.apache.org/derby/derby_downloads.html) | |
 | SQL Server | `com.microsoft.sqlserver`  | `mssql-jdbc`   | 
[下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16)
 |
-| CrateDB| `io.crate` | `crate-jdbc`   | 
[Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
+| CrateDB| `io.crate` | `crate-jdbc`   | 
[下载](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
 
 当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅[这里]({{< ref 
"docs/dev/configuration/overview" >}})了解在集群上执行时何连接它们。
 
@@ -605,7 +605,41 @@ SELECT * FROM test_table;
 SELECT * FROM mysql_catalog.given_database.test_table2;
 SELECT * FROM given_database.test_table2;
 ```
+
 
+### JDBC Catalog for CrateDB
+
+
+
+ CrateDB 元空间映射
+
+CrateDB 和 PostgreSQL 类似,但它只有一个默认名为 `crate` 的数据库。 此外它有一个额外的命名空间 `schema`,一个 
CrateDB 实例可以有多个 schema,其中一个 schema 默认名为"doc",每个 schema 可以包含多张表。 在 Flink 中,当查询由 
CrateDB catalog 注册的表时,用户可以使用 `schema_name.table_name` 或者只有 `table_name`。其中 
`schema_name` 是可选的,默认值为 "doc"。
+
+因此,Flink Catalog 和 CrateDB catalog 之间的元空间映射如下:
+  
+| Flink Catalog Metaspace Structure| CrateDB Metaspace Structure|
+| :|:---|
+| catalog name (defined in Flink only) | N/A|
+| database name| database name (一直是 `crate`)  |
+| table name   | [schema_name.]table_name   |
+
+Flink 中的 CrateDB 表的完整路径应该是 ``"..``"``。如果指定了 
schema,请注意转义 ``。
+
+这里提供了一些访问 CrateDB 表的例子:
+
+```sql
+-- 扫描 'doc' schema (即默认 schema)中的 'test_table' 表,schema 名称可以省略
+SELECT * FROM mycatalog.crate.doc.test_table;
+SELECT * FROM crate.doc.test_table;
+SELECT * FROM doc.test_table;
+SELECT * FROM test_table;
+  
+-- 扫描 'custom_schema' schema 中的 'test_table2' 表
+-- 自定义 schema 不能省略,并且必须与表一起转义
+SELECT * FROM mycatalog.crate.`custom_schema.test_table2`
+SELECT * FROM crate.`custom_schema.test_table2`;
+SELECT * FROM `custom_schema.test_table2`;
+```
 
 
 数据类型映射



[flink] branch master updated: [FLINK-32211][sql-client] Supports row format in executor

2023-05-30 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 81902e81c43 [FLINK-32211][sql-client] Supports row format in executor
81902e81c43 is described below

commit 81902e81c43990c48d05ec81469b7d99c7922698
Author: Shammon FY 
AuthorDate: Mon May 29 15:01:29 2023 +0800

[FLINK-32211][sql-client] Supports row format in executor

In this commit, we also removed DataConverter in flink-sql-jdbc-driver 
module.

Close apache/flink#22671
---
 .../flink/table/client/gateway/Executor.java   |   9 ++
 .../flink/table/client/gateway/ExecutorImpl.java   |  37 +++-
 .../apache/flink/table/jdbc/FlinkConnection.java   |   4 +-
 .../apache/flink/table/jdbc/FlinkResultSet.java|  50 +-
 .../apache/flink/table/jdbc/FlinkStatement.java|   5 +-
 .../flink/table/jdbc/utils/DataConverter.java  |  88 -
 .../table/jdbc/utils/DatabaseMetaDataUtils.java|   6 +-
 .../table/jdbc/utils/DefaultDataConverter.java | 105 -
 .../table/jdbc/utils/StringDataConverter.java  | 105 -
 .../flink/table/jdbc/FlinkResultSetTest.java   |  47 -
 10 files changed, 89 insertions(+), 367 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index b636d326560..3be128e0708 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client.gateway;
 
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 
 import java.io.Closeable;
@@ -36,6 +37,14 @@ public interface Executor extends Closeable {
 return new ExecutorImpl(defaultContext, address, sessionId);
 }
 
+static Executor create(
+DefaultContext defaultContext,
+InetSocketAddress address,
+String sessionId,
+RowFormat rowFormat) {
+return new ExecutorImpl(defaultContext, address, sessionId, rowFormat);
+}
+
 static Executor create(DefaultContext defaultContext, URL address, String 
sessionId) {
 return new ExecutorImpl(defaultContext, address, sessionId);
 }
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index 4ec7eb3e8eb..f4f6b40ed56 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -112,6 +112,7 @@ public class ExecutorImpl implements Executor {
 
 private final SqlGatewayRestAPIVersion connectionVersion;
 private final SessionHandle sessionHandle;
+private final RowFormat rowFormat;
 
 public ExecutorImpl(
 DefaultContext defaultContext, InetSocketAddress gatewayAddress, 
String sessionId) {
@@ -119,11 +120,30 @@ public class ExecutorImpl implements Executor {
 defaultContext,
 NetUtils.socketToUrl(gatewayAddress),
 sessionId,
-HEARTBEAT_INTERVAL_MILLISECONDS);
+HEARTBEAT_INTERVAL_MILLISECONDS,
+RowFormat.PLAIN_TEXT);
+}
+
+public ExecutorImpl(
+DefaultContext defaultContext,
+InetSocketAddress gatewayAddress,
+String sessionId,
+RowFormat rowFormat) {
+this(
+defaultContext,
+NetUtils.socketToUrl(gatewayAddress),
+sessionId,
+HEARTBEAT_INTERVAL_MILLISECONDS,
+rowFormat);
 }
 
 public ExecutorImpl(DefaultContext defaultContext, URL gatewayUrl, String 
sessionId) {
-this(defaultContext, gatewayUrl, sessionId, 
HEARTBEAT_INTERVAL_MILLISECONDS);
+this(
+defaultContext,
+gatewayUrl,
+sessionId,
+HEARTBEAT_INTERVAL_MILLISECONDS,
+RowFormat.PLAIN_TEXT);
 }
 
 @VisibleForTesting
@@ -132,7 +152,12 @@ public class ExecutorImpl implements Executor {
 InetSocketAddress gatewayAddress,
 String sessionId,
 long heartbeatInterval) {
-this(defaultContext, NetUtils.socketToUrl(gatewayAddress), sessionId, 
heartbeatInterval);
+this

[flink] branch master updated: [FLINK-32129][fs-connector] Use string array in PartitionCommitInfo

2023-05-22 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3cbacbf26f0 [FLINK-32129][fs-connector] Use string array in 
PartitionCommitInfo
3cbacbf26f0 is described below

commit 3cbacbf26f09b5301b280beed4f78fc03d573d76
Author: Shammon FY 
AuthorDate: Fri May 19 15:31:35 2023 +0800

[FLINK-32129][fs-connector] Use string array in PartitionCommitInfo

Using generics will throw an exception when 'pipeline.generic-types' is disabled

Close apache/flink#22609
---
 .../file/table/stream/PartitionCommitInfo.java |  9 +++---
 .../file/table/stream/StreamingFileWriter.java |  3 +-
 .../file/table/stream/compact/CompactOperator.java |  2 +-
 .../file/table/stream/PartitionCommitInfoTest.java | 37 ++
 .../file/table/stream/StreamingFileWriterTest.java |  3 +-
 .../table/stream/compact/CompactOperatorTest.java  |  2 +-
 6 files changed, 46 insertions(+), 10 deletions(-)
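
Background sketch: with generic types disabled, any field that would fall back to Kryo, such as the previous List field of PartitionCommitInfo, is rejected, which is why the class now carries a String[]. Disabling generic types on the environment looks like this:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DisableGenericTypes {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Same effect as pipeline.generic-types=false: types that would need Kryo
            // (e.g. a List<String> field inside a POJO) now fail instead of being
            // serialized generically, while String[] has a native array serializer.
            env.getConfig().disableGenericTypes();
        }
    }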

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
index abc6ede1d51..5decc717c8e 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.file.table.stream;
 import org.apache.flink.annotation.Internal;
 
 import java.io.Serializable;
-import java.util.List;
 
 /**
  * The message sent by upstream.
@@ -37,12 +36,12 @@ public class PartitionCommitInfo implements Serializable {
 private long checkpointId;
 private int taskId;
 private int numberOfTasks;
-private List partitions;
+private String[] partitions;
 
 public PartitionCommitInfo() {}
 
 public PartitionCommitInfo(
-long checkpointId, int taskId, int numberOfTasks, List 
partitions) {
+long checkpointId, int taskId, int numberOfTasks, String[] 
partitions) {
 this.checkpointId = checkpointId;
 this.taskId = taskId;
 this.numberOfTasks = numberOfTasks;
@@ -73,11 +72,11 @@ public class PartitionCommitInfo implements Serializable {
 this.numberOfTasks = numberOfTasks;
 }
 
-public List getPartitions() {
+public String[] getPartitions() {
 return partitions;
 }
 
-public void setPartitions(List partitions) {
+public void setPartitions(String[] partitions) {
 this.partitions = partitions;
 }
 }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
index eec6907bce3..7eb9faad119 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -152,6 +151,6 @@ public class StreamingFileWriter extends 
AbstractStreamingWriter(partitions;
+partitions.toArray(new String[0];
 }
 }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
index a99f39b57a5..fa51d4aa767 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
@@ -158,7 +158,7 @@ public class CompactOperator extends 
AbstractStreamOperator(this.partitions;
+this.partitions.toArray(new String[0];
 this.partitions.clear();
 }
 
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfoTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink

[flink] branch master updated: [FLINK-31687][jdbc-driver] Get rid of flink-core for jdbc driver

2023-05-19 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 06688f345f6 [FLINK-31687][jdbc-driver] Get rid of flink-core for jdbc 
driver
06688f345f6 is described below

commit 06688f345f6793a8964ec2175f44cda13c33
Author: Shammon FY 
AuthorDate: Sat May 6 09:51:44 2023 +0800

[FLINK-31687][jdbc-driver] Get rid of flink-core for jdbc driver

Close apache/flink#22533
---
 .../client/cli/parser/SqlClientSyntaxHighlighter.java |  8 
 .../apache/flink/table/client/gateway/Executor.java   |  8 
 .../flink/table/client/gateway/ExecutorImpl.java  | 11 ++-
 .../apache/flink/table/client/cli/CliClientTest.java  |  9 -
 .../table/gateway/service/context/DefaultContext.java |  5 +
 flink-table/flink-sql-jdbc-driver-bundle/pom.xml  |  9 -
 .../org/apache/flink/table/jdbc/FlinkConnection.java  | 19 ++-
 7 files changed, 41 insertions(+), 28 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java
index bdcfa5a707d..366cae3f57e 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client.cli.parser;
 
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.config.TableConfigOptions;
@@ -57,16 +58,15 @@ public class SqlClientSyntaxHighlighter extends 
DefaultHighlighter {
 
 @Override
 public AttributedString highlight(LineReader reader, String buffer) {
+ReadableConfig configuration = executor.getSessionConfig();
 final SyntaxHighlightStyle.BuiltInStyle style =
 SyntaxHighlightStyle.BuiltInStyle.fromString(
-executor.getSessionConfig()
-
.get(SqlClientOptions.DISPLAY_DEFAULT_COLOR_SCHEMA));
+
configuration.get(SqlClientOptions.DISPLAY_DEFAULT_COLOR_SCHEMA));
 
 if (style == SyntaxHighlightStyle.BuiltInStyle.DEFAULT) {
 return super.highlight(reader, buffer);
 }
-final String dialectName =
-
executor.getSessionConfig().get(TableConfigOptions.TABLE_SQL_DIALECT);
+final String dialectName = 
configuration.get(TableConfigOptions.TABLE_SQL_DIALECT);
 final SqlDialect dialect =
 SqlDialect.HIVE.name().equalsIgnoreCase(dialectName)
 ? SqlDialect.HIVE
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 8414789c993..b636d326560 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
+import java.util.Map;
 
 /** A gateway for communicating with Flink and other external systems. */
 public interface Executor extends Closeable {
@@ -53,6 +54,13 @@ public interface Executor extends Closeable {
  */
 ReadableConfig getSessionConfig();
 
+/**
+ * Get the map configuration of the session.
+ *
+ * @return the map session configuration.
+ */
+Map getSessionConfigMap();
+
 /**
  * Execute statement.
  *
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index fcdee3c779d..4ec7eb3e8eb 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -83,6 +83,7 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -206,6 +207,14 @@ public class ExecutorImpl implements Executor {
 }
 
 public ReadableConfig getSessionConfig() {
+try

[flink] branch master updated: [FLINK-31548][jdbc-driver] Introduce FlinkDataSource for flink jdbc driver

2023-05-17 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 420b5e319e1 [FLINK-31548][jdbc-driver] Introduce FlinkDataSource for 
flink jdbc driver
420b5e319e1 is described below

commit 420b5e319e1f8e917c099530360f51f73a070575
Author: Shammon FY 
AuthorDate: Sat May 6 09:00:38 2023 +0800

[FLINK-31548][jdbc-driver] Introduce FlinkDataSource for flink jdbc driver

Close apache/flink#22532
---
 .../apache/flink/table/jdbc/FlinkDataSource.java   | 88 ++
 .../flink/table/jdbc/FlinkDataSourceTest.java  | 39 ++
 2 files changed, 127 insertions(+)

diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDataSource.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDataSource.java
new file mode 100644
index 000..d8a84e19742
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDataSource.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import javax.sql.DataSource;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+/** Basic flink data source which creates {@link FlinkConnection}. */
+public class FlinkDataSource implements DataSource {
+private final String url;
+private final Properties properties;
+
+public FlinkDataSource(String url, Properties properties) {
+this.url = url;
+this.properties = properties;
+}
+
+@Override
+public Connection getConnection() throws SQLException {
+return new FlinkConnection(DriverUri.create(url, properties));
+}
+
+@Override
+public Connection getConnection(String username, String password) throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDataSource#getConnection with username and password is not supported");
+}
+
+@Override
+public <T> T unwrap(Class<T> iface) throws SQLException {
+throw new SQLFeatureNotSupportedException("FlinkDataSource#unwrap is not supported");
+}
+
+@Override
+public boolean isWrapperFor(Class<?> iface) throws SQLException {
+throw new SQLFeatureNotSupportedException("FlinkDataSource#isWrapperFor is not supported");
+}
+
+@Override
+public PrintWriter getLogWriter() throws SQLException {
+throw new SQLFeatureNotSupportedException("FlinkDataSource#getLogWriter is not supported");
+}
+
+@Override
+public void setLogWriter(PrintWriter out) throws SQLException {
+throw new SQLFeatureNotSupportedException("FlinkDataSource#setLogWriter is not supported");
+}
+
+@Override
+public void setLoginTimeout(int seconds) throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDataSource#setLoginTimeout is not supported");
+}
+
+@Override
+public int getLoginTimeout() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDataSource#getLoginTimeout is not supported");
+}
+
+@Override
+public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDataSource#getParentLogger is not supported");
+}
+}
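
As a quick orientation for readers of this digest, below is a minimal usage sketch of the new class. It is not part of the commit; the URL format (jdbc:flink://host:port), the gateway endpoint localhost:8083 and the queried statement are assumptions for illustration only.

```java
import org.apache.flink.table.jdbc.FlinkDataSource;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class FlinkDataSourceExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical SQL Gateway endpoint; adjust host/port to your deployment.
        DataSource dataSource = new FlinkDataSource("jdbc:flink://localhost:8083", new Properties());
        try (Connection connection = dataSource.getConnection();
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery("SELECT 1")) {
            while (resultSet.next()) {
                System.out.println(resultSet.getLong(1));
            }
        }
    }
}
```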
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDataSourceTest.java
 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDataSourceTest.java
new file mode 100644
index 000..9d00c37d24d
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDataSourceTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache 

[flink] branch master updated (ed9ee279e50 -> 400530d30af)

2023-05-09 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from ed9ee279e50 [FLINK-30815][tests] Migrate BatchTestBase to junit5
 add 400530d30af [FLINK-31546][jdbc-driver] Close all statements when 
connection is closed

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/jdbc/FlinkConnection.java   | 48 --
 .../apache/flink/table/jdbc/FlinkStatement.java|  5 ++-
 .../flink/table/jdbc/FlinkStatementTest.java   | 15 +++
 3 files changed, 63 insertions(+), 5 deletions(-)



[flink] branch master updated: [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver

2023-05-05 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2ff4e220945 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for 
jdbc driver
2ff4e220945 is described below

commit 2ff4e220945680eba21065480385148ca8d034da
Author: Shammon FY 
AuthorDate: Thu Apr 6 13:09:19 2023 +0800

[FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver

Close apache/flink#22362
---
 .../flink/table/jdbc/BaseDatabaseMetaData.java | 792 +
 .../org/apache/flink/table/jdbc/DriverUri.java |   4 +
 .../apache/flink/table/jdbc/FlinkConnection.java   |  15 +-
 .../flink/table/jdbc/FlinkDatabaseMetaData.java| 384 ++
 .../apache/flink/table/jdbc/FlinkResultSet.java|  29 +-
 .../flink/table/jdbc/FlinkResultSetMetaData.java   |  10 +-
 .../table/jdbc/utils/CloseableResultIterator.java  |  24 +
 .../table/jdbc/utils/CollectionResultIterator.java |  45 ++
 .../table/jdbc/utils/DatabaseMetaDataUtils.java| 110 +++
 .../table/jdbc/utils/StatementResultIterator.java  |  46 ++
 .../flink/table/jdbc/FlinkConnectionTest.java  |   5 +-
 .../table/jdbc/FlinkDatabaseMetaDataTest.java  | 121 
 12 files changed, 1567 insertions(+), 18 deletions(-)

diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
new file mode 100644
index 000..75ec33aab67
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.RowIdLifetime;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+/** Base {@link DatabaseMetaData} for flink driver with unsupported features. */
+public abstract class BaseDatabaseMetaData implements DatabaseMetaData {
+@Override
+public String getUserName() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#getUserName is not supported");
+}
+
+@Override
+public boolean nullsAreSortedHigh() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#nullsAreSortedHigh is not supported");
+}
+
+@Override
+public boolean nullsAreSortedLow() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#nullsAreSortedLow is not supported");
+}
+
+@Override
+public boolean nullsAreSortedAtStart() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#nullsAreSortedAtStart is not 
supported");
+}
+
+@Override
+public boolean nullsAreSortedAtEnd() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#nullsAreSortedAtEnd is not supported");
+}
+
+@Override
+public boolean supportsMixedCaseIdentifiers() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#supportsMixedCaseIdentifiers is not 
supported");
+}
+
+@Override
+public boolean storesUpperCaseIdentifiers() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#storesUpperCaseIdentifiers is not 
supported");
+}
+
+@Override
+public boolean storesLowerCaseIdentifiers() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#storesLowerCaseIdentifiers is not 
supported");
+}
+
+@Override
+public boolean storesMixedCaseIdentifiers() throws SQLExc

[flink] branch master updated: [FLINK-31543][jdbc-driver] Introduce statement for flink jdbc driver

2023-04-26 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 63166226f65 [FLINK-31543][jdbc-driver] Introduce statement for flink 
jdbc driver
63166226f65 is described below

commit 63166226f65cc4da80cc7661d76eb90032ec
Author: Shammon FY 
AuthorDate: Tue Apr 18 19:13:24 2023 +0800

[FLINK-31543][jdbc-driver] Introduce statement for flink jdbc driver

Close apache/flink#22417
---
 flink-table/flink-sql-jdbc-driver/pom.xml  |  12 ++
 .../org/apache/flink/table/jdbc/BaseStatement.java | 238 +
 .../apache/flink/table/jdbc/FlinkConnection.java   |   2 +-
 .../apache/flink/table/jdbc/FlinkResultSet.java|   5 +-
 .../flink/table/jdbc/FlinkResultSetMetaData.java   |  19 +-
 .../apache/flink/table/jdbc/FlinkStatement.java| 157 ++
 .../flink/table/jdbc/FlinkConnectionTest.java  |  48 +
 .../flink/table/jdbc/FlinkJdbcDriverTestBase.java  |  66 ++
 .../table/jdbc/FlinkResultSetMetaDataTest.java |   7 +-
 .../flink/table/jdbc/FlinkStatementTest.java   | 123 +++
 10 files changed, 619 insertions(+), 58 deletions(-)

diff --git a/flink-table/flink-sql-jdbc-driver/pom.xml 
b/flink-table/flink-sql-jdbc-driver/pom.xml
index 3d6968fa8e4..fc6b7b285c8 100644
--- a/flink-table/flink-sql-jdbc-driver/pom.xml
+++ b/flink-table/flink-sql-jdbc-driver/pom.xml
@@ -72,6 +72,18 @@
${project.version}
test

+   
+   org.apache.flink
+   flink-connector-files
+   ${project.version}
+   test
+   
+   
+   org.apache.flink
+   flink-csv
+   ${project.version}
+   test
+   

 

diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseStatement.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseStatement.java
new file mode 100644
index 000..08bf3107f28
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseStatement.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+
+/** Base statement in flink driver. */
+public abstract class BaseStatement implements Statement {
+@Override
+public int executeUpdate(String sql) throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkStatement#executeUpdate is not supported yet.");
+}
+
+@Override
+public int getMaxFieldSize() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkStatement#getMaxFieldSize is not supported yet.");
+}
+
+@Override
+public void setMaxFieldSize(int max) throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkStatement#setMaxFieldSize is not supported yet.");
+}
+
+@Override
+public int getMaxRows() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkStatement#getMaxRows is not supported yet.");
+}
+
+@Override
+public void setMaxRows(int max) throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkStatement#setMaxRows is not supported yet.");
+}
+
+@Override
+public void setEscapeProcessing(boolean enable) throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkStatement#setEscapeProcessing is not supported yet.");
+}
+
+@Override
+public int getQueryTimeout() throws SQLException {
+throw new S

[flink] branch master updated: [FLINK-31787][docs] Add explicit ROW constructor to sql functions doc

2023-04-18 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new fc7d821dfa1 [FLINK-31787][docs] Add explicit ROW constructor to sql 
functions doc
fc7d821dfa1 is described below

commit fc7d821dfa16bad8ca58cb716b1b429d55b8317a
Author: Aitozi 
AuthorDate: Thu Apr 13 10:06:24 2023 +0800

[FLINK-31787][docs] Add explicit ROW constructor to sql functions doc

Close apache/flink#22386
---
 docs/data/sql_functions.yml   |  9 +++--
 docs/data/sql_functions_zh.yml| 11 ---
 docs/layouts/shortcodes/sql_functions.html|  2 +-
 docs/layouts/shortcodes/sql_functions_zh.html |  2 +-
 4 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 471b3bcfea1..1aa2ca0872e 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -931,13 +931,18 @@ json:
 
 valueconstruction:
   - sql: |
-  -- implicit constructor with parenthesis
+  **implicit** constructor with parenthesis 
+  
   (value1 [, value2]*)
+
+  **explicit** ROW constructor with 
+  
+  ROW(value1 [, value2]*)
 table: row(ANY1, ANY2, ...)
 description: |
   Returns a row created from a list of values (value1, value2,...).
 
-  The implicit row constructor supports arbitrary expressions as fields but requires at least two fields. The explicit row constructor can deal with an arbitrary number of fields but does not support all kinds of field expressions well currently.
+  The implicit row constructor requires at least two fields. The explicit row constructor can deal with an arbitrary number of fields. Both of them support arbitrary expressions as fields.
   - sql: ARRAY ‘[’ value1 [, value2 ]* ‘]’
 table: array(ANY1, ANY2, ...)
 description: Returns an array created from a list of values (value1, 
value2, ...).
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index c3c24d555b2..c0e7a1995ef 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -985,12 +985,17 @@ json:
 
 valueconstruction:
   - sql: |
-  -- implicit constructor with parenthesis
+  **implicit** constructor with parenthesis 
+  
   (value1 [, value2]*)
+
+  **explicit** ROW constructor with 
+  
+  ROW(value1 [, value2]*)
 table: row(ANY1, ANY2, ...)
 description: |
-  返回从值列表 (value1, value2, ...) 创建的行。隐式行构造函数支持任意表达式作为字段,但至少需要两个字段。
-  显式行构造函数可以处理任意数量的字段,但目前还不能很好地支持所有类型的字段表达式。
+  返回从值列表 (value1, value2, ...) 创建的行。隐式行构造函数至少需要两个字段。显式行构造函数可以处理任意数量的字段。
+  两者都支持任意表达式作为字段
   - sql: ARRAY ‘[’ value1 [, value2 ]* ‘]’
 table: array(ANY1, ANY2, ...)
 description: 返回从值列表 (value1, value2, ...) 创建的数组。
diff --git a/docs/layouts/shortcodes/sql_functions.html 
b/docs/layouts/shortcodes/sql_functions.html
index ec8667ee1c7..a8e3557778c 100644
--- a/docs/layouts/shortcodes/sql_functions.html
+++ b/docs/layouts/shortcodes/sql_functions.html
@@ -35,7 +35,7 @@ under the License.
 {{ range $category }}
 
 
-{{ .sql | default "N/A" }}
+{{ .sql | $.Page.RenderString | default "N/A" }}
 
 
 {{ .table | default "N/A" }}
diff --git a/docs/layouts/shortcodes/sql_functions_zh.html 
b/docs/layouts/shortcodes/sql_functions_zh.html
index 4047517c30c..949fa00d363 100644
--- a/docs/layouts/shortcodes/sql_functions_zh.html
+++ b/docs/layouts/shortcodes/sql_functions_zh.html
@@ -35,7 +35,7 @@ under the License.
 {{ range $category }}
 
 
-{{ .sql | default "不适用" }}
+{{ .sql | $.Page.RenderString | default "不适用" }}
 
 
 {{ .table | default "不适用" }}
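
To make the documented difference concrete, here is a small illustration (not part of the commit; the table name, schema and datagen options are made up): both constructors build the same two-field row, and only the explicit ROW form also accepts a single field.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowConstructorExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Hypothetical source table used only for illustration.
        tEnv.executeSql(
                "CREATE TABLE t (a INT, b STRING) WITH ("
                        + " 'connector' = 'datagen', 'number-of-rows' = '3')");

        // Implicit constructor with parenthesis: requires at least two fields.
        tEnv.sqlQuery("SELECT (a, b) FROM t").execute().print();
        // Explicit ROW constructor: also works with a single field.
        tEnv.sqlQuery("SELECT ROW(a) FROM t").execute().print();
    }
}
```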



[flink] branch master updated: [FLINK-31741][jdbc-driver] Support data converter for value in statement result

2023-04-18 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9538fdaab29 [FLINK-31741][jdbc-driver] Support data converter for 
value in statement result
9538fdaab29 is described below

commit 9538fdaab2948a2e3dd068925d936ac0777301de
Author: shammon FY 
AuthorDate: Thu Apr 6 11:34:12 2023 +0800

[FLINK-31741][jdbc-driver] Support data converter for value in statement 
result

Close apache/flink#22360
---
 .../apache/flink/table/jdbc/FlinkResultSet.java|  35 +--
 .../flink/table/jdbc/utils/DataConverter.java  |  88 
 .../table/jdbc/utils/DefaultDataConverter.java | 105 +
 .../table/jdbc/utils/StringDataConverter.java  | 105 +
 .../flink/table/jdbc/FlinkResultSetTest.java   | 238 +
 5 files changed, 463 insertions(+), 108 deletions(-)

diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
index cfb7ebc3f09..6c8072b60ba 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.jdbc;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.client.gateway.StatementResult;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.jdbc.utils.DataConverter;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 
@@ -50,14 +50,17 @@ public class FlinkResultSet extends BaseResultSet {
 private final List columnNameList;
 private final Statement statement;
 private final StatementResult result;
+private final DataConverter dataConverter;
 private RowData currentRow;
 private boolean wasNull;
 
 private volatile boolean closed;
 
-public FlinkResultSet(Statement statement, StatementResult result) {
+public FlinkResultSet(
+Statement statement, StatementResult result, DataConverter dataConverter) {
 this.statement = checkNotNull(statement, "Statement cannot be null");
 this.result = checkNotNull(result, "Statement result cannot be null");
+this.dataConverter = checkNotNull(dataConverter, "Data converter cannot be null");
 this.currentRow = null;
 this.wasNull = false;
 
@@ -133,9 +136,8 @@ public class FlinkResultSet extends BaseResultSet {
 checkValidRow();
 checkValidColumn(columnIndex);
 
-StringData stringData = currentRow.getString(columnIndex - 1);
 try {
-return stringData == null ? null : stringData.toString();
+return dataConverter.getString(currentRow, columnIndex - 1);
 } catch (Exception e) {
 throw new SQLDataException(e);
 }
@@ -147,8 +149,7 @@ public class FlinkResultSet extends BaseResultSet {
 checkValidRow();
 checkValidColumn(columnIndex);
 try {
-return currentRow.getBoolean(columnIndex - 1);
-
+return dataConverter.getBoolean(currentRow, columnIndex - 1);
 } catch (Exception e) {
 throw new SQLDataException(e);
 }
@@ -160,7 +161,7 @@ public class FlinkResultSet extends BaseResultSet {
 checkValidRow();
 checkValidColumn(columnIndex);
 try {
-return currentRow.getByte(columnIndex - 1);
+return dataConverter.getByte(currentRow, columnIndex - 1);
 } catch (Exception e) {
 throw new SQLDataException(e);
 }
@@ -172,7 +173,7 @@ public class FlinkResultSet extends BaseResultSet {
 checkValidRow();
 checkValidColumn(columnIndex);
 try {
-return currentRow.getShort(columnIndex - 1);
+return dataConverter.getShort(currentRow, columnIndex - 1);
 } catch (Exception e) {
 throw new SQLDataException(e);
 }
@@ -184,7 +185,7 @@ public class FlinkResultSet extends BaseResultSet {
 checkValidRow();
 checkValidColumn(columnIndex);
 try {
-return currentRow.getInt(columnIndex - 1);
+return dataConverter.getInt(currentRow, columnIndex - 1);
 } catch (Exception e) {
 throw new SQLDataException(e);
 }
@@ -197,7 +198,7 @@ public class FlinkResultSet extends BaseResultSet {
 checkValidColumn(columnIndex);
 
 try {
-return currentRow.getLong(columnIndex - 1);
+return dataConverter.getLong(currentRow, columnIndex - 1);

[flink-connector-jdbc] branch hotfix/update_copyright_year_to_2023 created (now 4ad108d)

2023-04-16 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch hotfix/update_copyright_year_to_2023
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


  at 4ad108d  [hotfix] Update the copyright year to 2012-2023 in NOTICE

This branch includes the following new commits:

 new 4ad108d  [hotfix] Update the copyright year to 2012-2023 in NOTICE

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink-connector-jdbc] 01/01: [hotfix] Update the copyright year to 2012-2023 in NOTICE

2023-04-16 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch hotfix/update_copyright_year_to_2023
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 4ad108db42527f646d86a442f1585db2727e5541
Author: Benchao Li 
AuthorDate: Sun Apr 16 17:53:28 2023 +0800

[hotfix] Update the copyright year to 2012-2023 in NOTICE
---
 NOTICE | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/NOTICE b/NOTICE
index 1553115..0161a58 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
 Apache Flink JDBC Connector
-Copyright 2014-2022 The Apache Software Foundation
+Copyright 2014-2023 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).



[flink] branch master updated: [FLINK-31808][docs] Fix wrong examples of how to set operator name in documents

2023-04-14 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 97dee4bd2ad [FLINK-31808][docs] Fix wrong examples of how to set 
operator name in documents
97dee4bd2ad is described below

commit 97dee4bd2ade278805241a245385df3ceeb90150
Author: Weihua Hu 
AuthorDate: Fri Apr 14 17:31:07 2023 +0800

[FLINK-31808][docs] Fix wrong examples of how to set operator name in 
documents

Close apache/flink#22401
---
 docs/content.zh/docs/dev/datastream/operators/overview.md | 4 ++--
 docs/content/docs/dev/datastream/operators/overview.md| 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/operators/overview.md 
b/docs/content.zh/docs/dev/datastream/operators/overview.md
index fbccbbdbc10..a8c1b901d46 100644
--- a/docs/content.zh/docs/dev/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/datastream/operators/overview.md
@@ -851,12 +851,12 @@ Flink里的算子和作业节点会有一个名字和一个描述。名字和描
 {{< tabs namedescription>}}
 {{< tab "Java" >}}
 ```java
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
+someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
+someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
diff --git a/docs/content/docs/dev/datastream/operators/overview.md 
b/docs/content/docs/dev/datastream/operators/overview.md
index d471f485fe5..5e04da86aea 100644
--- a/docs/content/docs/dev/datastream/operators/overview.md
+++ b/docs/content/docs/dev/datastream/operators/overview.md
@@ -855,12 +855,12 @@ The description can contain detail information about 
operators to facilitate deb
 {{< tabs namedescription >}}
 {{< tab "Java" >}}
 ```java
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
+someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
+someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
 ```
 {{< /tab >}}
 {{< tab "Python" >}}



[flink] branch master updated: [FLINK-31545][jdbc-driver] Create executor in flink connection

2023-04-03 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 8c44d58c4c4 [FLINK-31545][jdbc-driver] Create executor in flink 
connection
8c44d58c4c4 is described below

commit 8c44d58c4c4aeb36c91d8b27f4128891970dc47d
Author: shammon FY 
AuthorDate: Wed Mar 29 10:47:11 2023 +0800

[FLINK-31545][jdbc-driver] Create executor in flink connection

Close apache/flink#22289
---
 flink-table/flink-sql-jdbc-driver-bundle/pom.xml   |  28 +
 flink-table/flink-sql-jdbc-driver/pom.xml  |  23 +++-
 .../org/apache/flink/table/jdbc/DriverUri.java |  15 +--
 .../apache/flink/table/jdbc/FlinkConnection.java   |  96 +--
 .../flink/table/jdbc/FlinkConnectionTest.java  | 136 +
 .../apache/flink/table/jdbc/FlinkDriverTest.java   |   8 +-
 6 files changed, 279 insertions(+), 27 deletions(-)

diff --git a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml 
b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml
index 9ad99ffee9f..9d43bdb9be4 100644
--- a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml
+++ b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml
@@ -50,15 +50,40 @@
flink-sql-gateway-api
${project.version}

+   
+   org.apache.flink
+   flink-sql-gateway
+   ${project.version}
+   

org.apache.flink
flink-table-common
${project.version}

+   
+   org.apache.flink
+   flink-annotations
+   ${project.version}
+   
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   

 


+   
+   io.github.zentol.japicmp
+   japicmp-maven-plugin
+   
+   
+   true
+   
+   


org.apache.maven.plugins
@@ -77,7 +102,10 @@

org.apache.flink:flink-sql-jdbc-driver

org.apache.flink:flink-sql-client

org.apache.flink:flink-sql-gateway-api
+   
org.apache.flink:flink-sql-gateway

org.apache.flink:flink-table-common
+   
org.apache.flink:flink-annotations
+   
org.apache.flink:flink-core



diff --git a/flink-table/flink-sql-jdbc-driver/pom.xml 
b/flink-table/flink-sql-jdbc-driver/pom.xml
index 267bca001ac..3d6968fa8e4 100644
--- a/flink-table/flink-sql-jdbc-driver/pom.xml
+++ b/flink-table/flink-sql-jdbc-driver/pom.xml
@@ -48,9 +48,30 @@


org.apache.flink
-   flink-table-api-java-bridge
+   flink-sql-gateway
${project.version}

+
+   
+   
+   org.apache.flink
+   flink-sql-gateway
+   ${project.version}
+   test-jar
+   test
+   
+   
+   org.apache.flink
+   flink-test-utils
+   ${project.version}
+   test
+   
+   
+   org.apache.flink
+   
flink-table-planner_${scala.binary.version}
+   ${project.version}
+   test
+   

 

diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
index bb7952a2417..a13650b45e7 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
+++ 
b/flink-table/flink-sql-jdbc

[flink] branch master updated (cfb213040a0 -> b81291864e6)

2023-03-30 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from cfb213040a0 [FLINK-31604][table] Reduce usage of CatalogTableImpl in 
table-planner (#22263)
 add b81291864e6 [FLINK-31547][jdbc-driver] Introduce 
FlinkResultSetMetaData for jdbc driver

No new revisions were added by this update.

Summary of changes:
 flink-table/flink-sql-jdbc-driver-bundle/pom.xml   |   6 +
 .../org/apache/flink/table/jdbc/ColumnInfo.java| 287 +
 .../flink/table/jdbc/FlinkResultSetMetaData.java   | 257 ++
 .../table/jdbc/FlinkResultSetMetaDataTest.java | 124 +
 4 files changed, 674 insertions(+)
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/ColumnInfo.java
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetMetaDataTest.java



[flink] branch master updated (0dd9bcb0a9c -> 3814fe8ddf5)

2023-03-29 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 0dd9bcb0a9c [FLINK-31399] AdaptiveScheduler should free excessive 
slots that are no longer needed after down scaling.
 add 3814fe8ddf5 [FLINK-31522][jdbc-driver] Introduce FlinkResultSet for 
jdbc driver

No new revisions were added by this update.

Summary of changes:
 flink-table/flink-sql-jdbc-driver-bundle/pom.xml   |   6 +
 flink-table/flink-sql-jdbc-driver/pom.xml  |   5 +
 .../org/apache/flink/table/jdbc/BaseResultSet.java | 821 +
 .../apache/flink/table/jdbc/FlinkResultSet.java| 445 +++
 .../flink/table/jdbc/FlinkResultSetTest.java   | 176 +
 .../apache/flink/table/jdbc/TestingStatement.java  | 220 ++
 6 files changed, 1673 insertions(+)
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseResultSet.java
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/TestingStatement.java



[flink] branch master updated (de27786eba5 -> f14a02ecac3)

2023-03-28 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from de27786eba5 [FLINK-31541][ui] encode request params to support special 
characters in metrics name.
 add f14a02ecac3 [FLINK-31538][jdbc-driver] Support parsing 
catalog/database and properties for uri

No new revisions were added by this update.

Summary of changes:
 flink-table/flink-sql-jdbc-driver/pom.xml  |  19 ++
 .../apache/flink/table/jdbc/BaseConnection.java| 297 +
 .../org/apache/flink/table/jdbc/DriverInfo.java|  86 ++
 .../org/apache/flink/table/jdbc/DriverUri.java | 203 ++
 .../apache/flink/table/jdbc/FlinkConnection.java   |  95 +++
 .../org/apache/flink/table/jdbc/FlinkDriver.java   |  32 ++-
 .../apache/flink/table/jdbc/utils/DriverUtils.java | 102 +++
 .../org/apache/flink/table/jdbc/driver.properties  |   3 +-
 .../apache/flink/table/jdbc/FlinkDriverTest.java   | 103 +++
 9 files changed, 931 insertions(+), 9 deletions(-)
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseConnection.java
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverInfo.java
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DriverUtils.java
 copy 
flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
 => 
flink-table/flink-sql-jdbc-driver/src/main/resources/org/apache/flink/table/jdbc/driver.properties
 (91%)
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java



[flink] branch master updated (1d33773e6b7 -> 25798e8b2ed)

2023-03-21 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 1d33773e6b7 [FLINK-25509][connector-base] Add RecordEvaluator to 
dynamically stop source based on de-serialized records
 add 25798e8b2ed [FLINK-31521][jdbc-driver] Initialize jdbc driver modules 
in flink-table

No new revisions were added by this update.

Summary of changes:
 .../pom.xml| 54 ---
 .../flink-sql-jdbc-driver}/pom.xml | 46 +++-
 .../org/apache/flink/table/jdbc/FlinkDriver.java   | 81 ++
 .../resources/META-INF/services/java.sql.Driver|  2 +-
 flink-table/pom.xml|  2 +
 5 files changed, 108 insertions(+), 77 deletions(-)
 copy flink-table/{flink-table-planner-loader-bundle => 
flink-sql-jdbc-driver-bundle}/pom.xml (60%)
 copy {flink-connectors/flink-connector-datagen => 
flink-table/flink-sql-jdbc-driver}/pom.xml (59%)
 create mode 100644 
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDriver.java
 copy 
flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
 => 
flink-table/flink-sql-jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver
 (95%)



[flink-connector-jdbc] branch main updated: [FLINK-31181] Support LIKE operator pushdown

2023-02-28 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
 new aa75d64  [FLINK-31181] Support LIKE operator pushdown
aa75d64 is described below

commit aa75d6476a381c4c80bb39364944891c514b8002
Author: Grzegorz Kołakowski 
AuthorDate: Wed Feb 22 09:44:11 2023 +0100

[FLINK-31181] Support LIKE operator pushdown
---
 .../jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java | 3 +++
 .../jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java | 3 ++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
index f9622a8..a4e31ee 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
@@ -79,6 +79,9 @@ public class JdbcFilterPushdownPreparedStatementVisitor
 if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
 return renderBinaryOperator("AND", call.getResolvedChildren());
 }
+if (BuiltInFunctionDefinitions.LIKE.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("LIKE", call.getResolvedChildren());
+}
 if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
 return renderUnaryOperator("IS NULL", call.getResolvedChildren().get(0), true);
 }
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
index 8bdfc67..57a9e0d 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
@@ -144,7 +144,8 @@ class JdbcFilterPushdownPreparedStatementVisitorTest {
 new Object[] {"real_col > 0.5", "real_col > ?", new 
BigDecimal("0.5")},
 new Object[] {
 "double_col <= -0.3", "double_col <= ?", new 
BigDecimal("-0.3")
-})
+},
+new Object[] {"description LIKE '_bcd%'", "description 
LIKE ?", "_bcd%"})
 .forEach(
 inputs ->
 assertSimpleInputExprEqualsOutExpr(
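
For readers skimming the digest, the practical effect of the new branch is that a LIKE predicate on a JDBC-backed table can now be evaluated by the database rather than in Flink. A hedged end-to-end sketch follows (table name, Derby URL and connector options are placeholders, not from the commit); with this change the WHERE clause is rendered as "description LIKE ?" with parameter '_bcd%', matching the new test case above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LikePushdownExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Hypothetical JDBC table; the connection options are placeholders.
        tEnv.executeSql(
                "CREATE TABLE jdbc_source (id BIGINT, description STRING) WITH ("
                        + " 'connector' = 'jdbc',"
                        + " 'url' = 'jdbc:derby:memory:test',"
                        + " 'table-name' = 'jdbc_source')");

        // The LIKE filter below is pushed into the generated JDBC query as "description LIKE ?".
        tEnv.sqlQuery("SELECT id FROM jdbc_source WHERE description LIKE '_bcd%'").execute().print();
    }
}
```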



[flink] branch master updated: [FLINK-31113][orc] Support AND filter push down in orc

2023-02-21 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 5cda70d873c [FLINK-31113][orc] Support AND filter push down in orc
5cda70d873c is described below

commit 5cda70d873c9630c898d765633ec7a6cfe53e3c6
Author: shammon 
AuthorDate: Tue Feb 21 17:18:59 2023 +0800

[FLINK-31113][orc] Support AND filter push down in orc

This close #21956
---
 .../main/java/org/apache/flink/orc/OrcFilters.java | 45 ++
 .../apache/flink/orc/OrcFileSystemFilterTest.java  | 11 ++
 2 files changed, 56 insertions(+)

diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
index a33b79d18f9..4393356fc30 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java
@@ -62,6 +62,7 @@ public class OrcFilters {
 OrcFilters::convertIsNotNull)
 .put(BuiltInFunctionDefinitions.NOT, OrcFilters::convertNot)
 .put(BuiltInFunctionDefinitions.OR, OrcFilters::convertOr)
+.put(BuiltInFunctionDefinitions.AND, OrcFilters::convertAnd)
 .put(
 BuiltInFunctionDefinitions.EQUALS,
 call ->
@@ -186,6 +187,22 @@ public class OrcFilters {
 }
 }
 
+private static Predicate convertAnd(CallExpression callExp) {
+if (callExp.getChildren().size() < 2) {
+return null;
+}
+Expression left = callExp.getChildren().get(0);
+Expression right = callExp.getChildren().get(1);
+
+Predicate c1 = toOrcPredicate(left);
+Predicate c2 = toOrcPredicate(right);
+if (c1 == null || c2 == null) {
+return null;
+} else {
+return new And(c1, c2);
+}
+}
+
 public static Predicate convertBinary(
 CallExpression callExp,
 TriFunction<String, PredicateLeaf.Type, Serializable, Predicate> func,
@@ -719,4 +736,32 @@ public class OrcFilters {
 return "OR(" + Arrays.toString(preds) + ")";
 }
 }
+
+/** An AND predicate that can be evaluated by the OrcInputFormat. */
+public static class And extends Predicate {
+private final Predicate[] preds;
+
+/**
+ * Creates an AND predicate.
+ *
+ * @param predicates The conjunctive predicates.
+ */
+public And(Predicate... predicates) {
+this.preds = predicates;
+}
+
+@Override
+public SearchArgument.Builder add(SearchArgument.Builder builder) {
+SearchArgument.Builder withAnd = builder.startAnd();
+for (Predicate pred : preds) {
+withAnd = pred.add(withAnd);
+}
+return withAnd.end();
+}
+
+@Override
+public String toString() {
+return "AND(" + Arrays.toString(preds) + ")";
+}
+}
 }
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
index 8137d78ac38..cdee400723d 100644
--- 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -74,5 +75,15 @@ class OrcFileSystemFilterTest {
 OrcFilters.Predicate predicate6 =
 new OrcFilters.LessThan("long1", PredicateLeaf.Type.LONG, 10);
 assertThat(predicate5).hasToString(predicate6.toString());
+
+// and
+CallExpression andExpression =
+CallExpression.permanent(
+BuiltInFunctionDefinitions.AND,
+Arrays.asList(greaterExpression, lessExpression),
+DataTypes.BOOLEAN());
+OrcFilters.Predicate predicate7 = OrcFilters.toOrcPredicate(andExpression);
+OrcFilters.Predicate predicate8 = new OrcFilters.And(predicate4, predicate6);
+assertThat(predicate7).hasToString(predicate8.toString());
 }
 }
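
A short usage note on the new predicate class (mirroring the test above; the column names and literals here are illustrative only): And simply wraps its children between startAnd() and end() when the ORC SearchArgument is built.

```java
import org.apache.flink.orc.OrcFilters;

import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

public class OrcAndPredicateExample {
    public static void main(String[] args) {
        // Two leaf predicates, built the same way as in the test above.
        OrcFilters.Predicate first = new OrcFilters.LessThan("long1", PredicateLeaf.Type.LONG, 10);
        OrcFilters.Predicate second = new OrcFilters.LessThan("double1", PredicateLeaf.Type.FLOAT, 1.0);

        // The new conjunctive predicate; printing it yields an AND([...]) representation.
        OrcFilters.Predicate both = new OrcFilters.And(first, second);
        System.out.println(both);
    }
}
```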



[flink] branch master updated: [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type

2023-01-19 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7ea4476c054 [FLINK-30093][protobuf] Fix compile errors for 
google.protobuf.Timestamp type
7ea4476c054 is described below

commit 7ea4476c0544e17798cbb1e39609827954f6c266
Author: laughingman7743 
AuthorDate: Wed Jan 4 23:12:19 2023 +0900

[FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp 
type

Close #21613
---
 .../docs/connectors/table/formats/protobuf.md  |  5 ++
 flink-formats/flink-protobuf/pom.xml   |  1 +
 .../apache/flink/formats/protobuf/PbConstant.java  |  1 +
 .../flink/formats/protobuf/PbFormatContext.java|  8 +--
 .../deserialize/PbCodegenArrayDeserializer.java|  3 +-
 .../deserialize/PbCodegenMapDeserializer.java  |  6 +-
 .../deserialize/PbCodegenRowDeserializer.java  |  3 +-
 .../protobuf/deserialize/ProtoToRowConverter.java  |  6 +-
 .../serialize/PbCodegenArraySerializer.java|  3 +-
 .../protobuf/serialize/PbCodegenMapSerializer.java |  6 +-
 .../protobuf/serialize/PbCodegenRowSerializer.java | 11 +---
 .../serialize/PbCodegenSimpleSerializer.java   |  8 +--
 .../protobuf/serialize/RowToProtoConverter.java|  4 +-
 .../formats/protobuf/util/PbCodegenUtils.java  | 19 +++
 .../flink/formats/protobuf/util/PbFormatUtils.java | 58 +++-
 ...RowTest.java => MetaNoMultiProtoToRowTest.java} | 18 ++
 .../formats/protobuf/MetaOuterNoMultiTest.java |  4 +-
 .../protobuf/SameOuterClassNameProtoToRowTest.java | 61 +
 .../protobuf/SameOuterClassNameRowToProtoTest.java | 56 +++
 .../formats/protobuf/SimpleProtoToRowTest.java | 45 +--
 .../formats/protobuf/SimpleRowToProtoTest.java | 49 ++---
 .../protobuf/TimestampMultiProtoToRowTest.java | 46 
 .../protobuf/TimestampMultiRowToProtoTest.java}| 32 ++-
 .../protobuf/TimestampNoMultiProtoToRowTest.java   | 47 
 .../protobuf/TimestampNoMultiRowToProtoTest.java   | 44 +++
 .../TimestampOuterMultiProtoToRowTest.java | 49 +
 .../TimestampOuterMultiRowToProtoTest.java | 44 +++
 .../TimestampOuterNoMultiProtoToRowTest.java   | 47 
 .../TimestampOuterNoMultiRowToProtoTest.java   | 47 
 .../flink-protobuf/src/test/proto/test_map.proto   | 18 +++---
 .../test/proto/test_multiple_level_message.proto   | 26 -
 .../flink-protobuf/src/test/proto/test_null.proto  | 64 +++---
 .../flink-protobuf/src/test/proto/test_oneof.proto |  8 +--
 .../flink-protobuf/src/test/proto/test_pb3.proto   | 48 
 .../src/test/proto/test_repeated.proto | 13 ++---
 .../src/test/proto/test_repeated_message.proto | 12 ++--
 ...neof.proto => test_same_outer_class_name.proto} | 16 +++---
 .../{test_simple.proto => test_simple_multi.proto} | 51 +
 .../test/proto/test_simple_no_java_package.proto   | 40 +++---
 ...ter_nomulti.proto => test_simple_nomulti.proto} | 30 ++
 .../src/test/proto/test_simple_outer_multi.proto   | 23 
 .../src/test/proto/test_simple_outer_nomulti.proto | 26 -
 ...test_oneof.proto => test_timestamp_multi.proto} | 10 ++--
 ...st_oneof.proto => test_timestamp_nomulti.proto} | 12 ++--
 ...sage.proto => test_timestamp_outer_multi.proto} | 14 ++---
 ...ge.proto => test_timestamp_outer_nomulti.proto} | 16 ++
 46 files changed, 806 insertions(+), 352 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/formats/protobuf.md 
b/docs/content.zh/docs/connectors/table/formats/protobuf.md
index 5cbafc8d911..b28cf49f9d3 100644
--- a/docs/content.zh/docs/connectors/table/formats/protobuf.md
+++ b/docs/content.zh/docs/connectors/table/formats/protobuf.md
@@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type 
to Protobuf type.
   enum
   The enum value of protobuf can be mapped to string or number of 
flink row accordingly.
 
+
+  ROW<seconds BIGINT, nanos INT>
+  google.protobuf.timestamp
+  The google.protobuf.timestamp type can be mapped to seconds and 
fractions of seconds at nanosecond resolution in UTC epoch time using the row 
type as well as the protobuf definition.
+
 
 
 
diff --git a/flink-formats/flink-protobuf/pom.xml 
b/flink-formats/flink-protobuf/pom.xml
index be38a1a4242..da593b8bc4a 100644
--- a/flink-formats/flink-protobuf/pom.xml
+++ b/flink-formats/flink-protobuf/pom.xml
@@ -109,6 +109,7 @@ under the License.



com.google.pr

[flink] branch master updated: [FLINK-29990][sql-parser] Fix unparsed sql for SqlTableLike with no options

2022-11-11 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new dd768443424 [FLINK-29990][sql-parser] Fix unparsed sql for 
SqlTableLike with no options
dd768443424 is described below

commit dd76844342489a252f8b76417090f137028af0bc
Author: chengshuo.cs 
AuthorDate: Fri Nov 11 15:21:24 2022 +0800

[FLINK-29990][sql-parser] Fix unparsed sql for SqlTableLike with no options
---
 .../org/apache/flink/sql/parser/ddl/SqlTableLike.java |  3 +++
 .../flink/sql/parser/FlinkSqlParserImplTest.java  | 19 +++
 2 files changed, 22 insertions(+)

diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
index 634eb1b0b55..eb1651cc2e0 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
@@ -223,6 +223,9 @@ public class SqlTableLike extends SqlCall implements 
ExtendedSqlNode {
 public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
 writer.keyword("LIKE");
 sourceTable.unparse(writer, leftPrec, rightPrec);
+if (options == null || options.isEmpty()) {
+return;
+}
 SqlWriter.Frame frame = writer.startList("(", ")");
 for (SqlTableLikeOption option : options) {
 writer.newlineAndIndent();
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 741afa6d010..2dd968993f0 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1078,6 +1078,25 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 sql(sql).fails("(?s).*Encountered \"a\" at line 6, column 3.\n.*");
 }
 
+@Test
+void testCreateTableLikeWithoutOption() {
+final String sql =
+"create table source_table(\n"
++ "  a int,\n"
++ "  b bigint,\n"
++ "  c string\n"
++ ")\n"
++ "LIKE parent_table";
+final String expected =
+"CREATE TABLE `SOURCE_TABLE` (\n"
++ "  `A` INTEGER,\n"
++ "  `B` BIGINT,\n"
++ "  `C` STRING\n"
++ ")\n"
++ "LIKE `PARENT_TABLE`";
+sql(sql).ok(expected);
+}
+
 @Test
 void testCreateTableWithLikeClause() {
 final String sql =



[flink] branch master updated: [FLINK-16024][connector/jdbc] Support filter pushdown

2022-11-02 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 496700197b9 [FLINK-16024][connector/jdbc] Support filter pushdown
496700197b9 is described below

commit 496700197b9e2ca60f6cdd3eb01fa4a12227548a
Author: Qing Lim 
AuthorDate: Wed Jun 22 12:55:20 2022 +0100

[FLINK-16024][connector/jdbc] Support filter pushdown

This closes #20140
---
 .../CompositeJdbcParameterValuesProvider.java  |  58 
 .../jdbc/table/JdbcDynamicTableSource.java | 113 +++-
 ...JdbcFilterPushdownPreparedStatementVisitor.java | 201 ++
 .../jdbc/table/ParameterizedPredicate.java |  59 
 .../jdbc/table/JdbcDynamicTableSourceITCase.java   | 140 ++
 ...FilterPushdownPreparedStatementVisitorTest.java | 297 +
 .../connector/jdbc/table/JdbcTablePlanTest.java|   6 +
 .../connector/jdbc/table/JdbcTablePlanTest.xml |  17 ++
 8 files changed, 880 insertions(+), 11 deletions(-)

diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
new file mode 100644
index 000..eebeac5abe0
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.split;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/** Combine 2 {@link JdbcParameterValuesProvider} into 1. */
+@Internal
+public class CompositeJdbcParameterValuesProvider implements JdbcParameterValuesProvider {
+JdbcParameterValuesProvider a;
+JdbcParameterValuesProvider b;
+
+public CompositeJdbcParameterValuesProvider(
+JdbcParameterValuesProvider a, JdbcParameterValuesProvider b) {
+Preconditions.checkArgument(
+a.getParameterValues().length == b.getParameterValues().length,
+"Both JdbcParameterValuesProvider should have the same 
length.");
+this.a = a;
+this.b = b;
+}
+
+@Override
+public Serializable[][] getParameterValues() {
+int batchNum = this.a.getParameterValues().length;
+Serializable[][] parameters = new Serializable[batchNum][];
+for (int i = 0; i < batchNum; i++) {
+Serializable[] aSlice = a.getParameterValues()[i];
+Serializable[] bSlice = b.getParameterValues()[i];
+int totalLen = aSlice.length + bSlice.length;
+
+Serializable[] batchParams = new Serializable[totalLen];
+
+System.arraycopy(aSlice, 0, batchParams, 0, aSlice.length);
+System.arraycopy(bSlice, 0, batchParams, aSlice.length, bSlice.length);
+parameters[i] = batchParams;
+}
+return parameters;
+}
+}
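
A minimal sketch of how the new provider composes two parameter sources. The concrete values are invented for illustration, and the lambdas assume JdbcParameterValuesProvider exposes only getParameterValues(), as in the class above.

```java
import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;

import java.io.Serializable;
import java.util.Arrays;

public class CompositeProviderExample {
    public static void main(String[] args) {
        // Partition parameters, e.g. lower/upper bounds per split (made-up values).
        JdbcParameterValuesProvider ranges = () -> new Serializable[][] {{0L, 99L}, {100L, 199L}};
        // Pushed-down filter parameters, repeated once per split (made-up values).
        JdbcParameterValuesProvider filters = () -> new Serializable[][] {{"_bcd%"}, {"_bcd%"}};

        JdbcParameterValuesProvider combined =
                new CompositeJdbcParameterValuesProvider(ranges, filters);

        // Each batch now carries the range bounds followed by the filter parameter:
        // [0, 99, _bcd%] and [100, 199, _bcd%].
        for (Serializable[] batch : combined.getParameterValues()) {
            System.out.println(Arrays.toString(batch));
        }
    }
}
```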
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 0bc7ae04792..e27cf63df50 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -22,25 +22,41 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import 
org.apache.flink.connector.jdbc.split.CompositeJdb

[flink] branch master updated: [FLINK-28765][docs] Add documentation for protobuf format

2022-08-02 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 92ec61a54df [FLINK-28765][docs] Add documentation for protobuf format
92ec61a54df is described below

commit 92ec61a54df2a66183633c57944eac774dd48906
Author: Suhan Mao 
AuthorDate: Mon Aug 1 18:37:35 2022 +0800

[FLINK-28765][docs] Add documentation for protobuf format

This closes #20408
---
 .../docs/connectors/table/formats/protobuf.md  | 286 +
 .../docs/connectors/table/formats/protobuf.md  | 286 +
 docs/data/sql_connectors.yml   |   6 +
 3 files changed, 578 insertions(+)

diff --git a/docs/content.zh/docs/connectors/table/formats/protobuf.md 
b/docs/content.zh/docs/connectors/table/formats/protobuf.md
new file mode 100644
index 000..5cbafc8d911
--- /dev/null
+++ b/docs/content.zh/docs/connectors/table/formats/protobuf.md
@@ -0,0 +1,286 @@
+---
+title: Protobuf
+weight: 4
+type: docs
+aliases:
+- /dev/table/connectors/formats/protobuf.html
+---
+
+
+# Protobuf Format
+
+{{< label "Format: Serialization Schema" >}}
+{{< label "Format: Deserialization Schema" >}}
+
+The Protocol Buffers 
[Protobuf](https://developers.google.com/protocol-buffers) format allows you to 
read and write Protobuf data, based on Protobuf generated classes.
+
+Dependencies
+
+
+{{< sql_download_table "protobuf" >}}
+
+How to create a table with Protobuf format
+
+
+Here is an example to create a table using the Kafka connector and Protobuf 
format.
+
+Below is the proto definition file.
+
+```
+syntax = "proto2";
+package com.example;
+option java_package = "com.example";
+option java_multiple_files = true;
+
+message SimpleTest {
+optional int64 uid = 1;
+optional string name = 2;
+optional int32 category_type = 3;
+optional bytes content = 4;
+optional double price = 5;
+map value_map = 6;
+repeated  InnerMessageTest value_arr = 7;
+optional Corpus corpus_int = 8; 
+optional Corpus corpus_str = 9; 
+
+message InnerMessageTest{
+  optional int64 v1 =1;
+  optional int32 v2 =2;
+}
+
+enum Corpus {
+UNIVERSAL = 0;
+WEB = 1;
+IMAGES = 2;
+LOCAL = 3;
+NEWS = 4;
+PRODUCTS = 5;
+VIDEO = 7;
+  }
+}
+```
+
+1. Use 
[`protoc`](https://developers.google.com/protocol-buffers/docs/javatutorial#compiling-your-protocol-buffers)
 command to compile the `.proto` file to java classes
+2. Then compile and package the classes (there is no need to package 
proto-java into the jar)
+3. Finally you should provide the `jar` in your classpath, e.g. pass it using `-j` in the SQL Client
+
+```sql
+CREATE TABLE simple_test (
+  uid BIGINT,
+  name STRING,
+  category_type INT,
+  content BINARY,
+  price DOUBLE,
+  value_map map>,
+  value_arr array>,
+  corpus_int INT,
+  corpus_str STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'user_behavior',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'format' = 'protobuf',
+ 'protobuf.message-class-name' = 'com.example.SimpleTest',
+ 'protobuf.ignore-parse-errors' = 'true'
+)
+```
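
For illustration only (a hypothetical producer-side sketch, not part of the documentation above), the `protobuf` format expects the raw serialized bytes of the generated class as the Kafka record value, which the generated builder provides directly:

```java
// Hypothetical sketch: build a SimpleTest message and obtain the bytes the format reads.
import com.example.SimpleTest;

public class ProducerSketch {
    public static void main(String[] args) {
        SimpleTest msg =
                SimpleTest.newBuilder()
                        .setUid(1L)
                        .setName("flink")
                        .setCategoryType(2)
                        .setPrice(9.9d)
                        .build();

        byte[] payload = msg.toByteArray(); // value bytes for the 'user_behavior' topic
        System.out.println(payload.length + " bytes");
    }
}
```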
+
+Format Options
+
+
+| Option | Required | Forwarded | Default | Type | Description |
+| :----- | :------- | :-------- | :------ | :--- | :---------- |
+| format | required | no | (none) | String | Specify what format to use, here should be 'protobuf'. |
+| protobuf.message-class-name | required | no | (none) | String | The full name of a Protobuf generated class. The name must match the message name in the proto definition file. $ is supported for inner class names, like 'com.example.OuterClass$MessageClass'. |
+| protobuf.ignore-parse-errors | optional | no | false | Boolean | Optional flag to skip rows with parse errors instead of failing. |
+| protobuf.read-default-values | optional | yes | false | Boolean | This option only works if the generated class's version is proto2. If this value is set to true, the format will read empty values as the default values defined in the proto file. If the value is set to false, the format will generate null values if the data element does not exist in the binary protobuf message. If the proto syntax is proto3, this value will forcibly be set to true, because proto3's standard i |
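
To make the proto2 behavior behind `protobuf.read-default-values` concrete, here is a small hypothetical sketch using the generated `SimpleTest` class from the example above (illustrative only):

```java
import com.example.SimpleTest;

public class ReadDefaultValuesSketch {
    public static void main(String[] args) {
        // A proto2 message where 'name' was never set.
        SimpleTest msg = SimpleTest.newBuilder().setUid(1L).build();

        System.out.println(msg.hasName()); // false -> the field is absent from the binary message
        System.out.println(msg.getName()); // ""    -> the proto2 default value
        // read-default-values = false: the format emits NULL for 'name'
        // read-default-values = true : the format emits "" (the proto default)
    }
}
```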

[flink] branch master updated (be4e0fe050b -> 5c87b69b530)

2022-07-27 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from be4e0fe050b [FLINK-28577][web] Fix the null error of reading 
checkpointed_size in checkpoint tab
 add 5c87b69b530 [FLINK-18202][protobuf] Introduce protobuf format

No new revisions were added by this update.

Summary of changes:
 flink-formats/flink-protobuf/pom.xml   | 146 +
 .../com/google/protobuf/ProtobufInternalUtils.java |  27 ++
 .../flink/formats/protobuf/PbCodegenException.java |  36 +++
 .../apache/flink/formats/protobuf/PbConstant.java  |  29 ++
 .../flink/formats/protobuf/PbDecodingFormat.java   |  52 
 .../flink/formats/protobuf/PbEncodingFormat.java   |  49 +++
 .../flink/formats/protobuf/PbFormatConfig.java | 120 
 .../flink/formats/protobuf/PbFormatContext.java|  38 +++
 .../flink/formats/protobuf/PbFormatFactory.java|  94 ++
 .../flink/formats/protobuf/PbFormatOptions.java|  55 
 .../deserialize/PbCodegenArrayDeserializer.java|  87 ++
 .../deserialize/PbCodegenDeserializeFactory.java   |  56 
 .../deserialize/PbCodegenDeserializer.java |  39 +++
 .../deserialize/PbCodegenMapDeserializer.java  | 115 +++
 .../deserialize/PbCodegenRowDeserializer.java  | 126 
 .../deserialize/PbCodegenSimpleDeserializer.java   |  86 ++
 .../PbRowDataDeserializationSchema.java| 107 +++
 .../protobuf/deserialize/ProtoToRowConverter.java  | 134 +
 .../serialize/PbCodegenArraySerializer.java|  89 ++
 .../protobuf/serialize/PbCodegenMapSerializer.java | 126 
 .../protobuf/serialize/PbCodegenRowSerializer.java | 128 
 .../serialize/PbCodegenSerializeFactory.java   |  54 
 .../protobuf/serialize/PbCodegenSerializer.java|  40 +++
 .../serialize/PbCodegenSimpleSerializer.java   | 121 
 .../serialize/PbRowDataSerializationSchema.java|  66 
 .../protobuf/serialize/RowToProtoConverter.java| 114 +++
 .../formats/protobuf/util/PbCodegenAppender.java   |  85 ++
 .../formats/protobuf/util/PbCodegenUtils.java  | 270 +
 .../formats/protobuf/util/PbCodegenVarId.java  |  40 +++
 .../flink/formats/protobuf/util/PbFormatUtils.java | 105 +++
 .../protobuf/util/PbSchemaValidationUtils.java | 167 +++
 .../formats/protobuf/util/PbToRowTypeUtil.java | 110 +++
 .../org.apache.flink.table.factories.Factory   |  15 +
 .../flink/formats/protobuf/MapProtoToRowTest.java  |  64 
 .../flink/formats/protobuf/MapRowToProtoTest.java  |  71 +
 .../protobuf/MetaNoOuterNoMultiProtoToRowTest.java |  77 +
 .../flink/formats/protobuf/MetaOuterMultiTest.java |  56 
 .../formats/protobuf/MetaOuterNoMultiTest.java |  61 
 .../protobuf/MultiLevelMessageProtoToRowTest.java  |  58 
 .../protobuf/MultiLevelMessageRowToProtoTest.java  |  58 
 .../protobuf/NoJavaPackageProtoToRowTest.java  |  32 ++
 .../formats/protobuf/NullValueToProtoTest.java | 219 ++
 .../formats/protobuf/OneofProtoToRowTest.java  |  38 +++
 .../formats/protobuf/OneofRowToProtoTest.java  |  41 +++
 .../flink/formats/protobuf/Pb3ToRowTest.java   | 125 
 .../formats/protobuf/ProtobufSQLITCaseTest.java| 331 +
 .../flink/formats/protobuf/ProtobufTestHelper.java | 132 
 .../protobuf/RepeatedMessageProtoToRowTest.java|  57 
 .../protobuf/RepeatedMessageRowToProtoTest.java|  50 
 .../formats/protobuf/RepeatedProtoToRowTest.java   |  43 +++
 .../formats/protobuf/RepeatedRowToProtoTest.java   |  75 +
 .../formats/protobuf/SimpleProtoToRowTest.java | 117 
 .../formats/protobuf/SimpleRowToProtoTest.java |  96 ++
 .../protobuf/table/TestProtobufSinkFunction.java   |  45 +++
 .../protobuf/table/TestProtobufSourceFunction.java |  52 
 .../protobuf/table/TestProtobufTableFactory.java   |  84 ++
 .../protobuf/table/TestProtobufTableSink.java  |  61 
 .../protobuf/table/TestProtobufTableSource.java|  64 
 .../protobuf/table/TestProtobufTestStore.java  |  28 ++
 .../flink-protobuf/src/test/proto/test_map.proto   |  37 +++
 .../test/proto/test_multiple_level_message.proto   |  42 +++
 .../flink-protobuf/src/test/proto/test_null.proto  |  61 
 .../flink-protobuf/src/test/proto/test_oneof.proto |  29 ++
 .../flink-protobuf/src/test/proto/test_pb3.proto   |  53 
 .../src/test/proto/test_repeated.proto |  32 ++
 .../src/test/proto/test_repeated_message.proto |  32 ++
 .../src/test/proto/test_simple.proto   |  49 +++
 .../test/proto/test_simple_no_java_package.proto   |  46 +++
 .../test/proto/test_simple_noouter_nomulti.proto   |  38 +++
 .../src/test/proto/test_simple_outer_multi.proto   |  40 +++
 .../src/test/proto/test_simple_outer_nomulti.proto |  39

[flink-web] branch asf-site updated: Add libenchao to the committer list

2022-01-26 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 9f3e4ee  Add libenchao to the committer list
9f3e4ee is described below

commit 9f3e4ee2506e73f2551adb0fba61d5cc70817d5e
Author: Benchao Li 
AuthorDate: Wed Jan 26 17:56:32 2022 +0800

Add libenchao to the committer list
---
 community.md| 7 ++-
 community.zh.md | 6 ++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/community.md b/community.md
index af5af0a..d8f0b87 100644
--- a/community.md
+++ b/community.md
@@ -562,7 +562,12 @@ Flink Forward is a conference happening yearly in 
different locations around the
 PMC, Committer
 igalshilman
   
-
+  
+https://avatars1.githubusercontent.com/u/4471524?s=50"; 
class="committer-avatar">
+Benchao Li
+Committer
+libenchao
+  
 
 
 You can reach committers directly at `@apache.org`. A list of all 
contributors can be found [here]({{ site.FLINK_CONTRIBUTORS_URL }}).
diff --git a/community.zh.md b/community.zh.md
index e563079..cefec26 100644
--- a/community.zh.md
+++ b/community.zh.md
@@ -547,6 +547,12 @@ Flink Forward 大会每年都会在世界的不同地方举办。关于大会最
 PMC, Committer
 zhuzh
   
+  
+https://avatars1.githubusercontent.com/u/4471524?s=50"; 
class="committer-avatar">
+Benchao Li
+Committer
+libenchao
+  
 
 
 可以通过 `@apache.org` 直接联系 committer。可以在 [这里]({{ 
site.FLINK_CONTRIBUTORS_URL }}) 找到所有的贡献者。


[flink] branch release-1.11 updated: [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters

2020-11-04 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new b50fc62  [FLINK-19790][json] Clear reused ObjectNode's content for map 
converter in RowDataToJsonConverters
b50fc62 is described below

commit b50fc62e5b08363932815e0c68fbea3f08f0011f
Author: Benchao Li 
AuthorDate: Wed Nov 4 21:33:23 2020 +0800

[FLINK-19790][json] Clear reused ObjectNode's content for map converter in 
RowDataToJsonConverters

This closes 13926
---
 .../json/JsonRowDataSerializationSchema.java   |  1 +
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 23 +-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 89b3b87..afc8ede 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -268,6 +268,7 @@ public class JsonRowDataSerializationSchema implements 
SerializationSchema

[flink] branch release-1.11 updated (f488e70 -> b50fc62)

2020-11-04 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f488e70  [FLINK-19867][table-common] Validation fails for UDF that 
accepts var-args
 add b50fc62  [FLINK-19790][json] Clear reused ObjectNode's content for map 
converter in RowDataToJsonConverters

No new revisions were added by this update.

Summary of changes:
 .../json/JsonRowDataSerializationSchema.java   |  1 +
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 23 +-
 2 files changed, 23 insertions(+), 1 deletion(-)



[flink] branch master updated (cddb821 -> b7487bd)

2020-11-04 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from cddb821  [FLINK-19844][python][docs] Add documentation for Python UDAF
 add b7487bd  [FLINK-19790][json] Clear reused ObjectNode's content for map 
converter in RowDataToJsonConverters

No new revisions were added by this update.

Summary of changes:
 .../formats/json/RowDataToJsonConverters.java  |  1 +
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 23 +-
 2 files changed, 23 insertions(+), 1 deletion(-)



[flink] branch master updated: [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters

2020-11-04 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new b7487bd  [FLINK-19790][json] Clear reused ObjectNode's content for map 
converter in RowDataToJsonConverters
b7487bd is described below

commit b7487bd85bedea8bd50e706c253a61ebbcf8a8bc
Author: Benchao Li 
AuthorDate: Fri Oct 23 23:03:44 2020 +0800

[FLINK-19790][json] Clear reused ObjectNode's content for map converter in 
RowDataToJsonConverters

This closes #13777
---
 .../formats/json/RowDataToJsonConverters.java  |  1 +
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 23 +-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
index e7530f1..132efc5 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
@@ -242,6 +242,7 @@ public class RowDataToJsonConverters implements 
Serializable {
node = mapper.createObjectNode();
} else {
node = (ObjectNode) reuse;
+   node.removeAll();
}
 
MapData map = (MapData) object;
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index e2a4762..bba9cc1 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -246,7 +246,12 @@ public class JsonRowDataSerDeSchemaTest {
RowType rowType = (RowType) ROW(
FIELD("f1", INT()),
FIELD("f2", BOOLEAN()),
-   FIELD("f3", STRING())
+   FIELD("f3", STRING()),
+   FIELD("f4", MAP(STRING(), STRING())),
+   FIELD("f5", ARRAY(STRING())),
+   FIELD("f6", ROW(
+   FIELD("f1", STRING()),
+   FIELD("f2", INT(
).getLogicalType();
 
JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
@@ -261,6 +266,14 @@ public class JsonRowDataSerDeSchemaTest {
root.put("f1", 1);
root.put("f2", true);
root.put("f3", "str");
+   ObjectNode map = root.putObject("f4");
+   map.put("hello1", "flink");
+   ArrayNode array = root.putArray("f5");
+   array.add("element1");
+   array.add("element2");
+   ObjectNode row = root.putObject("f6");
+   row.put("f1", "this is row1");
+   row.put("f2", 12);
byte[] serializedJson = 
objectMapper.writeValueAsBytes(root);
RowData rowData = 
deserializationSchema.deserialize(serializedJson);
byte[] actual = serializationSchema.serialize(rowData);
@@ -273,6 +286,14 @@ public class JsonRowDataSerDeSchemaTest {
root.put("f1", 10);
root.put("f2", false);
root.put("f3", "newStr");
+   ObjectNode map = root.putObject("f4");
+   map.put("hello2", "json");
+   ArrayNode array = root.putArray("f5");
+   array.add("element3");
+   array.add("element4");
+   ObjectNode row = root.putObject("f6");
+   row.put("f1", "this is row2");
+   row.putNull("f2");
byte[] serializedJson = 
objectMapper.writeValueAsBytes(root);
RowData rowData = 
deserializationSchema.deserialize(serializedJson);
byte[] actual = serializationSchema.serialize(rowData);
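
The key hunk above is the added `node.removeAll()`: the map converter reuses one ObjectNode across rows, so without clearing it the keys of a previous map would leak into the next serialized value. A standalone Jackson sketch of the effect (plain Jackson, not Flink's shaded classes):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ReusedObjectNodeSketch {
    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode reuse = mapper.createObjectNode();

        reuse.put("hello1", "flink");      // first row's map
        reuse.put("hello2", "json");       // second row's map, reusing the same node
        System.out.println(reuse);         // {"hello1":"flink","hello2":"json"} <- stale entry

        reuse.removeAll();                 // what the fix adds before converting the next map
        reuse.put("hello2", "json");
        System.out.println(reuse);         // {"hello2":"json"}
    }
}
```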



[flink] branch master updated (a8c7b50 -> b5b3065)

2020-10-12 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from a8c7b50  [FLINK-19590] Fix the compile issue of flink-streaming-java 
module
 add 29a5135  [FLINK-19336][table] EncodingUtils#encodeObjectToString 
should propagate inner exception
 add b5b3065  [FLINK-18850][table-runtime-blink] Add late records dropped 
metric for row time over windows

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/utils/EncodingUtils.java|  2 +-
 .../AbstractRowTimeUnboundedPrecedingOver.java | 20 ++-
 .../over/RowTimeRangeBoundedPrecedingFunction.java | 18 +++
 .../over/RowTimeRowsBoundedPrecedingFunction.java  | 18 +++
 ...ionTest.java => RowTimeOverWindowTestBase.java} | 57 
 .../RowTimeRangeBoundedPrecedingFunctionTest.java  | 55 +--
 ...RowTimeRangeUnboundedPrecedingFunctionTest.java | 61 +
 .../RowTimeRowsBoundedPrecedingFunctionTest.java   | 62 ++
 .../RowTimeRowsUnboundedPrecedingFunctionTest.java | 61 +
 9 files changed, 275 insertions(+), 79 deletions(-)
 copy 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/{ProcTimeRangeBoundedPrecedingFunctionTest.java
 => RowTimeOverWindowTestBase.java} (50%)
 create mode 100644 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeUnboundedPrecedingFunctionTest.java
 create mode 100644 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRowsBoundedPrecedingFunctionTest.java
 create mode 100644 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRowsUnboundedPrecedingFunctionTest.java



[flink] branch master updated (798c004 -> 81598d4)

2020-09-29 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 798c004  [FLINK-16789][coordination][rest] Expose TaskExecutor JMX port
 add 81598d4  [hotfix][table-planner-blink] Improve exception of item 
operator

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[flink] branch release-1.11 updated: [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path

2020-09-22 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 383991a  [FLINK-19281][table-planner-blink] LIKE cannot recognize full 
table path
383991a is described below

commit 383991adc7ca68a0e0e4b595e06e8c5edd853eb1
Author: zhushang 
AuthorDate: Sun Sep 20 22:46:14 2020 +0800

[FLINK-19281][table-planner-blink] LIKE cannot recognize full table path

This closes #13431
---
 .../operations/SqlCreateTableConverter.java|  3 +-
 .../operations/SqlToOperationConverterTest.java| 33 ++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 2530897..cf8f62b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -140,8 +140,7 @@ class SqlCreateTableConverter {
}
 
private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) {
-   UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlTableLike.getSourceTable()
-   .toString());
+   UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names);
ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
CatalogManager.TableLookupResult lookupResult = 
catalogManager.getTable(identifier)
.orElseThrow(() -> new 
ValidationException(String.format(
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 1c582a3..2d5faad 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -529,6 +529,39 @@ public class SqlToOperationConverterTest {
}
 
@Test
+   public void testCreateTableLikeWithFullPath(){
+   Map sourceProperties = new HashMap<>();
+   sourceProperties.put("connector.type", "kafka");
+   sourceProperties.put("format.type", "json");
+   CatalogTableImpl catalogTable = new CatalogTableImpl(
+   TableSchema.builder()
+   .field("f0", DataTypes.INT().notNull())
+   .field("f1", DataTypes.TIMESTAMP(3))
+   .build(),
+   sourceProperties,
+   null
+   );
+   catalogManager.createTable(catalogTable, 
ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
+   final String sql = "create table mytable like 
`builtin`.`default`.sourceTable";
+   Operation operation = parseAndConvert(sql);
+
+   assertThat(
+   operation,
+   isCreateTableOperation(
+   withSchema(
+   TableSchema.builder()
+   .field("f0", DataTypes.INT().notNull())
+   .field("f1", DataTypes.TIMESTAMP(3))
+   .build()
+   ),
+   withOptions(
+   entry("connector.type", "kafka"),
+   entry("format.type", "json")
+   )
+   ));
+   }
+
+   @Test
public void testMergingCreateTableLike() {
Map sourceProperties = new HashMap<>();
sourceProperties.put("format.type", "json");
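
In short, the converter now passes the parsed name parts of the LIKE source table instead of its dotted string, so a fully qualified path resolves against the intended catalog and database. A hypothetical sketch of the difference (identifiers made up; `UnresolvedIdentifier` as in flink-table):

```java
import org.apache.flink.table.catalog.UnresolvedIdentifier;

public class LikeFullPathSketch {
    public static void main(String[] args) {
        // Before the fix: the dotted string became a single name segment, so the
        // catalog manager looked for a table literally named "builtin.default.sourceTable".
        UnresolvedIdentifier broken = UnresolvedIdentifier.of("builtin.default.sourceTable");

        // After the fix: each identifier part is passed separately and qualifies correctly.
        UnresolvedIdentifier fixed = UnresolvedIdentifier.of("builtin", "default", "sourceTable");

        System.out.println(broken);
        System.out.println(fixed);
    }
}
```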



[flink] branch master updated (ac776cb -> d0226d4)

2020-09-22 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ac776cb  [FLINK-17910][e2e] Fix debug log output to investigate rare 
test failure
 add d0226d4  [FLINK-19281][table-planner-blink] LIKE cannot recognize full 
table path

No new revisions were added by this update.

Summary of changes:
 .../operations/SqlCreateTableConverter.java|  3 +-
 .../operations/SqlToOperationConverterTest.java| 33 ++
 2 files changed, 34 insertions(+), 2 deletions(-)



[flink] branch master updated (378115f -> b5a459c)

2020-08-28 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 378115f  [FLINK-19012][task] Check state of AsyncCheckpointRunnable 
before throwing an exception
 add b5a459c  [FLINK-19050][Documentation] Doc of MAX_DECIMAL_PRECISION 
should be DECIMAL

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/flink/connector/jdbc/dialect/PostgresDialect.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[flink] branch release-1.11 updated (4efb3b7 -> 076a474)

2020-07-23 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 4efb3b7  [FLINK-18666][build] Update japicmp configuration for 1.11.1
 add 076a474  [FLINK-16827][table-planner-blink] StreamExecTemporalSort 
should require a distribution trait in StreamExecTemporalSortRule.

No new revisions were added by this update.

Summary of changes:
 .../stream/StreamExecTemporalSortRule.scala| 15 ++--
 .../table/planner/plan/stream/sql/SortTest.xml |  6 ++-
 .../runtime/stream/sql/TemporalSortITCase.scala| 43 ++
 3 files changed, 58 insertions(+), 6 deletions(-)



[flink] branch release-1.11 updated: [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's resultTerm is null

2020-07-13 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 39b4778  [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE 
when operand's resultTerm is null
39b4778 is described below

commit 39b4778a3eaae81902203d558335ed4fbbd8e379
Author: libenchao 
AuthorDate: Thu Feb 20 22:19:17 2020 +0800

[FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's 
resultTerm is null

This closes #11161
---
 .../apache/flink/table/planner/codegen/calls/IfCallGen.scala | 12 +---
 .../table/planner/expressions/ScalarOperatorsTest.scala  |  1 +
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
index 532f8fe..fcc62c3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import 
org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeTermForType
+import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{primitiveDefaultValue, 
primitiveTypeTermForType}
 import org.apache.flink.table.planner.codegen.{CodeGenUtils, 
CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.types.logical.LogicalType
 
@@ -44,6 +44,7 @@ class IfCallGen() extends CallGenerator {
 }
 
 val resultTypeTerm = primitiveTypeTermForType(returnType)
+val resultDefault = primitiveDefaultValue(returnType)
 val Seq(resultTerm, nullTerm) = ctx.addReusableLocalVariables(
   (resultTypeTerm, "result"),
   ("boolean", "isNull"))
@@ -51,13 +52,18 @@ class IfCallGen() extends CallGenerator {
 val resultCode =
   s"""
  |${operands.head.code}
+ |$resultTerm = $resultDefault;
  |if (${operands.head.resultTerm}) {
  |  ${operands(1).code}
- |  $resultTerm = $castedResultTerm1;
+ |  if (!${operands(1).nullTerm}) {
+ |$resultTerm = $castedResultTerm1;
+ |  }
  |  $nullTerm = ${operands(1).nullTerm};
  |} else {
  |  ${operands(2).code}
- |  $resultTerm = $castedResultTerm2;
+ |  if (!${operands(2).nullTerm}) {
+ |$resultTerm = $castedResultTerm2;
+ |  }
  |  $nullTerm = ${operands(2).nullTerm};
  |}
""".stripMargin
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index 04b2002..4f6cbe4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -124,5 +124,6 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
 testSqlApi("CASE 1 WHEN 1 THEN true WHEN 2 THEN false ELSE NULL END", 
"true")
 
 testSqlApi("CASE WHEN f2 = 1 THEN CAST('' as INT) ELSE 0 END", "null")
+testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null")
   }
 }
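
The shape of the generated code after this change: the result variable gets a primitive default before the branches, and the casted operand is only assigned when that operand is not null, so unboxing a null result no longer throws. A hand-written Java sketch of that shape (illustrative only; the real code is produced by the code generator):

```java
public class IfCallGenShape {
    // Sketch of IF(cond, a, b) with a BIGINT result; aValue/bValue stand for the
    // possibly-null boxed operand results.
    static long ifCall(boolean cond, Long aValue, Long bValue) {
        long result = -1L;                  // primitive default added by the fix
        boolean isNull;
        if (cond) {
            if (aValue != null) {           // guard added by the fix; the old code
                result = aValue;            // unboxed unconditionally and could NPE
            }
            isNull = (aValue == null);
        } else {
            if (bValue != null) {
                result = bValue;
            }
            isNull = (bValue == null);
        }
        return isNull ? -1L : result;       // the caller checks isNull separately
    }

    public static void main(String[] args) {
        // CAST('non-numeric' AS BIGINT) yields NULL, mirroring the new test case.
        System.out.println(ifCall(true, null, 0L)); // prints -1 instead of throwing
    }
}
```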



[flink] branch master updated (07e43c9 -> d677f02)

2020-07-13 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 07e43c9  [hotfix][docs] Add 1.11 to the list of previous docs
 add d677f02  [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE 
when operand's resultTerm is null

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/planner/codegen/calls/IfCallGen.scala | 12 +---
 .../table/planner/expressions/ScalarOperatorsTest.scala  |  1 +
 2 files changed, 10 insertions(+), 3 deletions(-)



[flink] branch master updated: [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's resultTerm is null

2020-07-13 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d677f02  [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE 
when operand's resultTerm is null
d677f02 is described below

commit d677f02853ed7395730c6282cd33bb0f5bbf4bd3
Author: libenchao 
AuthorDate: Thu Feb 20 22:19:17 2020 +0800

[FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's 
resultTerm is null

This closes #11161
---
 .../apache/flink/table/planner/codegen/calls/IfCallGen.scala | 12 +---
 .../table/planner/expressions/ScalarOperatorsTest.scala  |  1 +
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
index 532f8fe..fcc62c3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.codegen.calls
 
-import 
org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeTermForType
+import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{primitiveDefaultValue, 
primitiveTypeTermForType}
 import org.apache.flink.table.planner.codegen.{CodeGenUtils, 
CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.types.logical.LogicalType
 
@@ -44,6 +44,7 @@ class IfCallGen() extends CallGenerator {
 }
 
 val resultTypeTerm = primitiveTypeTermForType(returnType)
+val resultDefault = primitiveDefaultValue(returnType)
 val Seq(resultTerm, nullTerm) = ctx.addReusableLocalVariables(
   (resultTypeTerm, "result"),
   ("boolean", "isNull"))
@@ -51,13 +52,18 @@ class IfCallGen() extends CallGenerator {
 val resultCode =
   s"""
  |${operands.head.code}
+ |$resultTerm = $resultDefault;
  |if (${operands.head.resultTerm}) {
  |  ${operands(1).code}
- |  $resultTerm = $castedResultTerm1;
+ |  if (!${operands(1).nullTerm}) {
+ |$resultTerm = $castedResultTerm1;
+ |  }
  |  $nullTerm = ${operands(1).nullTerm};
  |} else {
  |  ${operands(2).code}
- |  $resultTerm = $castedResultTerm2;
+ |  if (!${operands(2).nullTerm}) {
+ |$resultTerm = $castedResultTerm2;
+ |  }
  |  $nullTerm = ${operands(2).nullTerm};
  |}
""".stripMargin
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index 04b2002..4f6cbe4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -124,5 +124,6 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
 testSqlApi("CASE 1 WHEN 1 THEN true WHEN 2 THEN false ELSE NULL END", 
"true")
 
 testSqlApi("CASE WHEN f2 = 1 THEN CAST('' as INT) ELSE 0 END", "null")
+testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null")
   }
 }



[flink] 01/02: [FLINK-18002][json] Correct the behavior for ContainerNode as varchar type

2020-07-13 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3865f7b23c656c74b85b0eb5dd8b73a0f5f88b03
Author: libenchao 
AuthorDate: Mon Jun 1 11:58:11 2020 +0800

[FLINK-18002][json] Correct the behavior for ContainerNode as varchar type

This closes #12421
---
 .../json/JsonRowDataDeserializationSchema.java |   6 +-
 .../formats/json/JsonRowDeserializationSchema.java |  10 +-
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 107 ++---
 .../json/JsonRowDeserializationSchemaTest.java |  30 ++
 4 files changed, 114 insertions(+), 39 deletions(-)

diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index d66ecce..956ca5b 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -304,7 +304,11 @@ public class JsonRowDataDeserializationSchema implements 
DeserializationSchema
} else if (simpleTypeInfo == Types.BOOLEAN) {
return Optional.of(this::convertToBoolean);
} else if (simpleTypeInfo == Types.STRING) {
-   return Optional.of((mapper, jsonNode) -> 
jsonNode.asText());
+   return Optional.of(this::convertToString);
} else if (simpleTypeInfo == Types.INT) {
return Optional.of(this::convertToInt);
} else if (simpleTypeInfo == Types.LONG) {
@@ -381,6 +381,14 @@ public class JsonRowDeserializationSchema implements 
DeserializationSchema
}
}
 
+   private String convertToString(ObjectMapper mapper, JsonNode jsonNode) {
+   if (jsonNode.isContainerNode()) {
+   return jsonNode.toString();
+   } else {
+   return jsonNode.asText();
+   }
+   }
+
private boolean convertToBoolean(ObjectMapper mapper, JsonNode 
jsonNode) {
if (jsonNode.isBoolean()) {
// avoid redundant toString and parseBoolean, for 
better performance
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index cab427f..7b561aa 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -30,11 +30,9 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
-import org.junit.Rule;
+import org.junit.Assert;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.LocalDate;
@@ -71,9 +69,6 @@ import static org.junit.Assert.assertEquals;
  */
 public class JsonRowDataSerDeSchemaTest {
 
-   @Rule
-   public ExpectedException thrown = ExpectedException.none();
-
@Test
public void testSerDe() throws Exception {
byte tinyint = 'c';
@@ -326,9 +321,13 @@ public class JsonRowDataSerDeSchemaTest {
deserializationSchema = deserializationSchema = new 
JsonRowDataDeserializationSchema(
schema, WrapperTypeInfo.of(schema), true, false, 
TimestampFormat.ISO_8601);
 
-   thrown.expect(IOException.class);
-   thrown.expectMessage("Failed to deserialize JSON 
'{\"id\":123123123}'");
-   deserializationSchema.deserialize(serializedJson);
+   String errorMessage = "Failed to deserialize JSON 
'{\"id\":123123123}'.";
+   try {
+   deserializationSchema.deserialize(serializedJson);
+   Assert.fail("expecting exception message: " + 
errorMessage);
+   } catch (Throwable t) {
+   assertEquals(errorMessage, t.getMessage());
+   }
 
// ignore on parse error
deserializationSchema = new JsonRowDataDeserializationSchema(
@@ -336,12 +335,15 @@ public class JsonRowDataSerDeSchemaTest {
actual = 
convertToExternal(deserializationSchema.deserialize(serializedJson), dataTyp
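
The new `convertToString` above returns `toString()` for container nodes (JSON objects and arrays) instead of `asText()`, which Jackson defines as an empty string for containers. A standalone Jackson sketch of the difference (plain Jackson, not Flink's shaded classes):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ContainerNodeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode nested = mapper.readTree("{\"nested\":{\"k\":1}}").get("nested");

        System.out.println(nested.asText());   // ""      -> lost content before the fix
        System.out.println(nested.toString()); // {"k":1} -> what the fix emits for VARCHAR
    }
}
```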

[flink] branch master updated (f81f3a0 -> 66353f2)

2020-07-13 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f81f3a0  [FLINK-18477][examples-table] Fix packaging of 
ChangelogSocketExample
 new 3865f7b  [FLINK-18002][json] Correct the behavior for ContainerNode as 
varchar type
 new 66353f2  [FLINK-16827][table-planner-blink] StreamExecTemporalSort 
should require a distribution trait in StreamExecTemporalSortRule.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../json/JsonRowDataDeserializationSchema.java |   6 +-
 .../formats/json/JsonRowDeserializationSchema.java |  10 +-
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 107 ++---
 .../json/JsonRowDeserializationSchemaTest.java |  30 ++
 .../stream/StreamExecTemporalSortRule.scala|  15 ++-
 .../table/planner/plan/stream/sql/SortTest.xml |   6 +-
 .../runtime/stream/sql/TemporalSortITCase.scala|  43 +
 7 files changed, 172 insertions(+), 45 deletions(-)



[flink] 02/02: [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule.

2020-07-13 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66353f27c4c6481443d1f04a8f23e7f98dd7beda
Author: libenchao 
AuthorDate: Mon Apr 6 16:22:28 2020 +0800

[FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a 
distribution trait in StreamExecTemporalSortRule.

This closes #11643
---
 .../stream/StreamExecTemporalSortRule.scala| 15 ++--
 .../table/planner/plan/stream/sql/SortTest.xml |  6 ++-
 .../runtime/stream/sql/TemporalSortITCase.scala| 43 ++
 3 files changed, 58 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
index cf17cc5..3490526 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
@@ -21,8 +21,9 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalSort
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
@@ -46,12 +47,18 @@ class StreamExecTemporalSortRule
   override def convert(rel: RelNode): RelNode = {
 val sort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
 val input = sort.getInput()
-val traitSet: RelTraitSet = 
rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-val convInput: RelNode = RelOptRule.convert(input, 
FlinkConventions.STREAM_PHYSICAL)
+val requiredTraitSet = input.getTraitSet
+  .replace(FlinkRelDistribution.SINGLETON)
+  .replace(FlinkConventions.STREAM_PHYSICAL)
+val providedTraitSet = sort.getTraitSet
+  .replace(FlinkRelDistribution.SINGLETON)
+  .replace(FlinkConventions.STREAM_PHYSICAL)
+
+val convInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
 
 new StreamExecTemporalSort(
   rel.getCluster,
-  traitSet,
+  providedTraitSet,
   convInput,
   sort.collation)
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
index af6e441..a1d191e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
@@ -32,7 +32,8 @@ LogicalProject(a=[$0])
   
 
   
@@ -136,7 +137,8 @@ LogicalProject(a=[$0])
   
 
   
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
index de90d7e..ed90898 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
@@ -79,6 +79,49 @@ class TemporalSortITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
   }
 
   @Test
+  def testEventTimeOrderByWithParallelInput(): Unit = {
+val data = List(
+  (3L, 2L, "Hello world", 3),
+  (2L, 2L, "Hello", 2),
+  (6L, 3L, "Luke Skywalker", 6),
+  (5L, 3L, "I am fine.", 5),
+  (7L, 4L, "Comment#1", 7),
+  (9L, 4L, "Comment#3", 9),
+  (10L, 4L, "Comment#4", 10),
+  (8L, 4L, "Comment#2", 8),
+  (1L, 1L, "Hi", 1),
+  (4L, 3L, "Helloworld, how are you?", 4))
+
+val t = failingDataSource(data)
+  .assignTimestampsAndWatermarks(
+new TimestampAndWatermarkWithOffset[(Long, Long, String, Int)](10L))
+  .setParallelism(env.getParallelism)
+  .toTable(tEn

[flink] branch release-1.11 updated: [FLINK-18324][docs-zh] Translate updated data type into Chinese

2020-07-12 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new f7cd463  [FLINK-18324][docs-zh] Translate updated data type into 
Chinese
f7cd463 is described below

commit f7cd46361ec705b8913950b2980c6a24efa8682c
Author: Yubin Li 
AuthorDate: Sun Jun 28 15:32:38 2020 +0800

[FLINK-18324][docs-zh] Translate updated data type into Chinese

This closes #12748
---
 docs/dev/table/types.zh.md | 330 -
 1 file changed, 266 insertions(+), 64 deletions(-)

diff --git a/docs/dev/table/types.zh.md b/docs/dev/table/types.zh.md
index 252ff02..819abc8 100644
--- a/docs/dev/table/types.zh.md
+++ b/docs/dev/table/types.zh.md
@@ -28,7 +28,7 @@ under the License.
 
 从 Flink 1.9 开始,Table & SQL API 开始启用一种新的类型系统作为长期解决方案,用来保持 API 稳定性和 SQL 标准的兼容性。
 
-重新设计类型系统是一项涉及几乎所有的面向用户接口的重大工作。因此,它的引入跨越多个版本,社区的目标是在 Flink 1.10 完成这项工作。
+重新设计类型系统是一项涉及几乎所有的面向用户接口的重大工作。因此,它的引入跨越多个版本,社区的目标是在 Flink 1.12 完成这项工作。
 
 同时由于为 Table 编程添加了新的 Planner 
详见([FLINK-11439](https://issues.apache.org/jira/browse/FLINK-11439)), 并不是每种 
Planner 都支持所有的数据类型。此外,Planner 对于数据类型的精度和参数化支持也可能是不完整的。
 
@@ -112,7 +112,7 @@ DataType t = 
DataTypes.ARRAY(DataTypes.INT().notNull()).bridgedTo(int[].class);
 // 而是使用 java.sql.Timestamp
 val t: DataType = 
DataTypes.TIMESTAMP(3).bridgedTo(classOf[java.sql.Timestamp]);
 
-// 告诉运行时不要产生或者消费装箱的整数数组 
+// 告诉运行时不要产生或者消费装箱的整数数组
 // 而是使用基本数据类型的整数数组
 val t: DataType = 
DataTypes.ARRAY(DataTypes.INT().notNull()).bridgedTo(classOf[Array[Int]]);
 {% endhighlight %}
@@ -184,23 +184,22 @@ Flink 1.9 之前引入的旧的 Planner 主要支持类型信息(Type Informat
 | `DOUBLE` | |
 | `DATE` | |
 | `TIME` | 支持的精度仅为 `0`。 |
-| `TIMESTAMP` | 支持的精度仅为 `3`。 |
-| `TIMESTAMP WITH LOCAL TIME ZONE` | 支持的精度仅为 `3`。 |
+| `TIMESTAMP` | |
+| `TIMESTAMP WITH LOCAL TIME ZONE` | |
 | `INTERVAL` | 仅支持 `MONTH` 和 `SECOND(3)` 区间。 |
 | `ARRAY` | |
 | `MULTISET` | |
 | `MAP` | |
 | `ROW` | |
 | `RAW` | |
+| stuctured types | 暂只能在用户自定义函数里使用。 |
 
 局限性
 ---
 
 **Java 表达式字符串**:Table API 中的 Java 表达式字符串,例如 
`table.select("field.cast(STRING)")`,尚未被更新到新的类型系统中,使用[旧的 Planner 
章节](#旧的-planner)中声明的字符串来表示。
 
-**连接器描述符和 SQL 
客户端**:描述符字符串的表示形式尚未更新到新的类型系统。使用在[连接到外部系统章节](./connect.html#type-strings)中声明的字符串表示。
-
-**用户自定义函数**:用户自定义函数尚不能声明数据类型。
+**用户自定义函数**:用户自定义聚合函数尚不能声明数据类型,标量函数和表函数充分支持数据类型。
 
 数据类型列表
 --
@@ -236,10 +235,11 @@ DataTypes.CHAR(n)
 
 **JVM 类型**
 
-| Java 类型  | 输入 | 输出 | 备注 |
-|:---|:-:|:--:|:|
-|`java.lang.String`  | X | X  | *缺省*   |
-|`byte[]`| X | X  | 假设使用 UTF-8 编码。 |
+| Java 类型   | 输入  | 输出   | 备注 |
+|:|:-:|:--:|:|
+|`java.lang.String`   | X | X  | *缺省*  
 |
+|`byte[]` | X | X  | 假设使用 UTF-8 编码。 |
+|`org.apache.flink.table.data.StringData` | X | X  | 内部数据结构。 |
 
  `VARCHAR` / `STRING`
 
@@ -274,10 +274,11 @@ DataTypes.STRING()
 
 **JVM 类型**
 
-| Java 类型  | 输入 | 输出 | 备注 |
-|:---|:-:|:--:|:|
-|`java.lang.String`  | X | X  | *缺省*   |
-|`byte[]`| X | X  | 假设使用 UTF-8 编码。 |
+| Java 类型   | 输入  | 输出   | 备注 |
+|:|:-:|:--:|:|
+|`java.lang.String`   | X | X  | *缺省*  
 |
+|`byte[]` | X | X  | 假设使用 UTF-8 编码。 |
+|`org.apache.flink.table.data.StringData` | X | X  | 内部数据结构。 |
 
 ### 二进制字符串
 
@@ -389,9 +390,10 @@ DataTypes.DECIMAL(p, s)
 
 **JVM 类型**
 
-| Java 类型 | 输入 | 输出 | 备注 |
-|:--|:-:|:--:|:|
-|`java.math.BigDecimal` | X | X  | *缺省*   |
+| Java 类型| 输入  | 输出   | 备注 |
+|:-|:-:|:--:|:|
+|`java.math.BigDecimal`| X | X  | *缺省* 
  |
+|`org.apache.flink.table.data.DecimalData` | X | X  | 内部数据结构。 |
 
  `TINYINT`
 
@@ -690,12 +692,13 @@ DataTypes.TIMESTAMP(p)
 
 | Java 类型| 输入 | 输出 | 备注
 |
 
|:-|:-:|:--:|:|
-|`java.time.LocalDateTime` | X | X  | *缺省* 
  |
-|`java.sql.Timestamp`   

[flink] branch release-1.11 updated: [FLINK-18324][docs-zh] Translate updated udf into Chinese

2020-07-12 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 1917af3  [FLINK-18324][docs-zh] Translate updated udf into Chinese
1917af3 is described below

commit 1917af3a1eba8450c00a7be8c11796e3db16ddb8
Author: Yubin Li 
AuthorDate: Tue Jun 30 19:16:34 2020 +0800

[FLINK-18324][docs-zh] Translate updated udf into Chinese

This closes #12794
---
 docs/dev/table/functions/udfs.zh.md | 1049 ---
 1 file changed, 740 insertions(+), 309 deletions(-)

diff --git a/docs/dev/table/functions/udfs.zh.md 
b/docs/dev/table/functions/udfs.zh.md
index 995a67c..4e3a6be 100644
--- a/docs/dev/table/functions/udfs.zh.md
+++ b/docs/dev/table/functions/udfs.zh.md
@@ -22,322 +22,854 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-自定义函数是一个非常重要的功能,因为它极大的扩展了查询的表达能力。
+自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。
+
+自定义函数可以用 JVM 语言(例如 Java 或 Scala)或 Python 实现,实现者可以在 UDF 中使用任意第三方库,本文聚焦于使用 JVM 
语言开发自定义函数。
 
 * This will be replaced by the TOC
 {:toc}
 
-注册自定义函数

-在大多数情况下,自定义函数在使用之前都需要注册。在 Scala Table API 中可以不用注册。
+概述
+
 
-通过调用 `registerFunction()` 把函数注册到 `TableEnvironment`。当一个函数注册之后,它就在 
`TableEnvironment` 的函数 catalog 里面了,这样 Table API 或者 SQL 解析器就可以识别并使用它。
+当前 Flink 有如下几种函数:
 
-关于如何注册和使用每种类型的自定义函数(标量函数、表值函数和聚合函数),更多示例可以看下面的部分。
+- *标量函数* 将标量值转换成一个新标量值;
+- *表值函数* 将标量值转换成新的行数据;
+- *聚合函数* 将多行数据里的标量值转换成一个新标量值;
+- *表值聚合函数* 将多行数据里的标量值转换成新的行数据;
+- *异步表值函数* 是异步查询外部数据系统的特殊函数。
 
-{% top %}
+注意 标量和表值函数已经使用了新的基于[数据类型]({% link 
dev/table/types.zh.md %})的类型系统,聚合函数仍然使用基于 `TypeInformation` 的旧类型系统。
 
-标量函数
-
+以下示例展示了如何创建一个基本的标量函数,以及如何在 Table API 和 SQL 里调用这个函数。
 
-如果需要的标量函数没有被内置函数覆盖,就可以在自定义一个标量函数在 Table API 和 SQL 中使用。自定义标量函数可以把 0 到多个标量值映射成 1 
个标量值。
+函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 _内联_ 后直接使用。
 
 
-
-想要实现自定义标量函数,你需要扩展 `org.apache.flink.table.functions` 里面的 `ScalarFunction` 
并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法并须是 `public` 的,而且名字必须是 
`eval`。求值方法的参数类型以及返回值类型就决定了标量函数的参数类型和返回值类型。可以通过实现多个名为 `eval` 
的方法对求值方法进行重载。求值方法也支持可变参数,例如 `eval(String... strs)`。
-
-下面的示例展示了如何实现一个求哈希值的函数。先把它注册到 `TableEnvironment` 
里,然后在查询的时候就可以直接使用了。需要注意的是,你可以在注册之前通过构造方法来配置你的标量函数:
 
+
 {% highlight java %}
-public class HashCode extends ScalarFunction {
-  private int factor = 12;
-  
-  public HashCode(int factor) {
-  this.factor = factor;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+// 定义函数逻辑
+public static class SubstringFunction extends ScalarFunction {
+  public String eval(String s, Integer begin, Integer end) {
+return s.substring(begin, end);
   }
-  
-  public int eval(String s) {
-  return s.hashCode() * factor;
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// 在 Table API 里不经注册直接“内联”调用函数
+env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
+
+// 注册函数
+env.createTemporarySystemFunction("SubstringFunction", 
SubstringFunction.class);
+
+// 在 Table API 里调用注册好的函数
+env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
+
+// 在 SQL 里调用注册好的函数
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
+
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+// define function logic
+class SubstringFunction extends ScalarFunction {
+  def eval(s: String, begin: Integer, end: Integer): String = {
+s.substring(begin, end)
   }
 }
 
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
+val env = TableEnvironment.create(...)
+
+// 在 Table API 里不经注册直接“内联”调用函数
+env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12))
 
 // 注册函数
-tableEnv.registerFunction("hashCode", new HashCode(10));
+env.createTemporarySystemFunction("SubstringFunction", 
classOf[SubstringFunction])
+
+// 在 Table API 里调用注册好的函数
+env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12))
 
-// 在 Java Table API 中使用函数
-myTable.select("string, string.hashCode(), hashCode(string)");
+// 在 SQL 里调用注册好的函数
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable")
 
-// 在 SQL API 中使用函数
-tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
 {% endhighlight %}
+
 
-求值方法的返回值类型默认是由 Flink 的类型推导来决定的。类型推导可以推导出基本数据类型以及简单的 
POJO,但是对于更复杂的、自定义的、或者组合类型,可能会推导出错误的结果。在这种情况下,可以通过覆盖 
`ScalarFunction#getResultType()`,并且返回 `TypeInformation` 来定义复杂类型。
+
 
-下面的示例展示了一个高级一点的自定义标量函数用法,它接收一个内部的时间戳参数,并且以 `long` 的形式返回该内部的时间戳。通过覆盖 
`ScalarFunction#getResultType

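The quoted diff is cut off just as the old documentation introduces overriding `ScalarFunction#getResultType()`. A sketch in the spirit of that legacy example, assuming the pre-1.11 `ScalarFunction` API where the hook returns a `TypeInformation`; the modifier logic itself is illustrative:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;

// Takes Flink's internal timestamp representation (a long) and returns a long,
// while declaring via getResultType() that the result should be interpreted
// as a SQL timestamp by the code generation.
public class TimestampModifier extends ScalarFunction {

    public long eval(long t) {
        // keep only whole seconds of the internal millisecond timestamp
        return t - (t % 1000);
    }

    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.SQL_TIMESTAMP;
    }
}
```
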
[flink] branch master updated (32f3eeb -> f0eeaec)

2020-07-12 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 32f3eeb  [FLINK-18526][python][docs] Add documentation for Python UDF 
on how to use managed memory
 add f0eeaec  [FLINK-18324][docs-zh] Translate updated udf into Chinese

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/functions/udfs.zh.md | 1049 ---
 1 file changed, 740 insertions(+), 309 deletions(-)



[flink] branch master updated (ee65377 -> 1b2d1c7)

2020-07-10 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ee65377  [FLINK-18361][es][table] Support username and password 
options for new Elasticsearch connector
 add 1b2d1c7  [hotfix][doc] Fix temporal_tables correlate with a changing 
dimension table section

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/streaming/temporal_tables.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[flink] branch master updated: [FLINK-18453][tests] Fix overflow of AggregateITCase#testAggregationCodeSplit

2020-07-09 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e162bd7  [FLINK-18453][tests] Fix overflow of 
AggregateITCase#testAggregationCodeSplit
e162bd7 is described below

commit e162bd7e9064032c0fd5f2035e1a23249f3ca5c2
Author: Benchao Li 
AuthorDate: Thu Jul 9 22:39:14 2020 +0800

[FLINK-18453][tests] Fix overflow of 
AggregateITCase#testAggregationCodeSplit

This closes #12861
---
 .../flink/table/planner/runtime/stream/sql/AggregateITCase.scala | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index d3c6b77..c920bdc 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1262,7 +1262,10 @@ class AggregateITCase(
   .toTable(tEnv, 'a, 'b, 'c)
 tEnv.createTemporaryView("MyTable", t)
 
-val columnNumber = 500
+tEnv.getConfig.setMaxGeneratedCodeLength(2048)
+
+// 50 can make sure all generated methods of [Namespace]AggsHandleFunction 
is longer than 2048
+val columnNumber = 50
 
 val selectList = Stream.range(3, columnNumber)
   .map(i => s"SUM(CASE WHEN a IS NOT NULL AND a > $i THEN 0 WHEN a < 0 
THEN 0 ELSE $i END)")

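The fix above keeps the code-splitting path of the aggregation code generator covered while avoiding the overflow caused by 500 aggregate columns: it lowers the split threshold to 2048 and generates 50 columns instead. A rough Java equivalent of what the test exercises, assuming a `TableEnvironment` with a registered table `MyTable` holding an INT column `a` and a grouping column `b` (all names illustrative):

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class CodeSplitSketch {

    public static Table aggregateWithCodeSplit(TableEnvironment tEnv) {
        // Lower the threshold so the generated aggregation code is split
        // into several methods even for a modest number of aggregates.
        tEnv.getConfig().setMaxGeneratedCodeLength(2048);

        StringBuilder select = new StringBuilder("SELECT b");
        for (int i = 3; i < 50; i++) {
            // Many similar aggregate expressions inflate the generated method body.
            select.append(", SUM(CASE WHEN a > ").append(i)
                    .append(" THEN 0 ELSE ").append(i).append(" END)");
        }
        select.append(" FROM MyTable GROUP BY b");

        return tEnv.sqlQuery(select.toString());
    }
}
```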


[flink] branch master updated (d0d8037 -> 4b9f9fe)

2020-07-04 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d0d8037  [FLINK-18470] Ensure rocksdb is loaded in 
RocksKeyGroupsRocksSingleStateIteratorTest
 add 4b9f9fe  [FLINK-18240][table-planner-blink] Allow to use % as modulus 
function

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java  | 2 +-
 .../apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java | 1 +
 .../org/apache/flink/table/planner/expressions/SqlExpressionTest.scala  | 1 +
 3 files changed, 3 insertions(+), 1 deletion(-)
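
FLINK-18240 relaxes the SQL conformance so that `%` is accepted as a remainder operator in addition to the `MOD` function. A small sketch of what this enables, assuming a `TableEnvironment` with a registered table `Orders` that has an INT column `amount` (names are illustrative):

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ModulusSketch {

    public static Table remainders(TableEnvironment tEnv) {
        // With this change, the % operator is parsed as the modulus function,
        // so the two queries below are equivalent.
        Table viaOperator = tEnv.sqlQuery("SELECT amount % 3 FROM Orders");
        Table viaFunction = tEnv.sqlQuery("SELECT MOD(amount, 3) FROM Orders");
        return viaOperator.unionAll(viaFunction);
    }
}
```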



[flink] branch master updated (02e5977 -> 74502f7)

2020-06-28 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 02e5977  [FLINK-17465][doc-zh] Update translations for memory 
configurations.
 add 74502f7  [FLINK-18324][docs-zh] Translate updated data type into 
Chinese

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/types.zh.md | 334 -
 1 file changed, 268 insertions(+), 66 deletions(-)



[flink] branch release-1.11 updated: [FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator

2020-06-19 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 334b7b8  [FLINK-16589][table-planner-blink] Split code for 
AggsHandlerCodeGenerator
334b7b8 is described below

commit 334b7b8b00885135b0682756f970b4e440f0f189
Author: Benchao Li 
AuthorDate: Fri Jun 19 16:15:40 2020 +0800

[FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator

This closes #12710
---
 .../planner/codegen/CodeGeneratorContext.scala | 31 +-
 .../table/planner/codegen/ExprCodeGenerator.scala  | 18 --
 .../table/planner/codegen/GenerateUtils.scala  | 29 ++
 .../planner/codegen/ProjectionCodeGenerator.scala  |  3 +-
 .../codegen/agg/AggsHandlerCodeGenerator.scala | 66 --
 .../planner/codegen/agg/DistinctAggCodeGen.scala   |  3 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  3 +-
 .../runtime/stream/sql/AggregateITCase.scala   | 26 +
 8 files changed, 129 insertions(+), 50 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 5ef90a2..b9bc6fc 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{Function, RuntimeContext}
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.table.api.TableConfig
@@ -109,9 +108,9 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   private var currentMethodNameForLocalVariables = "DEFAULT"
 
   /**
-   * Flag that indicates whether the generated code is split into several 
methods.
+   * Flag map that indicates whether the generated code for method is split 
into several methods.
*/
-  private var isCodeSplit = false
+  private val isCodeSplitMap = mutable.Map[String, Boolean]()
 
   // map of local variable statements. It will be placed in method if method 
code not excess
   // max code length, otherwise will be placed in member area of the class. 
The statements
@@ -149,11 +148,12 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   }
 
   /**
-   * Set the flag [[isCodeSplit]] to be true, which indicates the generated 
code is split into
-   * several methods.
+   * Set the flag [[isCodeSplitMap]] to be true for methodName, which indicates
+   * the generated code is split into several methods.
+   * @param methodName the method which will be split.
*/
-  def setCodeSplit(): Unit = {
-isCodeSplit = true
+  def setCodeSplit(methodName: String = currentMethodNameForLocalVariables): 
Unit = {
+isCodeSplitMap(methodName) = true
   }
 
   /**
@@ -210,10 +210,14 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
 */
   def reuseMemberCode(): String = {
 val result = reusableMemberStatements.mkString("\n")
-if (isCodeSplit) {
+if (isCodeSplitMap.nonEmpty) {
   val localVariableAsMember = reusableLocalVariableStatements.map(
-statements => statements._2.map("private " + _).mkString("\n")
-  ).mkString("\n")
+statements => if (isCodeSplitMap.getOrElse(statements._1, false)) {
+  statements._2.map("private " + _).mkString("\n")
+} else {
+  ""
+}
+  ).filter(_.length > 0).mkString("\n")
   result + "\n" + localVariableAsMember
 } else {
   result
@@ -224,8 +228,8 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
 * @return code block of statements that will be placed in the member area 
of the class
 * if generated code is split or in local variables of method
 */
-  def reuseLocalVariableCode(methodName: String = null): String = {
-if (isCodeSplit) {
+  def reuseLocalVariableCode(methodName: String = 
currentMethodNameForLocalVariables): String = {
+if (isCodeSplitMap.getOrElse(methodName, false)) {
   GeneratedExpression.NO_CODE
 } else if (methodName == null) {
   
reusableLocalVariableStatements(currentMethodNameForLocalVariables).mkString("\n")
@@ -375,8 +379,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   clazz: Class[_],
   outRecordTerm: String,
   outRecordWriterTerm: Option[String] = None): Unit = {
-val stateme

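The heart of this refactoring replaces the single `isCodeSplit` boolean with a per-method flag map, so that local variables are promoted to class members only for methods whose generated code is actually split. A simplified Java sketch of that bookkeeping, mirroring the Scala diff above; the class and helper methods are illustrative, not part of the commit:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CodeSplitFlagsSketch {

    // Per-method flag: true if the generated code of that method is split
    // into several smaller methods (mirrors isCodeSplitMap in the diff).
    private final Map<String, Boolean> isCodeSplitMap = new HashMap<>();

    // Reusable local variable declarations, collected per method.
    private final Map<String, List<String>> localVariables = new HashMap<>();

    public void setCodeSplit(String methodName) {
        isCodeSplitMap.put(methodName, true);
    }

    public void addLocalVariable(String methodName, String declaration) {
        localVariables.computeIfAbsent(methodName, k -> new ArrayList<>()).add(declaration);
    }

    // Local variables of split methods are emitted as private members;
    // all other methods keep them as plain local variables.
    public String reuseMemberCode(String memberCode) {
        if (isCodeSplitMap.isEmpty()) {
            return memberCode;
        }
        String promoted = localVariables.entrySet().stream()
                .filter(e -> isCodeSplitMap.getOrDefault(e.getKey(), false))
                .flatMap(e -> e.getValue().stream().map(stmt -> "private " + stmt))
                .collect(Collectors.joining("\n"));
        return memberCode + "\n" + promoted;
    }
}
```
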
[flink] branch master updated: [FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator

2020-06-18 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 99fca58  [FLINK-16589][table-planner-blink] Split code for 
AggsHandlerCodeGenerator
99fca58 is described below

commit 99fca58fe60199b93f962e5fbf0cb0314b9d3b99
Author: libenchao 
AuthorDate: Wed Mar 25 23:37:44 2020 +0800

[FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator

This closes #11512
---
 .../planner/codegen/CodeGeneratorContext.scala | 31 +-
 .../table/planner/codegen/ExprCodeGenerator.scala  | 18 --
 .../table/planner/codegen/GenerateUtils.scala  | 29 ++
 .../planner/codegen/ProjectionCodeGenerator.scala  |  3 +-
 .../codegen/agg/AggsHandlerCodeGenerator.scala | 66 --
 .../planner/codegen/agg/DistinctAggCodeGen.scala   |  3 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  3 +-
 .../runtime/stream/sql/AggregateITCase.scala   | 26 +
 8 files changed, 129 insertions(+), 50 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 5ef90a2..b9bc6fc 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{Function, RuntimeContext}
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.table.api.TableConfig
@@ -109,9 +108,9 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   private var currentMethodNameForLocalVariables = "DEFAULT"
 
   /**
-   * Flag that indicates whether the generated code is split into several 
methods.
+   * Flag map that indicates whether the generated code for method is split 
into several methods.
*/
-  private var isCodeSplit = false
+  private val isCodeSplitMap = mutable.Map[String, Boolean]()
 
   // map of local variable statements. It will be placed in method if method 
code not excess
   // max code length, otherwise will be placed in member area of the class. 
The statements
@@ -149,11 +148,12 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   }
 
   /**
-   * Set the flag [[isCodeSplit]] to be true, which indicates the generated 
code is split into
-   * several methods.
+   * Set the flag [[isCodeSplitMap]] to be true for methodName, which indicates
+   * the generated code is split into several methods.
+   * @param methodName the method which will be split.
*/
-  def setCodeSplit(): Unit = {
-isCodeSplit = true
+  def setCodeSplit(methodName: String = currentMethodNameForLocalVariables): 
Unit = {
+isCodeSplitMap(methodName) = true
   }
 
   /**
@@ -210,10 +210,14 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
 */
   def reuseMemberCode(): String = {
 val result = reusableMemberStatements.mkString("\n")
-if (isCodeSplit) {
+if (isCodeSplitMap.nonEmpty) {
   val localVariableAsMember = reusableLocalVariableStatements.map(
-statements => statements._2.map("private " + _).mkString("\n")
-  ).mkString("\n")
+statements => if (isCodeSplitMap.getOrElse(statements._1, false)) {
+  statements._2.map("private " + _).mkString("\n")
+} else {
+  ""
+}
+  ).filter(_.length > 0).mkString("\n")
   result + "\n" + localVariableAsMember
 } else {
   result
@@ -224,8 +228,8 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
 * @return code block of statements that will be placed in the member area 
of the class
 * if generated code is split or in local variables of method
 */
-  def reuseLocalVariableCode(methodName: String = null): String = {
-if (isCodeSplit) {
+  def reuseLocalVariableCode(methodName: String = 
currentMethodNameForLocalVariables): String = {
+if (isCodeSplitMap.getOrElse(methodName, false)) {
   GeneratedExpression.NO_CODE
 } else if (methodName == null) {
   
reusableLocalVariableStatements(currentMethodNameForLocalVariables).mkString("\n")
@@ -375,8 +379,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   clazz: Class[_],
   outRecordTerm: String,
   outRecordWriterTerm: Option[String] = None): Unit = {
-val statement = generateRecordStatement(t, clazz, 

[flink] branch release-1.11 updated: [FLINK-18282][docs-zh] Retranslate the home page document

2020-06-16 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 2f8aa59  [FLINK-18282][docs-zh] Retranslate the home page document
2f8aa59 is described below

commit 2f8aa5950bc696c9420a892c39cd83bffb8faa4f
Author: wangsong2 
AuthorDate: Sat Jun 13 20:58:45 2020 +0800

[FLINK-18282][docs-zh] Retranslate the home page document

This closes #12642
---
 docs/index.zh.md | 82 ++--
 1 file changed, 50 insertions(+), 32 deletions(-)

diff --git a/docs/index.zh.md b/docs/index.zh.md
index d21ca73..0f83e89 100644
--- a/docs/index.zh.md
+++ b/docs/index.zh.md
@@ -23,53 +23,71 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+
+Apache Flink 是一个在无界和有界数据流上进行状态计算的框架和分布式处理引擎。 Flink 已经可以在所有常见的集群环境中运行,并以 
in-memory 的速度和任意的规模进行计算。
+
 
-本文档适用于 Apache Flink {{ site.version_title}} 版本。本页面最近更新于 {% build_time %}.
+
+
 
-Apache Flink 是一个分布式流批一体化的开源平台。Flink 的核心是一个提供数据分发、通信以及自动容错的流计算引擎。Flink 
在流计算之上构建批处理,并且原生的支持迭代计算,内存管理以及程序优化。
+### 试用 Flink
 
-## 初步印象
+如果您有兴趣使用 Flink, 可以试试我们的教程:
 
-* **代码练习**: 跟随分步指南通过 Flink API 实现简单应用或查询。
-  * [实现 DataStream 应用]({% link try-flink/datastream_api.zh.md %})
-  * [书写 Table API 查询]({% link try-flink/table_api.zh.md %})
+* [DataStream API 进行欺诈检测]({% link try-flink/datastream_api.zh.md %})
+* [Table API 构建实时报表]({% link try-flink/table_api.zh.md %})
+* [Python API 教程]({% link try-flink/python_table_api.zh.md %})
+* [Flink 游乐场]({% link try-flink/flink-operations-playground.zh.md %})
 
-* **Docker 游乐场**: 你只需花几分钟搭建 Flink 沙盒环境,就可以探索和使用 Flink 了。
-  * [运行与管理 Flink 流处理应用]({% link try-flink/flink-operations-playground.zh.md %})
+### 学习 Flink
 
-* **概念**: 学习 Flink 的基本概念能更好地理解文档。
-  * [有状态流处理](concepts/stateful-stream-processing.html)
-  * [实时流处理](concepts/timely-stream-processing.html)
-  * [Flink 架构](concepts/flink-architecture.html)
-  * [术语表](concepts/glossary.html)
+* [操作培训]({% link learn-flink/index.zh.md %}) 包含了一系列的课程和练习,提供了对 Flink 的逐一介绍。
 
-## API 参考
+* [概念透析]({% link concepts/index.zh.md %}) 介绍了在浏览参考文档之前你需要了解的 Flink 知识。
 
-API 参考列举并解释了 Flink API 的所有功能。
+### 获取 Flink 帮助
 
-* [DataStream API](dev/datastream_api.html)
-* [DataSet API](dev/batch/index.html)
-* [Table API & SQL](dev/table/index.html)
+如果你遇到困难了, 可以在 
[社区](https://flink.apache.org/zh/community.html)寻求帮助。值得一提的是,Apache Flink 
的用户邮件列表一直是 Apache 项目里面最活跃的之一,也是一个快速获得帮助的好途径。
 
-## 部署
+
+
 
-在线上环境运行你的 Flink 作业之前,请阅读 [生产环境注意事项检查清单](ops/production_ready.html)。
+### 探索 Flink
 
-## 发布日志
+参考文档包含了 Flink 所有内容。 你可以从以下几点开始学习:
 
-发布日志包含了 Flink 版本之间的重大更新。请在你升级 Flink 之前仔细阅读相应的发布日志。
+
+
 
-* [Flink 1.10 的发布日志](release-notes/flink-1.10.html).
-* [Flink 1.9 的发布日志](release-notes/flink-1.9.html)。
-* [Flink 1.8 的发布日志](release-notes/flink-1.8.html)。
-* [Flink 1.7 的发布日志](release-notes/flink-1.7.html)。
-* [Flink 1.6 的发布日志](release-notes/flink-1.6.html)。
-* [Flink 1.5 的发布日志](release-notes/flink-1.5.html)。
+* [DataStream API]({% link dev/datastream_api.zh.md %})
+* [Table API & SQL]({% link dev/table/index.zh.md %})
+* [状态方法]({% if site.is_stable %} {{ site.statefundocs_stable_baseurl }} {% 
else %} {{ site.statefundocs_baseurl }} {% endif %})
 
-## 外部资源
+
+
 
-- **Flink Forward**: 已举办的所有大会演讲均可在 [Flink Forward](http://flink-forward.org/) 
官网以及 [YouTube](https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA)找到。[使用 
Apache Flink 
进行高可靠的流处理](http://2016.flink-forward.org/kb_sessions/robust-stream-processing-with-apache-flink/)
 可以作为你第一个学习的资源。
+* [配置参数]({% link ops/config.zh.md %})
+* [Rest API]({% link monitoring/rest_api.zh.md %})
+* [CLI]({% link ops/cli.zh.md %})
 
-- **培训**: [培训资料](https://training.ververica.com/) 包含讲义,练习以及示例程序。
+
+
 
-- **博客**: [Apache Flink](https://flink.apache.org/blog/) 以及 
[Ververica](https://www.ververica.com/blog) 的博客会经常更新一些有关 Flink 的技术文章。
+### 部署 Flink
+
+在线上环境运行你的 Flink 作业之前,请阅读 [生产环境注意事项检查清单]({% link ops/production_ready.zh.md 
%}). 各种部署环境一览,详见 [集群与部署]({% link ops/deployment/index.zh.md %}). 
+
+### 升级 Flink
+
+release notes 包含了 Flink 版本之间的重大更新。请在你升级 Flink 之前仔细阅读相应的 release notes。
+
+请阅读 release notes [Flink 1.10]({% link release-notes/flink-1.10.zh.md %}), 
[Flink 1.9]({% link release-notes/flink-1.9.zh.md %}), [Flink 1.8]({% link 
release-notes/flink-1.8.zh.md %}), or [Flink 1.7]({% link 
release-notes/flink-1.7.zh.md %}).
+
+
+
+
+
+
+本文档适用于 Apache Flink {{ site.version_title }} 版本。本页面最近更新于: {% build_time %}.
+
+



[flink] branch master updated (fea20ad -> f4aaf8c)

2020-06-16 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from fea20ad  [FLINK-18268][docs] Correct Table API in Temporal table docs
 add f4aaf8c  [FLINK-18282][docs-zh] Retranslate the home page document

No new revisions were added by this update.

Summary of changes:
 docs/index.zh.md | 82 ++--
 1 file changed, 50 insertions(+), 32 deletions(-)



[flink] branch master updated: Fix typo in KafkaResource (#12564)

2020-06-09 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4db3680  Fix typo in KafkaResource (#12564)
4db3680 is described below

commit 4db368035f00ef7b990a86f1285921e9834e4f6c
Author: Da(Dash)Shen 
AuthorDate: Wed Jun 10 11:23:52 2020 +0800

Fix typo in KafkaResource (#12564)

This closes #12564
---
 .../src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
index 0157ad2..bd5ba46 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
@@ -102,7 +102,7 @@ public interface KafkaResource extends ExternalResource {
/**
 * Returns the configured KafkaResource implementation, or a {@link 
LocalStandaloneKafkaResource} if none is configured.
 *
-* @return configured KafkaResource, or {@link 
LocalStandaloneKafkaResource} is none is configured
+* @return configured KafkaResource, or {@link 
LocalStandaloneKafkaResource} if none is configured
 */
static KafkaResource get(final String version) {
return FactoryUtils.loadAndInvokeFactory(