(flink) branch master updated: [FLINK-16627][json] Support ignore null fields when serializing into JSON
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 93555a7d55d [FLINK-16627][json] Support ignore null fields when serializing into JSON 93555a7d55d is described below commit 93555a7d55dc27571122cccb7c4d8af2c5db54cb Author: zhouyisha AuthorDate: Fri Mar 1 17:11:32 2024 +0800 [FLINK-16627][json] Support ignore null fields when serializing into JSON Close apache/flink#24430 --- .../docs/connectors/table/formats/debezium.md | 7 ++ .../docs/connectors/table/formats/json.md | 7 ++ .../docs/connectors/table/formats/maxwell.md | 7 ++ .../docs/connectors/table/formats/ogg.md | 9 ++- .../docs/connectors/table/formats/debezium.md | 7 ++ docs/content/docs/connectors/table/formats/json.md | 8 +++ .../docs/connectors/table/formats/maxwell.md | 7 ++ docs/content/docs/connectors/table/formats/ogg.md | 7 ++ .../hive/util/ThriftObjectConversions.java | 2 +- .../flink/formats/json/JsonFormatFactory.java | 7 +- .../flink/formats/json/JsonFormatOptions.java | 7 ++ .../json/JsonRowDataSerializationSchema.java | 21 -- .../formats/json/RowDataToJsonConverters.java | 15 ++-- .../formats/json/canal/CanalJsonFormatFactory.java | 7 +- .../json/canal/CanalJsonSerializationSchema.java | 6 +- .../json/debezium/DebeziumJsonFormatFactory.java | 6 +- .../debezium/DebeziumJsonSerializationSchema.java | 6 +- .../json/maxwell/MaxwellJsonFormatFactory.java | 7 +- .../maxwell/MaxwellJsonSerializationSchema.java| 6 +- .../formats/json/ogg/OggJsonFormatFactory.java | 7 +- .../json/ogg/OggJsonSerializationSchema.java | 6 +- .../flink/formats/json/JsonFormatFactoryTest.java | 2 + .../formats/json/JsonRowDataSerDeSchemaTest.java | 84 +++--- .../json/canal/CanalJsonFormatFactoryTest.java | 3 + .../json/canal/CanalJsonSerDeSchemaTest.java | 3 +- .../debezium/DebeziumJsonFormatFactoryTest.java| 2 + 
.../json/debezium/DebeziumJsonSerDeSchemaTest.java | 3 +- .../json/maxwell/MaxwellJsonFormatFactoryTest.java | 2 + .../json/maxwell/MaxwellJsonSerDerTest.java| 3 +- .../formats/json/ogg/OggJsonFormatFactoryTest.java | 2 + .../formats/json/ogg/OggJsonSerDeSchemaTest.java | 3 +- .../gateway/rest/serde/ResultInfoSerializer.java | 5 +- 32 files changed, 236 insertions(+), 38 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/formats/debezium.md b/docs/content.zh/docs/connectors/table/formats/debezium.md index a6ac486f0f0..7ba62dd408d 100644 --- a/docs/content.zh/docs/connectors/table/formats/debezium.md +++ b/docs/content.zh/docs/connectors/table/formats/debezium.md @@ -424,6 +424,13 @@ Flink 提供了 `debezium-avro-confluent` 和 `debezium-json` 两种 format 来 Boolean 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.00027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.00027。 + + debezium-json.encode.ignore-null-fields + 选填 + false + Boolean + 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。 + diff --git a/docs/content.zh/docs/connectors/table/formats/json.md b/docs/content.zh/docs/connectors/table/formats/json.md index f1acdd7a001..005485a7a0a 100644 --- a/docs/content.zh/docs/connectors/table/formats/json.md +++ b/docs/content.zh/docs/connectors/table/formats/json.md @@ -135,6 +135,13 @@ Format 参数 Boolean 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.00027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.00027。 + + json.encode.ignore-null-fields + 选填 + false + Boolean + 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。 + decode.json-parser.enabled 选填 diff --git a/docs/content.zh/docs/connectors/table/formats/maxwell.md b/docs/content.zh/docs/connectors/table/formats/maxwell.md index a3ac161f231..0bdedeac682 100644 --- a/docs/content.zh/docs/connectors/table/formats/maxwell.md +++ b/docs/content.zh/docs/connectors/table/formats/maxwell.md @@ -251,6 +251,13 @@ Format Options Boolean Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. 
For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true. + + maxwell-json.encode.ignore-null-fields + optional + false + Boolean + Encode only non-null fields. By default, all fields will be included. + + diff --git a/docs/content.zh/docs/connectors/table/formats/ogg.md b/docs/content.zh/docs/connectors/table/formats/ogg.md index c8e8a7a6c6d..61ec97b60fd 100644 --- a/docs/content.zh/docs/connectors/table/formats/ogg.md +++ b/docs/content.zh/docs/connectors/table/formats/ogg.md @@ -216,7 +216,7 @@ Format
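The effect of the `*.encode.ignore-null-fields` option added in this commit can be pictured with a small, library-free sketch. This is not the code from `RowDataToJsonConverters`; the class `IgnoreNullFieldsSketch` and its `toJson` helper are hypothetical names, and a row is modelled as a plain `Map`. It only illustrates the documented behaviour: with the flag off (the default) null columns are written as `null`, with the flag on they are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration only; Flink's real serializer works on RowData.
final class IgnoreNullFieldsSketch {

    // Encodes a flat row (column name -> value) as a JSON object string.
    static String toJson(Map<String, Object> row, boolean ignoreNullFields) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> field : row.entrySet()) {
            Object value = field.getValue();
            if (value == null && ignoreNullFields) {
                continue; // leave the column out of the output entirely
            }
            json.add("\"" + field.getKey() + "\":" + encode(value));
        }
        return json.toString();
    }

    // Minimal value encoder: numbers and booleans as-is, everything else quoted.
    static String encode(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        return "\"" + value.toString().replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", null);
        row.put("city", "Berlin");
        System.out.println(toJson(row, false)); // {"id":1,"name":null,"city":"Berlin"}
        System.out.println(toJson(row, true));  // {"id":1,"city":"Berlin"}
    }
}
```

A consumer that distinguishes "field absent" from "field is null" would see different input under the two settings, which is presumably why the option defaults to `false`.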
(flink) branch master updated: [FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic.
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e0b6c121eaf [FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic. e0b6c121eaf is described below commit e0b6c121eaf7aeb2974a45d199e452b022f07d29 Author: SuDewei AuthorDate: Mon Mar 4 17:42:38 2024 +0800 [FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic. --- .../org/apache/flink/api/dag/Transformation.java | 15 .../table/planner/delegation/BatchPlanner.scala| 2 +- .../table/planner/delegation/PlannerBase.scala | 3 +- .../table/planner/delegation/StreamPlanner.scala | 2 +- .../planner/plan/stream/sql/TableScanTest.xml | 95 -- .../planner/plan/stream/sql/TableScanTest.scala| 12 ++- .../flink/table/planner/utils/TableTestBase.scala | 51 +--- 7 files changed, 81 insertions(+), 99 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java index 6256f9624f6..a0448697dd1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java @@ -19,7 +19,6 @@ package org.apache.flink.api.dag; import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.ResourceSpec; @@ -603,20 +602,6 @@ public abstract class Transformation { + '}'; } -@VisibleForTesting -public String toStringWithoutId() { -return getClass().getSimpleName() -+ "{" -+ "name='" -+ name -+ '\'' -+ ", outputType=" -+ outputType -+ ", parallelism=" -+ parallelism -+ '}'; -} - @Override public boolean 
equals(Object o) { if (this == o) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala index cea10f7bb81..bb4c1b75a28 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala @@ -84,7 +84,7 @@ class BatchPlanner( processors } - override def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = { + override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = { beforeTranslation() val planner = createDummyPlanner() diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index b36edaa21d7..45788e6278e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -367,8 +367,7 @@ abstract class PlannerBase( * @return * The [[Transformation]] DAG that corresponds to the node DAG. 
*/ - @VisibleForTesting - def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] + protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] def addExtraTransformation(transformation: Transformation[_]): Unit = { if (!extraTransformations.contains(transformation)) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala index 894a37c8cf9..fb32326f117 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala @@ -78,7 +78,7 @@ class StreamPlanner( override protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor] = Seq() - override def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = { + override protected def translateToPlan(execGraph: ExecNodeGra
(flink) branch master updated: [FLINK-33817][protobuf] Allow ReadDefaultValues = False for non primitive types on Proto3
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new bf75870c474 [FLINK-33817][protobuf] Allow ReadDefaultValues = False for non primitive types on Proto3 bf75870c474 is described below commit bf75870c4744c884860ee72bf464a301d18fb477 Author: dsaisharath AuthorDate: Fri Jan 5 16:03:52 2024 -0800 [FLINK-33817][protobuf] Allow ReadDefaultValues = False for non primitive types on Proto3 Close apache/flink#24035 --- .../docs/connectors/table/formats/protobuf.md | 7 +-- .../flink/formats/protobuf/PbFormatContext.java | 9 - .../flink/formats/protobuf/PbFormatOptions.java | 5 - .../deserialize/PbCodegenRowDeserializer.java | 14 ++ .../protobuf/deserialize/ProtoToRowConverter.java | 15 ++- .../protobuf/serialize/RowToProtoConverter.java | 4 ++-- .../apache/flink/formats/protobuf/Pb3ToRowTest.java | 21 +++-- 7 files changed, 42 insertions(+), 33 deletions(-) diff --git a/docs/content/docs/connectors/table/formats/protobuf.md b/docs/content/docs/connectors/table/formats/protobuf.md index b28cf49f9d3..72d4aae3c3d 100644 --- a/docs/content/docs/connectors/table/formats/protobuf.md +++ b/docs/content/docs/connectors/table/formats/protobuf.md @@ -149,9 +149,12 @@ Format Options false Boolean - This option only works if the generated class's version is proto2. If this value is set to true, the format will read empty values as the default values defined in the proto file. + If this value is set to true, the format will read empty values as the default values defined in the proto file. If the value is set to false, the format will generate null values if the data element does not exist in the binary protobuf message. - If the proto syntax is proto3, this value will forcibly be set to true, because proto3's standard is to use default values. 
+ If proto syntax is proto3, users need to set this to true when using protobuf versions lower than 3.15 as older versions do not support + checking for field presence which can cause runtime compilation issues. Additionally, primtive types will be set to default values + instead of null as field presence cannot be checked for them. Please be aware that setting this to true will cause the deserialization + performance to be much slower depending on schema complexity and message size. diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java index 9302cc274c3..fc2090e24de 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java @@ -28,9 +28,12 @@ import java.util.List; public class PbFormatContext { private final PbFormatConfig pbFormatConfig; private final List splitMethodStack = new ArrayList<>(); +private final boolean readDefaultValuesForPrimitiveTypes; -public PbFormatContext(PbFormatConfig pbFormatConfig) { +public PbFormatContext( +PbFormatConfig pbFormatConfig, boolean readDefaultValuesForPrimitiveTypes) { this.pbFormatConfig = pbFormatConfig; +this.readDefaultValuesForPrimitiveTypes = readDefaultValuesForPrimitiveTypes; } private String createSplitMethod( @@ -73,4 +76,8 @@ public class PbFormatContext { public PbFormatConfig getPbFormatConfig() { return pbFormatConfig; } + +public boolean getReadDefaultValuesForPrimitiveTypes() { +return readDefaultValuesForPrimitiveTypes; +} } diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java index 1cf884f8314..18b8fc9618f 100644 --- 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatOptions.java @@ -44,7 +44,10 @@ public class PbFormatOptions { .defaultValue(false) .withDescription( "Optional flag to read as default values instead of null when some field does not exist in deserialization; default to false." -+ "If proto syntax is proto3, this value will be set true forcibly because proto3's standard is to use default values."); +
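The revised description of the read-default-values option can be summarised as a per-field decision. The sketch below is a simplified model under stated assumptions, not the generated deserializer code: `ReadDefaultValuesSketch`, `readField` and all of its parameters are hypothetical, and "presence checkable" stands for the documented rule that field presence cannot be checked for proto3 primitive types.

```java
// Hypothetical model of the documented semantics; not Flink's codegen output.
final class ReadDefaultValuesSketch {

    /**
     * Decides what ends up in the row for one field.
     *
     * @param wireValue value found in the binary message, or null if the field is absent
     * @param protoDefault default value for the field's type
     * @param presenceCheckable false where presence cannot be checked (proto3 primitives)
     * @param readDefaultValues the format option
     */
    static Object readField(
            Object wireValue,
            Object protoDefault,
            boolean presenceCheckable,
            boolean readDefaultValues) {
        if (wireValue != null) {
            return wireValue; // field is present, nothing to decide
        }
        if (readDefaultValues || !presenceCheckable) {
            return protoDefault; // defaults requested, or absence cannot be detected
        }
        return null; // absent field surfaces as SQL NULL
    }

    public static void main(String[] args) {
        System.out.println(readField(null, "", true, false)); // null
        System.out.println(readField(null, 0, false, false)); // 0
        System.out.println(readField("x", "", true, false));  // x
    }
}
```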
(flink) branch master updated: [FLINK-33611][flink-protobuf] Split last segment only when size exceeds split threshold limit in deserializer
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new df03ada10e2 [FLINK-33611][flink-protobuf] Split last segment only when size exceeds split threshold limit in deserializer df03ada10e2 is described below commit df03ada10e226053780cb2e5e9742add4536289c Author: dsaisharath AuthorDate: Wed Dec 13 14:14:38 2023 -0800 [FLINK-33611][flink-protobuf] Split last segment only when size exceeds split threshold limit in deserializer Close apache/flink#23937 --- .../deserialize/PbCodegenRowDeserializer.java | 5 +- .../protobuf/serialize/PbCodegenRowSerializer.java | 8 +- .../formats/protobuf/VeryBigPbProtoToRowTest.java | 39 + .../formats/protobuf/VeryBigPbRowToProtoTest.java | 39 + .../src/test/proto/test_very_big_pb.proto | 98 ++ 5 files changed, 178 insertions(+), 11 deletions(-) diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java index f3cf414f540..541b1f83839 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java @@ -109,10 +109,7 @@ public class PbCodegenRowDeserializer implements PbCodegenDeserializer { index += 1; } if (!splitAppender.code().isEmpty()) { -String splitMethod = -formatContext.splitDeserializerRowTypeMethod( -flinkRowDataVar, pbMessageTypeStr, pbMessageVar, splitAppender.code()); -appender.appendSegment(splitMethod); +appender.appendSegment(splitAppender.code()); } appender.appendLine(resultVar + " = " + flinkRowDataVar); return appender.code(); diff --git 
a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java index 342117cd272..6f72ab83b81 100644 --- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java @@ -130,13 +130,7 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer { index += 1; } if (!splitAppender.code().isEmpty()) { -String splitMethod = -formatContext.splitSerializerRowTypeMethod( -flinkRowDataVar, -pbMessageTypeStr + ".Builder", -messageBuilderVar, -splitAppender.code()); -appender.appendSegment(splitMethod); +appender.appendSegment(splitAppender.code()); } appender.appendLine(resultVar + " = " + messageBuilderVar + ".build()"); return appender.code(); diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java new file mode 100644 index 000..a3e09ff34c8 --- /dev/null +++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.testproto.VeryBigPbClass; + +import org.junit.Test; + +/** + * Test for very huge proto definition, which may trigger some special optimiza
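The two hunks in `PbCodegenRowDeserializer` and `PbCodegenRowSerializer` stop wrapping the trailing code segment in a split method and append it inline instead. The loop that performs the threshold check is not visible in this excerpt, so the following is only a sketch of the idea suggested by the commit title: `SplitThresholdSketch`, `assemble` and the generated `splitN();` call names are hypothetical, and the "length greater than threshold" test is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of threshold-based code splitting.
final class SplitThresholdSketch {

    // Returns the statements of the generated method body.
    static List<String> assemble(List<String> fieldSnippets, int splitThreshold) {
        List<String> body = new ArrayList<>();
        StringBuilder pending = new StringBuilder();
        int splitMethods = 0;
        for (String snippet : fieldSnippets) {
            pending.append(snippet);
            if (pending.length() > splitThreshold) {
                // Segment grew past the limit: move it into its own method
                // and leave only a call in the body.
                body.add("split" + splitMethods + "();");
                splitMethods++;
                pending.setLength(0);
            }
        }
        if (pending.length() > 0) {
            // The last segment stayed under the limit, so it is kept inline.
            body.add(pending.toString());
        }
        return body;
    }

    public static void main(String[] args) {
        System.out.println(assemble(List.of("aaaa;", "bbbb;", "cc;"), 8)); // [split0();, cc;]
    }
}
```

Keeping a small tail inline avoids generating one extra method per row type when the remaining code is short.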
(flink) branch master updated (8e11b6005c9 -> 98e1d079f6e)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 8e11b6005c9 [FLINK-34272][runtime] ExecutingState should never transition during its construction.
     add 3c8088b716a [hotfix][table-common] Mark SinkV2Provider as the recommended core interface for SinkRuntimeProvider
     add 98e1d079f6e [FLINK-34270][docs] Update doc for the new Table source & sink interfaces introduced in FLIP-146/367

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/dev/table/sourcesSinks.md | 12 ++--
 docs/content/docs/dev/table/sourcesSinks.md | 22 ++
 .../table/connector/sink/DynamicTableSink.java | 16
 3 files changed, 36 insertions(+), 14 deletions(-)
(flink) branch master updated: [FLINK-33264][table] Support source parallelism setting for DataGen connector
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 862a7129d27 [FLINK-33264][table] Support source parallelism setting for DataGen connector 862a7129d27 is described below commit 862a7129d2730b4c70a21826a5b858fc541a4470 Author: Zhanghao Chen AuthorDate: Thu Jan 18 21:26:42 2024 +0800 [FLINK-33264][table] Support source parallelism setting for DataGen connector Close apache/flink#24133 --- docs/content.zh/docs/connectors/table/datagen.md | 7 docs/content/docs/connectors/table/datagen.md | 7 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 4 +- .../src/test/resources/sql/table.q | 1 + .../src/test/resources/sql/table.q | 1 + .../datagen/table/DataGenConnectorOptions.java | 3 ++ .../datagen/table/DataGenTableSource.java | 11 -- .../datagen/table/DataGenTableSourceFactory.java | 5 ++- .../factories/DataGenTableSourceFactoryTest.java | 46 ++ .../stream/table/DataGeneratorConnectorITCase.java | 25 10 files changed, 104 insertions(+), 6 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md index 642382a7768..b90183fcc8c 100644 --- a/docs/content.zh/docs/connectors/table/datagen.md +++ b/docs/content.zh/docs/connectors/table/datagen.md @@ -266,6 +266,13 @@ CREATE TABLE Orders ( Long 生成数据的总行数。默认情况下,该表是无界的。 + + scan.parallelism + 可选 + (none) + Integer + 定义算子并行度。不设置将使用全局默认并发。 + fields.#.kind 可选 diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md index 70253786bff..1f105076897 100644 --- a/docs/content/docs/connectors/table/datagen.md +++ b/docs/content/docs/connectors/table/datagen.md @@ -270,6 +270,13 @@ Connector Options Long The total number of rows to emit. By default, the table is unbounded. 
+ + scan.parallelism + optional + (none) + Integer + Defines the parallelism of the source. If not set, the global default parallelism is used. + fields.#.kind optional diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index 4e44bbf3505..0fd86b29430 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -76,8 +76,8 @@ Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> calls method in (GeneratorSourceReaderFactory.java:54) Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> calls method in (GeneratorSourceReaderFactory.java:55) Constructor (org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> has parameter of type in (GeneratorSourceReaderFactory.java:0) -Constructor ([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long)> depends on component type in (DataGenTableSource.java:0) -Constructor ([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long)> has parameter of type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (DataGenTableSource.java:0) +Constructor ([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, 
org.apache.flink.table.types.DataType, long, java.lang.Long, java.lang.Integer)> depends on component type in (DataGenTableSource.java:0) +Constructor ([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long, java.lang.Integer)> has parameter of type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (DataGenTableSource.java:0) Constructor (java.lang.String, org.apache.flink.configuration.ReadableConfig)> calls constructor ()> in (DataGenVisitorBase.java:49) Constructor (org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, jav
(flink) branch master updated: [FLINK-33263][table-planner] Implement ParallelismProvider for sources in the table planner
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 42b7e74ab20 [FLINK-33263][table-planner] Implement ParallelismProvider for sources in the table planner 42b7e74ab20 is described below commit 42b7e74ab20785289b62f5dd68d566995ba9dcfc Author: SuDewei AuthorDate: Thu Jan 18 16:05:40 2024 +0800 [FLINK-33263][table-planner] Implement ParallelismProvider for sources in the table planner Close apache/flink#24128 --- .../org/apache/flink/api/dag/Transformation.java | 15 ++ .../streaming/api/graph/StreamGraphGenerator.java | 3 + .../SourceTransformationWrapper.java | 72 ++ .../exec/common/CommonExecTableSourceScan.java | 154 ++--- .../table/planner/delegation/BatchPlanner.scala| 2 +- .../table/planner/delegation/PlannerBase.scala | 3 +- .../table/planner/delegation/StreamPlanner.scala | 2 +- .../planner/factories/TestValuesTableFactory.java | 33 +++-- .../planner/plan/stream/sql/TableScanTest.xml | 42 ++ .../planner/plan/stream/sql/TableScanTest.scala| 38 + .../runtime/stream/sql/TableSourceITCase.scala | 80 +++ .../flink/table/planner/utils/TableTestBase.scala | 51 ++- 12 files changed, 463 insertions(+), 32 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java index a0448697dd1..6256f9624f6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java @@ -19,6 +19,7 @@ package org.apache.flink.api.dag; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import 
org.apache.flink.api.common.operators.ResourceSpec; @@ -602,6 +603,20 @@ public abstract class Transformation { + '}'; } +@VisibleForTesting +public String toStringWithoutId() { +return getClass().getSimpleName() ++ "{" ++ "name='" ++ name ++ '\'' ++ ", outputType=" ++ outputType ++ ", parallelism=" ++ parallelism ++ '}'; +} + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 5929a2a5e8e..8e267ff84d6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -65,6 +65,7 @@ import org.apache.flink.streaming.api.transformations.ReduceTransformation; import org.apache.flink.streaming.api.transformations.SideOutputTransformation; import org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.streaming.api.transformations.SourceTransformationWrapper; import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.transformations.UnionTransformation; @@ -553,6 +554,8 @@ public class StreamGraphGenerator { transformedIds = transformFeedback((FeedbackTransformation) transform); } else if (transform instanceof CoFeedbackTransformation) { transformedIds = transformCoFeedback((CoFeedbackTransformation) transform); +} else if (transform instanceof SourceTransformationWrapper) { +transformedIds = transform(((SourceTransformationWrapper) transform).getInput()); } else { throw new IllegalStateException("Unknown transformation: " + transform); } 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformationWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformationWrapper.java new file mode 100644 index 000..d536000fde2 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformationWrapper.java @@ -0,0 +1,72 @@ +/* +Licensed to the Apache Softwar
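The new branch in `StreamGraphGenerator` resolves a `SourceTransformationWrapper` by translating its input rather than the wrapper itself. A minimal stand-alone sketch of that delegation follows; `WrapperUnwrapSketch`, `SourceNode` and `WrapperNode` are hypothetical stand-ins and do not reproduce Flink's `Transformation` hierarchy.

```java
import java.util.List;

// Hypothetical stand-ins for the transformation classes in the diff.
final class WrapperUnwrapSketch {

    interface Node {}

    static final class SourceNode implements Node {
        final int id;

        SourceNode(int id) {
            this.id = id;
        }
    }

    static final class WrapperNode implements Node {
        final Node input;

        WrapperNode(Node input) {
            this.input = input;
        }
    }

    // Returns the ids of the graph nodes a transformation maps to.
    static List<Integer> transform(Node node) {
        if (node instanceof SourceNode) {
            return List.of(((SourceNode) node).id);
        } else if (node instanceof WrapperNode) {
            // The wrapper contributes no node of its own: use its input's ids.
            return transform(((WrapperNode) node).input);
        }
        throw new IllegalStateException("Unknown transformation: " + node);
    }

    public static void main(String[] args) {
        System.out.println(transform(new WrapperNode(new SourceNode(7)))); // [7]
    }
}
```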
(flink) branch master updated: [hotfix][docs] Add protobuf format info to overview page
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 2003b70aceb [hotfix][docs] Add protobuf format info to overview page
2003b70aceb is described below

commit 2003b70aceb2cdddfbd3d05263e1dbd699bf6585
Author: sharath1709
AuthorDate: Tue Jan 9 15:14:16 2024 -0800

    [hotfix][docs] Add protobuf format info to overview page
---
 docs/content/docs/connectors/table/formats/overview.md | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/docs/content/docs/connectors/table/formats/overview.md b/docs/content/docs/connectors/table/formats/overview.md
index 987ba8d8820..4ae1c6987ba 100644
--- a/docs/content/docs/connectors/table/formats/overview.md
+++ b/docs/content/docs/connectors/table/formats/overview.md
@@ -26,7 +26,7 @@ under the License.
 # Formats
-Flink provides a set of table formats that can be used with table connectors. A table format is a storage format defines how to map binary data onto table columns.
+Flink provides a set of table formats that can be used with table connectors. A table format is a storage format that defines how to map binary data onto table columns.
 Flink supports the following formats:
@@ -68,6 +68,10 @@ Flink supports the following formats:
 }}">Apache Kafka,
 }}">Upsert Kafka
+
+ }}">Protobuf
+ }}">Apache Kafka
+
 }}">Debezium CDC
 }}">Apache Kafka,
(flink) branch release-1.17 updated: [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 58f8162613d [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable
58f8162613d is described below

commit 58f8162613d9f615e60fb0c9e23692d25469d6f0
Author: xuyang
AuthorDate: Thu Nov 23 14:05:07 2023 +0800

    [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable

    Close apache/flink#23779
---
 .../functions/BuiltInFunctionDefinitions.java | 4 +-
 .../functions/sql/FlinkSqlOperatorTable.java | 8 +-
 .../table/planner/codegen/calls/RandCallGen.scala | 5 +-
 .../planner/functions/BuiltInFunctionTestBase.java | 89 ++--
 .../planner/functions/RandFunctionITCase.java | 94 ++
 .../planner/expressions/ScalarFunctionsTest.scala | 45 +++
 .../expressions/utils/ExpressionTestBase.scala | 14 +++-
 7 files changed, 226 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 5bbdca9a054..b96422625a8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1397,7 +1397,7 @@ public final class BuiltInFunctionDefinitions {
 .kind(SCALAR)
 .notDeterministic()
 .inputTypeStrategy(or(NO_ARGS, sequence(logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(DataTypes.DOUBLE().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.DOUBLE(
 .build();
 public static final BuiltInFunctionDefinition RAND_INTEGER =
@@ -1411,7 +1411,7 @@ public final class BuiltInFunctionDefinitions {
 sequence(
 logical(LogicalTypeRoot.INTEGER),
 logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(INT().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(INT(
 .build();
 public static final BuiltInFunctionDefinition BIN =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index cb2ff52c6e8..04cf1540bc8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -922,7 +922,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.DOUBLE,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.DOUBLE),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
@@ -940,7 +942,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND_INTEGER",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.INTEGER,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.INTEGER),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
index 1a7b4950586..e322e1854ed 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
@@ -39,11 +39,12 @@ class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) extends CallGener
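The rule this commit switches to ("nullable if the arguments are nullable") can be illustrated with a small stand-alone model. This is only a sketch in plain Java: SimpleType and NullableIfArgsSketch are hypothetical names invented for the illustration, and none of Flink's or Calcite's type-inference classes are used.

```java
/** Toy stand-in for a SQL data type: a name plus a nullability flag. Not a Flink class. */
final class SimpleType {
    final String name;
    final boolean nullable;

    SimpleType(String name, boolean nullable) {
        this.name = name;
        this.nullable = nullable;
    }
}

final class NullableIfArgsSketch {
    /** Returns the result type as nullable only if at least one argument type is nullable. */
    static SimpleType nullableIfArgs(String resultName, SimpleType... args) {
        boolean anyNullable = false;
        for (SimpleType arg : args) {
            anyNullable |= arg.nullable;
        }
        return new SimpleType(resultName, anyNullable);
    }

    public static void main(String[] unused) {
        SimpleType notNullInt = new SimpleType("INT", false);
        SimpleType nullableInt = new SimpleType("INT", true);
        // Models RAND(): no arguments, so the result stays NOT NULL.
        System.out.println(nullableIfArgs("DOUBLE").nullable);
        // Models RAND(seed) with a nullable seed: the result becomes nullable.
        System.out.println(nullableIfArgs("DOUBLE", nullableInt).nullable);
        // Models RAND_INTEGER(seed, bound) with one nullable argument.
        System.out.println(nullableIfArgs("INT", notNullInt, nullableInt).nullable);
    }
}
```

In this model a call without arguments keeps a NOT NULL result, which matches the old explicit notNull() behaviour for that case.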
(flink) branch release-1.18 updated: [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.18 by this push:
     new d6081815000 [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable
d6081815000 is described below

commit d60818150005661006a71e4155fc605d7543362b
Author: xuyang
AuthorDate: Thu Nov 23 14:05:07 2023 +0800

    [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable

    Close apache/flink#23779
---
 .../functions/BuiltInFunctionDefinitions.java | 4 +-
 .../functions/sql/FlinkSqlOperatorTable.java | 8 +-
 .../table/planner/codegen/calls/RandCallGen.scala | 5 +-
 .../planner/functions/BuiltInFunctionTestBase.java | 89 ++--
 .../planner/functions/RandFunctionITCase.java | 94 ++
 .../planner/expressions/ScalarFunctionsTest.scala | 45 +++
 .../expressions/utils/ExpressionTestBase.scala | 14 +++-
 7 files changed, 226 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e653d1d6463..82197822dc5 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1576,7 +1576,7 @@ public final class BuiltInFunctionDefinitions {
 .kind(SCALAR)
 .notDeterministic()
 .inputTypeStrategy(or(NO_ARGS, sequence(logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(DataTypes.DOUBLE().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.DOUBLE(
 .build();
 public static final BuiltInFunctionDefinition RAND_INTEGER =
@@ -1590,7 +1590,7 @@ public final class BuiltInFunctionDefinitions {
 sequence(
 logical(LogicalTypeRoot.INTEGER),
 logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(INT().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(INT(
 .build();
 public static final BuiltInFunctionDefinition BIN =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 9769613cd2d..1a98081dd79 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -940,7 +940,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.DOUBLE,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.DOUBLE),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
@@ -958,7 +960,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND_INTEGER",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.INTEGER,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.INTEGER),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
index 1a7b4950586..e322e1854ed 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
@@ -39,11 +39,12 @@ class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) extends CallGener
(flink) branch master updated: [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 45f966e8c3c [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable
45f966e8c3c is described below

commit 45f966e8c3c5e903b3843391874f7d2478122d8c
Author: xuyang
AuthorDate: Thu Nov 23 14:05:07 2023 +0800

    [FLINK-33541][table-planner] RAND and RAND_INTEGER should return type nullable if the arguments are nullable

    Close apache/flink#23779
---
 .../functions/BuiltInFunctionDefinitions.java | 4 +-
 .../functions/sql/FlinkSqlOperatorTable.java | 8 +-
 .../table/planner/codegen/calls/RandCallGen.scala | 5 +-
 .../planner/functions/BuiltInFunctionTestBase.java | 89 ++--
 .../planner/functions/RandFunctionITCase.java | 94 ++
 .../planner/expressions/ScalarFunctionsTest.scala | 45 +++
 .../expressions/utils/ExpressionTestBase.scala | 14 +++-
 7 files changed, 226 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 89ac66d18f6..b65afdc4284 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1644,7 +1644,7 @@ public final class BuiltInFunctionDefinitions {
 .kind(SCALAR)
 .notDeterministic()
 .inputTypeStrategy(or(NO_ARGS, sequence(logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(DataTypes.DOUBLE().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.DOUBLE(
 .build();
 public static final BuiltInFunctionDefinition RAND_INTEGER =
@@ -1658,7 +1658,7 @@ public final class BuiltInFunctionDefinitions {
 sequence(
 logical(LogicalTypeRoot.INTEGER),
 logical(LogicalTypeRoot.INTEGER
-.outputTypeStrategy(explicit(INT().notNull()))
+.outputTypeStrategy(nullableIfArgs(explicit(INT(
 .build();
 public static final BuiltInFunctionDefinition BIN =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 9769613cd2d..1a98081dd79 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -940,7 +940,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.DOUBLE,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.DOUBLE),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
@@ -958,7 +960,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 new SqlFunction(
 "RAND_INTEGER",
 SqlKind.OTHER_FUNCTION,
-ReturnTypes.INTEGER,
+ReturnTypes.cascade(
+ReturnTypes.explicit(SqlTypeName.INTEGER),
+SqlTypeTransforms.TO_NULLABLE),
 null,
 OperandTypes.or(
 new SqlSingleOperandTypeChecker[] {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
index 1a7b4950586..e322e1854ed 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/RandCallGen.scala
@@ -39,11 +39,12 @@ class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) extends CallGener
(flink) branch release-1.17 updated: [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.17 by this push:
     new cfc7881ab5f [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal
cfc7881ab5f is described below

commit cfc7881ab5f1c27d7a9a7781b255fa94989201b9
Author: zoudan
AuthorDate: Thu Oct 19 17:48:05 2023 +0800

    [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal

    Close apache/flink#23551
---
 .../table/planner/plan/utils/RexNodeExtractor.scala | 4
 .../plan/utils/NestedProjectionUtilTest.scala | 10 ++
 .../planner/plan/utils/RexNodeExtractorTest.scala | 21 +
 .../planner/plan/utils/RexNodeRewriterTest.scala | 7 ---
 .../table/planner/plan/utils/RexNodeTestBase.scala | 6 --
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index a7cbb4a9ffc..4b57796e792 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.YearMonthIntervalType
 import org.apache.flink.util.Preconditions
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlPostfixOperator}
@@ -489,6 +490,9 @@ class RexNodeToExpressionConverter(
 // convert to BigDecimal
 literal.getValueAs(classOf[java.math.BigDecimal])
+ case BINARY | VARBINARY =>
+literal.getValueAs(classOf[Array[Byte]])
+
 case _ =>
 literal.getValue
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
index ec8214f5b91..9cd44c9fea7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
@@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
 "$2",
 "$3",
 "$4",
+ "$5",
 "*($t2, $t3)",
 "100",
- "<($t5, $t6)",
+ "<($t6, $t7)",
 "6",
- ">($t1, $t8)",
- "AND($t7, $t9)")))
+ ">($t1, $t9)",
+ "AND($t8, $t10)")))
 val nestedField = NestedProjectionUtil.build(exprs, rexProgram.getInputRowType)
 val paths = NestedProjectionUtil.convertToIndexArray(nestedField)
@@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
 Array(1),
 Array(2),
 Array(3),
- Array(4)
+ Array(4),
+ Array(5)
 )
 assertArray(paths, orderedPaths)
 val builder = new FlinkRexBuilder(typeFactory)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 2eb87e35cc8..bd5f15d3bed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction
 import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.CatalogManagerMocks
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
@@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase {
 assertEquals(0, unconvertedRexNodes.length)
 }
+ @Test
+ def testExtractConditionWithBinaryLiteral(): Unit = {
+// blob
+val t0 = rexBuilder.makeInputRef(allF
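The idea behind the new BINARY | VARBINARY branch in RexNodeExtractor.scala can be modelled without any planner classes. The sketch below is plain Java, not the Scala code of the commit; ByteWrapper, TypeRoot and toExternalValue are hypothetical names, and the assumption that the fallback branch would otherwise return an internal wrapper object rather than a byte array is an inference from the diff, not something the email states.

```java
final class BinaryLiteralSketch {
    /** Hypothetical subset of logical type roots, for illustration only. */
    enum TypeRoot { INTEGER, BINARY, VARBINARY }

    /** Hypothetical stand-in for an internal, immutable byte-string wrapper. */
    static final class ByteWrapper {
        private final byte[] bytes;

        ByteWrapper(byte[] bytes) {
            this.bytes = bytes.clone();
        }

        byte[] getBytes() {
            return bytes.clone();
        }
    }

    /** Converts an internal literal value to the object handed to later conversion steps. */
    static Object toExternalValue(TypeRoot type, Object internal) {
        switch (type) {
            case BINARY:
            case VARBINARY:
                // Unwrap explicitly so callers always receive a plain byte[].
                return ((ByteWrapper) internal).getBytes();
            default:
                // Fallback: pass the internal value through unchanged.
                return internal;
        }
    }

    public static void main(String[] unused) {
        Object value = toExternalValue(TypeRoot.VARBINARY, new ByteWrapper(new byte[] {1, 2, 3}));
        System.out.println(value instanceof byte[]);
    }
}
```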
(flink) branch release-1.18 updated: [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 265d8174165 [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal
265d8174165 is described below

commit 265d81741654ff70836485e8e8d31ce33a87e960
Author: zoudan
AuthorDate: Thu Oct 19 17:48:05 2023 +0800

    [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal

    Close apache/flink#23551
---
 .../table/planner/plan/utils/RexNodeExtractor.scala | 4
 .../plan/utils/NestedProjectionUtilTest.scala | 10 ++
 .../planner/plan/utils/RexNodeExtractorTest.scala | 21 +
 .../planner/plan/utils/RexNodeRewriterTest.scala | 7 ---
 .../table/planner/plan/utils/RexNodeTestBase.scala | 6 --
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index a7cbb4a9ffc..4b57796e792 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.YearMonthIntervalType
 import org.apache.flink.util.Preconditions
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlPostfixOperator}
@@ -489,6 +490,9 @@ class RexNodeToExpressionConverter(
 // convert to BigDecimal
 literal.getValueAs(classOf[java.math.BigDecimal])
+ case BINARY | VARBINARY =>
+literal.getValueAs(classOf[Array[Byte]])
+
 case _ =>
 literal.getValue
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
index ec8214f5b91..9cd44c9fea7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
@@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
 "$2",
 "$3",
 "$4",
+ "$5",
 "*($t2, $t3)",
 "100",
- "<($t5, $t6)",
+ "<($t6, $t7)",
 "6",
- ">($t1, $t8)",
- "AND($t7, $t9)")))
+ ">($t1, $t9)",
+ "AND($t8, $t10)")))
 val nestedField = NestedProjectionUtil.build(exprs, rexProgram.getInputRowType)
 val paths = NestedProjectionUtil.convertToIndexArray(nestedField)
@@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
 Array(1),
 Array(2),
 Array(3),
- Array(4)
+ Array(4),
+ Array(5)
 )
 assertArray(paths, orderedPaths)
 val builder = new FlinkRexBuilder(typeFactory)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 2eb87e35cc8..bd5f15d3bed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction
 import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.CatalogManagerMocks
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
@@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase {
 assertEquals(0, unconvertedRexNodes.length)
 }
+ @Test
+ def testExtractConditionWithBinaryLiteral(): Unit = {
+// blob
+val t0 = rexBuilder.makeInputRef(allF
(flink) branch master updated: [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new b6380d54f11 [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway b6380d54f11 is described below commit b6380d54f11ebb062397d775764f28f20fd35b69 Author: zoudan AuthorDate: Fri Dec 1 10:41:31 2023 +0800 [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway Close apache/flink#23849 --- docs/content.zh/docs/dev/table/olap_quickstart.md | 9 +- .../docs/dev/table/sql-gateway/overview.md | 18 ++ docs/content/docs/dev/table/olap_quickstart.md | 11 +- .../content/docs/dev/table/sql-gateway/overview.md | 18 ++ .../operators/collect/CollectResultIterator.java | 20 ++ .../api/config/SqlGatewayServiceConfigOptions.java | 24 ++ .../gateway/service/context/SessionContext.java| 36 +++ .../service/operation/OperationExecutor.java | 49 +++- .../table/gateway/service/session/Session.java | 8 + .../service/SqlGatewayServiceStatementITCase.java | 71 +- .../src/test/resources/sql/repeated_dql.q | 261 + .../flink/table/api/internal/CachedPlan.java | 29 +++ .../flink/table/api/internal/DQLCachedPlan.java| 56 + .../table/api/internal/InsertResultProvider.java | 6 + .../flink/table/api/internal/PlanCacheManager.java | 65 + .../flink/table/api/internal/ResultProvider.java | 3 + .../table/api/internal/TableEnvironmentImpl.java | 29 ++- .../api/internal/TableEnvironmentInternal.java | 8 + .../flink/table/api/internal/TableResultImpl.java | 25 +- .../table/api/internal/TableResultInternal.java| 5 + .../planner/connectors/CollectDynamicSink.java | 7 + 21 files changed, 730 insertions(+), 28 deletions(-) diff --git a/docs/content.zh/docs/dev/table/olap_quickstart.md b/docs/content.zh/docs/dev/table/olap_quickstart.md index e109dfe71ee..32fae83b5bc 100644 --- a/docs/content.zh/docs/dev/table/olap_quickstart.md +++ 
b/docs/content.zh/docs/dev/table/olap_quickstart.md @@ -151,10 +151,11 @@ Session Cluster 和 SQL Gateway 都依赖连接器来获取表的元信息同时 SQL&Table 参数 -| 参数名称 | 默认值 | 推荐值 | -|:---|:--|:-| -| [table.optimizer.join-reorder-enabled]({{}}) | false | true | -| [pipeline.object-reuse]({{< ref "docs/deployment/config#pipeline-object-reuse" >}}) | false | true | +| 参数名称 | 默认值 | 推荐值 | +|:--|:--|:-| +| [table.optimizer.join-reorder-enabled]({{}}) | false | true | +| [pipeline.object-reuse]({{< ref "docs/deployment/config#pipeline-object-reuse" >}}) | false | true | +| [sql-gateway.session.plan-cache.enabled]({{}}) | false | true | Runtime 参数 diff --git a/docs/content.zh/docs/dev/table/sql-gateway/overview.md b/docs/content.zh/docs/dev/table/sql-gateway/overview.md index 81b1bea869f..4b91d6862ab 100644 --- a/docs/content.zh/docs/dev/table/sql-gateway/overview.md +++ b/docs/content.zh/docs/dev/table/sql-gateway/overview.md @@ -188,6 +188,24 @@ $ ./sql-gateway -Dkey=value Integer SQL Gateway 服务中存活 session 的最大数量。 + +sql-gateway.session.plan-cache.enabled +false +Boolean +设置为 true 的时候,SQL Gateway 会在一个 session 内部缓存并复用 plan。 + + +sql-gateway.session.plan-cache.size +100 +Integer +Plan cache 的大小, 当且仅当 `table.optimizer.plan-cache.enabled` 为 true 的时候生效。 + + +sql-gateway.session.plan-cache.ttl +1 hour +Duration +Plan cache 的 TTL, 控制 cache 在写入之后多久过期, 当且仅当 `table.optimizer.plan-cache.enabled` 为 true 的时候生效。 + sql-gateway.worker.keepalive-time 5 min diff --git a/docs/content/docs/dev/table/olap_quickstart.md b/docs/content/docs/dev/table/olap_quickstart.md index e5084e065c2..d630a80d313 100644 --- a/docs/content/docs/dev/table/olap_quickstart.md +++ b/docs/content/docs/dev/table/olap_quickstart.md @@ -151,10 +151,11 @@ In OLAP scenario, appropriate configurations that can greatly help users improve SQL&Table Options -| Parameters
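The three options documented in this commit (sql-gateway.session.plan-cache.enabled, .size with default 100, and .ttl with default 1 hour, counted from the time of writing) describe a bounded cache with expiry after write. A rough stand-alone sketch of that behaviour in plain Java follows. PlanCacheSketch and CachedValue are hypothetical names, the injected clock is an assumption made so that the example is testable, and least-recently-used eviction is assumed for the size bound. This is not the PlanCacheManager added by the commit, whose source is not shown in this email.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Bounded cache with expire-after-write semantics; illustration only. */
final class PlanCacheSketch<K, V> {
    /** A cached value together with the time it was written. */
    private static final class CachedValue<T> {
        final T value;
        final long writtenAtMillis;

        CachedValue(T value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long ttlMillis;
    private final LongSupplier clock;
    private final LinkedHashMap<K, CachedValue<V>> entries;

    PlanCacheSketch(final int maxSize, long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        // Access-ordered map: the least recently used entry is evicted once maxSize is exceeded.
        this.entries = new LinkedHashMap<K, CachedValue<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedValue<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    void put(K key, V value) {
        entries.put(key, new CachedValue<V>(value, clock.getAsLong()));
    }

    /** Returns the cached value, or null if it is absent or older than the TTL. */
    V get(K key) {
        CachedValue<V> cached = entries.get(key);
        if (cached == null) {
            return null;
        }
        if (clock.getAsLong() - cached.writtenAtMillis >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return cached.value;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] unused) {
        // Size 100 and a one-hour TTL mirror the documented defaults.
        PlanCacheSketch<String, String> cache =
                new PlanCacheSketch<>(100, 60L * 60L * 1000L, System::currentTimeMillis);
        cache.put("SELECT 1", "cached plan for SELECT 1");
        System.out.println(cache.get("SELECT 1"));
    }
}
```

Keying the cache by the statement text, as in main above, is also only an assumption for the example.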
(flink) branch master updated: [FLINK-27056][streaming] "pipeline.time-characteristic" should be deprecated and have EVENT_TIME as default value
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 3856e41f517 [FLINK-27056][streaming] "pipeline.time-characteristic" should be deprecated and have EVENT_TIME as default value
3856e41f517 is described below

commit 3856e41f5172dee070a4b1d5f15829203b6d580f
Author: Zhanghao Chen
AuthorDate: Sat Jul 9 18:06:55 2022 +0800

    [FLINK-27056][streaming] "pipeline.time-characteristic" should be deprecated and have EVENT_TIME as default value

    Close apache/flink#20231
---
 docs/content.zh/docs/deployment/config.md | 1 -
 docs/content/docs/deployment/config.md | 1 -
 .../generated/stream_pipeline_configuration.html | 18 --
 .../api/environment/StreamPipelineOptions.java | 17 -
 4 files changed, 16 insertions(+), 21 deletions(-)

diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index bc2e406a220..ff8935923b6 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -314,7 +314,6 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" >
 ### Pipeline
 {{< generated/pipeline_configuration >}}
-{{< generated/stream_pipeline_configuration >}}
 ### Checkpointing
diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md
index 6071d674afe..3de3826c040 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -316,7 +316,6 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" >
 ### Pipeline
 {{< generated/pipeline_configuration >}}
-{{< generated/stream_pipeline_configuration >}}
 ### Checkpointing
diff --git a/docs/layouts/shortcodes/generated/stream_pipeline_configuration.html b/docs/layouts/shortcodes/generated/stream_pipeline_configuration.html
deleted file mode 100644
index 8ddd9fb10c6..000
--- a/docs/layouts/shortcodes/generated/stream_pipeline_configuration.html
+++ /dev/null
@@ -1,18 +0,0 @@
-
-
-
-Key
-Default
-Type
-Description
-
-
-
-
-pipeline.time-characteristic
-ProcessingTime
-Enum
-The time characteristic for all created streams, e.g., processingtime, event time, or ingestion time.If you set the characteristic to IngestionTime or EventTime this will set a default watermark update interval of 200 ms. If this is not applicable for your application you should change it using pipeline.auto-watermark-interval.Possible values:"ProcessingTime""IngestionTime""EventTime" [...]
-
-
-
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java
index 01aa5f9172a..e848479f3b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPipelineOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.environment;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.PipelineOptions;
@@ -32,10 +33,24 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 */
 @PublicEvolving
 public class StreamPipelineOptions {
+
+/**
+ * @deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link
+ * TimeCharacteristic#EventTime}, thus you don't need to set this option for enabling
+ * event-time support anymore. Explicitly using processing-time windows and timers works in
+ * event-time mode. If you need to disable watermarks, please set {@link
+ * PipelineOptions#AUTO_WATERMARK_INTERVAL} to 0. If you are using {@link
+ * TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link
+ * WatermarkStrategy}. If you are using generic "time window" operations (for example {@link
+ * org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
+ * that change behaviour based on the time characteristic, please use equivalent operations
+ * that explicitly specify processing time or event time.
+ */
+
(flink) branch master updated: [FLINK-32650][protobuf] Split generated code in protobuf format to mitigate huge method problems
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new a2ec4c3b8bd [FLINK-32650][protobuf] Split generated code in protobuf format to mitigate huge method problems
a2ec4c3b8bd is described below

commit a2ec4c3b8bd9f2009fd721ab4c51afedcd8574fb
Author: lijingwei.5018
AuthorDate: Mon Nov 13 11:32:43 2023 +0800

    [FLINK-32650][protobuf] Split generated code in protobuf format to mitigate huge method problems

    Close apache/flink#23162
---
 .../apache/flink/formats/protobuf/PbConstant.java | 6 +
 .../flink/formats/protobuf/PbFormatContext.java | 44 ++
 .../deserialize/PbCodegenRowDeserializer.java | 30 +++-
 .../PbRowDataDeserializationSchema.java | 6 +
 .../protobuf/deserialize/ProtoToRowConverter.java | 13 ++
 .../protobuf/serialize/PbCodegenRowSerializer.java | 37 -
 .../serialize/PbRowDataSerializationSchema.java | 6 +
 .../protobuf/serialize/RowToProtoConverter.java | 13 ++
 .../formats/protobuf/util/PbCodegenUtils.java | 4 +
 .../formats/protobuf/BigPbProtoToRowTest.java | 140 +
 .../formats/protobuf/BigPbRowToProtoTest.java | 166 +
 .../src/test/proto/test_big_pb.proto | 61
 12 files changed, 512 insertions(+), 14 deletions(-)

diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
index ea7d6514c56..4a39794e68d 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbConstant.java
@@ -27,4 +27,10 @@ public class PbConstant {
 public static final String PB_MAP_KEY_NAME = "key";
 public static final String PB_MAP_VALUE_NAME = "value";
 public static final String PB_OUTER_CLASS_SUFFIX = "OuterClass";
+/**
+ * JIT optimizer threshold is 8K, unicode encode one char use 1byte, so use 4K as
+ * codegen_spilt_threshold,A conservative threshold is selected to prevent multiple element code
+ * segments in RowType from being combined to exceed 8K.
+ */
+public static final int CODEGEN_SPLIT_THRESHOLD = 4000;
 }
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
index 27ceb0fb49d..9302cc274c3 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatContext.java
@@ -18,14 +18,58 @@
 package org.apache.flink.formats.protobuf;
+import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenVarId;
+
+import java.util.ArrayList;
+import java.util.List;
+
 /** store config and common information. */
 public class PbFormatContext {
 private final PbFormatConfig pbFormatConfig;
+private final List splitMethodStack = new ArrayList<>();
 public PbFormatContext(PbFormatConfig pbFormatConfig) {
 this.pbFormatConfig = pbFormatConfig;
 }
+private String createSplitMethod(
+String rowDataType,
+String rowDataVar,
+String messageTypeStr,
+String messageTypeVar,
+String code) {
+int uid = PbCodegenVarId.getInstance().getAndIncrement();
+String splitMethodName = "split" + uid;
+PbCodegenAppender pbCodegenAppender = new PbCodegenAppender();
+pbCodegenAppender.appendSegment(
+String.format(
+"private static void %s (%s %s, %s %s) {\n %s \n}",
+splitMethodName,
+rowDataType,
+rowDataVar,
+messageTypeStr,
+messageTypeVar,
+code));
+splitMethodStack.add(pbCodegenAppender.code());
+return String.format("%s(%s, %s);", splitMethodName, rowDataVar, messageTypeVar);
+}
+
+public String splitDeserializerRowTypeMethod(
+String rowDataVar, String messageTypeStr, String messageTypeVar, String code) {
+return createSplitMethod(
+"GenericRowData", rowDataVar, messageTypeStr, messageTypeVar, code);
+}
+
+public String splitSerializerRowTypeMethod(
+String rowDataVar, String messageTypeStr, String messageTypeVar, String code) {
+return createSplitMethod("RowDat
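The createSplitMethod helper in the diff above wraps a code fragment in a generated static method and returns a call to it. How the callers decide when to split is not visible in this truncated email, so the sketch below only assumes one plausible policy: accumulate per-field code and move it into a helper method once it reaches a threshold. It is plain Java with hypothetical names (CodeSplitSketch, assemble, splitMethodCount) and a simplified single-parameter method signature; it does not use the PbCodegenAppender or PbCodegenVarId classes from the commit.

```java
import java.util.ArrayList;
import java.util.List;

/** Splits generated source text into helper methods; illustration only. */
final class CodeSplitSketch {
    private final List<String> splitMethods = new ArrayList<>();
    private int nextId = 0;

    /** Wraps body in a new static method and returns the statement that calls it. */
    String createSplitMethod(String rowType, String rowVar, String body) {
        String name = "split" + nextId++;
        splitMethods.add(
                String.format("private static void %s(%s %s) {\n%s\n}", name, rowType, rowVar, body));
        return String.format("%s(%s);", name, rowVar);
    }

    /**
     * Concatenates per-field code segments. Whenever the pending code reaches the threshold
     * (in characters), it is moved into a split method and replaced by a call.
     */
    String assemble(String rowType, String rowVar, int threshold, String... fieldSegments) {
        StringBuilder mainBody = new StringBuilder();
        StringBuilder pending = new StringBuilder();
        for (String segment : fieldSegments) {
            pending.append(segment).append('\n');
            if (pending.length() >= threshold) {
                mainBody.append(createSplitMethod(rowType, rowVar, pending.toString())).append('\n');
                pending.setLength(0);
            }
        }
        // Whatever is left stays inline in the main method body.
        mainBody.append(pending);
        return mainBody.toString();
    }

    int splitMethodCount() {
        return splitMethods.size();
    }

    public static void main(String[] unused) {
        CodeSplitSketch sketch = new CodeSplitSketch();
        // A tiny threshold of 10 characters forces splitting in this demo.
        String body = sketch.assemble(
                "GenericRowData", "row", 10, "row.setField(0, a);", "row.setField(1, b);", "x;");
        System.out.println(body);
        System.out.println(sketch.splitMethodCount());
    }
}
```

With the CODEGEN_SPLIT_THRESHOLD of 4000 from PbConstant in place of the demo value, each generated helper would stay well under the 8K limit that the Javadoc in the diff refers to.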
(flink) branch master updated: [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 55162dcc5cc [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface 55162dcc5cc is described below commit 55162dcc5cca6db6aeedddb30d80dd9f9b8d5202 Author: Zhanghao Chen AuthorDate: Sat Nov 4 21:00:25 2023 +0800 [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface Close apache/flink#23663 --- .../connector/source/DataStreamScanProvider.java| 4 +++- .../connector/source/SourceFunctionProvider.java| 21 - .../flink/table/connector/ParallelismProvider.java | 12 ++-- .../table/connector/source/InputFormatProvider.java | 21 - .../table/connector/source/SourceProvider.java | 18 +- .../apache/flink/table/factories/FactoryUtil.java | 9 + .../connectors/TransformationScanProvider.java | 4 +++- 7 files changed, 78 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java index 7fc4687363f..213e3806327 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.CompiledPlan; +import org.apache.flink.table.connector.ParallelismProvider; import 
org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.data.RowData; @@ -35,7 +36,8 @@ import org.apache.flink.table.data.RowData; * or {@link InputFormatProvider}. */ @PublicEvolving -public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider { +public interface DataStreamScanProvider +extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider { /** * Creates a scan Java {@link DataStream} from a {@link StreamExecutionEnvironment}. diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java index ff7238d8a2f..e5c35525e16 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java @@ -20,8 +20,13 @@ package org.apache.flink.table.connector.source; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import javax.annotation.Nullable; + +import java.util.Optional; + /** * Provider of a {@link SourceFunction} instance as a runtime implementation for {@link * ScanTableSource}. @@ -32,10 +37,19 @@ import org.apache.flink.table.data.RowData; */ @Deprecated @PublicEvolving -public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider { +public interface SourceFunctionProvider +extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider { /** Helper method for creating a static provider. 
*/ static SourceFunctionProvider of(SourceFunction sourceFunction, boolean isBounded) { +return of(sourceFunction, isBounded, null); +} + +/** Helper method for creating a Source provider with a provided source parallelism. */ +static SourceFunctionProvider of( +SourceFunction sourceFunction, +boolean isBounded, +@Nullable Integer sourceParallelism) { return new SourceFunctionProvider() { @Override public SourceFunction createSourceFunction() { @@ -46,6 +60,11 @@ public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvi public boolean isBounded() { return isBounded; } + +@Override +public Optional getParallelism() { +return Optional.ofNullable(sourceParallelism); +} }; } diff --git a/flink-table
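The optional-parallelism pattern in this change can be illustrated with a stdlib-only sketch. `ParallelismHint`, `SourceHint`, and `resolve` are hypothetical stand-ins invented here (they are not Flink interfaces); only the shape follows the diff: the two-argument factory accepts a nullable `Integer`, and `getParallelism()` exposes it as an `Optional` so that a caller can fall back to a default.

```java
import java.util.Optional;

/** Stdlib-only sketch of an optional parallelism hint; hypothetical, not Flink code. */
final class ParallelismSketch {

    /** Hypothetical stand-in for a parallelism provider interface. */
    interface ParallelismHint {
        default Optional<Integer> getParallelism() {
            return Optional.empty();
        }
    }

    /** Hypothetical stand-in for a source provider that also carries a parallelism hint. */
    interface SourceHint extends ParallelismHint {
        boolean isBounded();

        /** Factory without a parallelism hint; delegates with null, as in the diff. */
        static SourceHint of(boolean isBounded) {
            return of(isBounded, null);
        }

        /** Factory with a nullable parallelism hint. */
        static SourceHint of(boolean isBounded, Integer sourceParallelism) {
            return new SourceHint() {
                @Override
                public boolean isBounded() {
                    return isBounded;
                }

                @Override
                public Optional<Integer> getParallelism() {
                    return Optional.ofNullable(sourceParallelism);
                }
            };
        }
    }

    /** Assumed consumer logic: use the hint when present, else the given default. */
    static int resolve(SourceHint hint, int defaultParallelism) {
        return hint.getParallelism().orElse(defaultParallelism);
    }

    public static void main(String[] args) {
        System.out.println(resolve(SourceHint.of(true), 4));
        System.out.println(resolve(SourceHint.of(true, 2), 4));
    }
}
```

Keeping the one-argument factory as a thin delegate means existing callers are unaffected, which matches how the diff preserves the old `of(sourceFunction, isBounded)` signature.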
[flink] branch master updated: [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4d37b8c34ff [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal 4d37b8c34ff is described below commit 4d37b8c34ff062b7505ab8c0ca8f2181768aab60 Author: zoudan AuthorDate: Thu Oct 19 17:48:05 2023 +0800 [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal Close apache/flink#23551 --- .../table/planner/plan/utils/RexNodeExtractor.scala | 4 .../plan/utils/NestedProjectionUtilTest.scala | 10 ++ .../planner/plan/utils/RexNodeExtractorTest.scala | 21 + .../planner/plan/utils/RexNodeRewriterTest.scala| 7 --- .../table/planner/plan/utils/RexNodeTestBase.scala | 6 -- 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala index 482ce56dc63..481cbda8b82 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala @@ -37,6 +37,7 @@ import org.apache.flink.table.types.logical.YearMonthIntervalType import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.util.Preconditions +import org.apache.calcite.avatica.util.ByteString import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ @@ -502,6 +503,9 @@ class RexNodeToExpressionConverter( // convert to BigDecimal literal.getValueAs(classOf[java.math.BigDecimal]) + case 
BINARY | VARBINARY => +literal.getValueAs(classOf[Array[Byte]]) + case _ => literal.getValue } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala index ec8214f5b91..9cd44c9fea7 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala @@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase { "$2", "$3", "$4", + "$5", "*($t2, $t3)", "100", - "<($t5, $t6)", + "<($t6, $t7)", "6", - ">($t1, $t8)", - "AND($t7, $t9)"))) + ">($t1, $t9)", + "AND($t8, $t10)"))) val nestedField = NestedProjectionUtil.build(exprs, rexProgram.getInputRowType) val paths = NestedProjectionUtil.convertToIndexArray(nestedField) @@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase { Array(1), Array(2), Array(3), - Array(4) + Array(4), + Array(5) ) assertArray(paths, orderedPaths) val builder = new FlinkRexBuilder(typeFactory) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala index 2eb87e35cc8..bd5f15d3bed 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala @@ -33,6 +33,7 @@ import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction import org.apache.flink.table.resource.ResourceManager import org.apache.flink.table.utils.CatalogManagerMocks +import 
org.apache.calcite.avatica.util.ByteString import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex.{RexBuilder, RexNode} import org.apache.calcite.sql.`type`.SqlTypeName @@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase { assertEquals(0, unconvertedRexNodes.length) } + @Test + def testExtractConditionWithBinaryLiteral(): Unit = { +// blob +val t0 = rexBuilder.makeInputRef(allFieldTypes.get(5), 5) + +
[flink] branch release-1.17 updated: [FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.17 by this push:
     new c66bcb66fd7 [FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format
c66bcb66fd7 is described below

commit c66bcb66fd7c201ced53e86a685bc3c6208a5c64
Author: Benchao Li
AuthorDate: Fri Jul 7 22:10:45 2023 +0800

    [FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format
---
 docs/content/docs/connectors/table/formats/protobuf.md | 5 +
 1 file changed, 5 insertions(+)

diff --git a/docs/content/docs/connectors/table/formats/protobuf.md b/docs/content/docs/connectors/table/formats/protobuf.md
index 5cbafc8d911..b28cf49f9d3 100644
--- a/docs/content/docs/connectors/table/formats/protobuf.md
+++ b/docs/content/docs/connectors/table/formats/protobuf.md
@@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type to Protobuf type.
 enum
 The enum value of protobuf can be mapped to string or number of flink row accordingly.
+
+ ROW<seconds BIGINT, nanos INT>
+ google.protobuf.timestamp
+ The google.protobuf.timestamp type can be mapped to seconds and fractions of seconds at nanosecond resolution in UTC epoch time using the row type as well as the protobuf definition.
+
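The documented mapping splits a timestamp into whole seconds since the UTC epoch plus a nanosecond fraction. A small sketch of that decomposition using only `java.time` follows; `TimestampRowSketch` and `SecondsNanos` are hypothetical names standing in for a `ROW<seconds BIGINT, nanos INT>` value, and nothing here uses the Flink or protobuf APIs.

```java
import java.time.Instant;

/** Stdlib-only sketch of the seconds/nanos decomposition; hypothetical, not Flink code. */
final class TimestampRowSketch {

    /** Hypothetical holder mirroring ROW<seconds BIGINT, nanos INT>. */
    static final class SecondsNanos {
        final long seconds;
        final int nanos;

        SecondsNanos(long seconds, int nanos) {
            this.seconds = seconds;
            this.nanos = nanos;
        }
    }

    /** Combines epoch seconds and the nanosecond fraction into one instant. */
    static Instant toInstant(SecondsNanos row) {
        return Instant.ofEpochSecond(row.seconds, row.nanos);
    }

    /** Splits an instant back into epoch seconds and a nanosecond fraction. */
    static SecondsNanos fromInstant(Instant instant) {
        return new SecondsNanos(instant.getEpochSecond(), instant.getNano());
    }

    public static void main(String[] args) {
        SecondsNanos row = new SecondsNanos(1688739045L, 500_000_000);
        System.out.println(toInstant(row));
    }
}
```

`Instant.getNano()` always returns a value between 0 and 999,999,999, so the round trip keeps the fraction non-negative.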
[flink] branch master updated: [FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 1aef1f13b99 [FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format
1aef1f13b99 is described below

commit 1aef1f13b9975f3603952d763d756e4f831a1e75
Author: Benchao Li
AuthorDate: Fri Jul 7 22:10:45 2023 +0800

    [FLINK-32547][docs] Add missing doc for Timestamp support in ProtoBuf format
---
 docs/content/docs/connectors/table/formats/protobuf.md | 5 +
 1 file changed, 5 insertions(+)

diff --git a/docs/content/docs/connectors/table/formats/protobuf.md b/docs/content/docs/connectors/table/formats/protobuf.md
index 5cbafc8d911..b28cf49f9d3 100644
--- a/docs/content/docs/connectors/table/formats/protobuf.md
+++ b/docs/content/docs/connectors/table/formats/protobuf.md
@@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type to Protobuf type.
 enum
 The enum value of protobuf can be mapped to string or number of flink row accordingly.
+
+ ROW<seconds BIGINT, nanos INT>
+ google.protobuf.timestamp
+ The google.protobuf.timestamp type can be mapped to seconds and fractions of seconds at nanosecond resolution in UTC epoch time using the row type as well as the protobuf definition.
+
[flink] branch master updated: [FLINK-32370][runtime] Change log level to debug for AbstractHandler when job has not been initialized or is finished
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 4248d05da09 [FLINK-32370][runtime] Change log level to debug for AbstractHandler when job has not been initialized or is finished
4248d05da09 is described below

commit 4248d05da092e3e580ac3238ea9af51151609e4f
Author: Shammon FY
AuthorDate: Mon Jun 26 08:35:05 2023 +0800

    [FLINK-32370][runtime] Change log level to debug for AbstractHandler when job has not been initialized or is finished

    This mostly affects the e2e tests which will check the corresponding log file content

    Close apache/flink#22838
---
 .../org/apache/flink/runtime/rest/handler/AbstractHandler.java | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index b7d5c5bdca9..75bbdf495dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler;

 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
 import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
 import org.apache.flink.runtime.rest.FileUploadHandler;
 import org.apache.flink.runtime.rest.FlinkHttpObjectAggregator;
@@ -54,6 +55,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;

+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
@@ -266,7 +268,12 @@ public abstract class AbstractHandler<
                         HttpResponseStatus.SERVICE_UNAVAILABLE,
                         responseHeaders);
             } else {
-                log.error("Unhandled exception.", throwable);
+                if (throwable instanceof UnavailableDispatcherOperationException
+                        || throwable instanceof FileNotFoundException) {
+                    log.debug("Job is not initialized or is finished: {}", throwable.getMessage());
+                } else {
+                    log.error("Unhandled exception.", throwable);
+                }
                 String stackTrace =
                         String.format(
                                 "",
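The change above picks a log level from the exception type: failures that are expected while a job is starting up or already finished are demoted to debug, and everything else stays at error. A stdlib-only sketch of that decision follows. `LogLevelSketch` and `JobNotReadyException` are hypothetical (the latter stands in for Flink's `UnavailableDispatcherOperationException`), and `java.util.logging.Level.FINE` is used here as an assumed rough analogue of SLF4J's debug level.

```java
import java.io.FileNotFoundException;
import java.util.logging.Level;

/** Stdlib-only sketch of exception-based log-level selection; hypothetical, not Flink code. */
final class LogLevelSketch {

    /** Hypothetical stand-in for UnavailableDispatcherOperationException. */
    static final class JobNotReadyException extends Exception {
        private static final long serialVersionUID = 1L;

        JobNotReadyException(String message) {
            super(message);
        }
    }

    /** Returns FINE for expected "job not ready or finished" failures, SEVERE otherwise. */
    static Level levelFor(Throwable throwable) {
        if (throwable instanceof JobNotReadyException
                || throwable instanceof FileNotFoundException) {
            return Level.FINE;
        }
        return Level.SEVERE;
    }

    public static void main(String[] args) {
        System.out.println(levelFor(new FileNotFoundException("job archive missing")));
        System.out.println(levelFor(new IllegalStateException("unexpected")));
    }
}
```

Separating the level decision from the logging call keeps the rule easy to unit test, which matters here because, per the commit message, the e2e tests inspect the log file content.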
[flink] branch master updated (c4b8cafe912 -> 4006de97352)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from c4b8cafe912 [FLINK-24592][Table SQL/Client] FlinkSQL Client multiline parser improvements
     add 4006de97352 [FLINK-31549][jdbc-driver] Add jdbc driver docs

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/dev/table/jdbcDriver.md | 235 +++
 docs/content.zh/docs/dev/table/overview.md | 1 +
 docs/content/docs/dev/table/jdbcDriver.md | 235 +++
 docs/content/docs/dev/table/overview.md | 1 +
 4 files changed, 472 insertions(+)
 create mode 100644 docs/content.zh/docs/dev/table/jdbcDriver.md
 create mode 100644 docs/content/docs/dev/table/jdbcDriver.md
[flink] branch master updated: [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 8119411addd [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished 8119411addd is described below commit 8119411addd9c82c15bab8480e7b35b8e6394d43 Author: Shammon FY AuthorDate: Mon Jun 19 10:17:19 2023 +0800 [FLINK-32370][streaming] Fix warn log in result fetcher when job is finished Close apache/flink#22819 --- .../client/program/rest/RestClusterClientTest.java | 37 ++ .../operators/collect/CollectResultFetcher.java| 7 ++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 2ff6dea2a98..740eb06a57b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -122,6 +123,7 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; @@ -1136,6 +1138,32 @@ class RestClusterClientTest { } } +@Test +void 
testSendCoordinationRequestException() throws Exception { +final TestClientCoordinationHandler handler = +new TestClientCoordinationHandler(new FlinkJobNotFoundException(jobId)); +try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(handler)) { +try (RestClusterClient restClusterClient = + createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { +String payload = "testing payload"; +TestCoordinationRequest request = new TestCoordinationRequest<>(payload); + +assertThatThrownBy( +() -> +restClusterClient +.sendCoordinationRequest( +jobId, new OperatorID(), request) +.get()) +.matches( +e -> + ExceptionUtils.findThrowableWithMessage( +e, + FlinkJobNotFoundException.class.getName()) +.isPresent()); +} +} +} + /** * The SUSPENDED job status should never be returned by the client thus client retries until it * either receives a different job status or the cluster is not reachable. @@ -1166,9 +1194,15 @@ class RestClusterClientTest { ClientCoordinationRequestBody, ClientCoordinationResponseBody, ClientCoordinationMessageParameters> { +@Nullable private final FlinkJobNotFoundException exception; private TestClientCoordinationHandler() { +this(null); +} + +private TestClientCoordinationHandler(@Nullable FlinkJobNotFoundException exception) { super(ClientCoordinationHeaders.getInstance()); +this.exception = exception; } @Override @@ -1178,6 +1212,9 @@ class RestClusterClientTest { @Nonnull DispatcherGateway gateway) throws RestHandlerException { try { +if (exception != null) { +throw exception; +} TestCoordinationRequest req = (TestCoordinationRequest) request.getRequestBody() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java index 519b7d603f1..a7916502214 100644 --- a/flink-streami
[flink] branch master updated: [FLINK-32008][protobuf] Ensure bulk persistence is not supported
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 7d4ee28e85a [FLINK-32008][protobuf] Ensure bulk persistence is not supported 7d4ee28e85a is described below commit 7d4ee28e85aad4abc8ad126c4d953d0e921ea07e Author: Ryan Skraba AuthorDate: Fri Jun 16 19:32:09 2023 +0200 [FLINK-32008][protobuf] Ensure bulk persistence is not supported --- flink-formats/flink-protobuf/pom.xml | 10 +++ .../formats/protobuf/PbFileFormatFactory.java | 83 ++ .../org.apache.flink.table.factories.Factory | 1 + .../formats/protobuf/ProtobufSQLITCaseTest.java| 32 + 4 files changed, 126 insertions(+) diff --git a/flink-formats/flink-protobuf/pom.xml b/flink-formats/flink-protobuf/pom.xml index 00e04bc385e..760d32f230a 100644 --- a/flink-formats/flink-protobuf/pom.xml +++ b/flink-formats/flink-protobuf/pom.xml @@ -56,6 +56,16 @@ under the License. provided + + + + org.apache.flink + flink-connector-files + ${project.version} + provided + true + + com.google.protobuf protobuf-java diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFileFormatFactory.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFileFormatFactory.java new file mode 100644 index 000..fc1cecf3c48 --- /dev/null +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFileFormatFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.BulkWriter.Factory; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory; +import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory; +import org.apache.flink.connector.file.table.format.BulkDecodingFormat; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; + +import java.util.Collections; +import java.util.Set; + +/** + * Throw a {@link ValidationException} when using Protobuf format factory for file system. + * + * In practice, there is https://protobuf.dev/programming-guides/techniques/#streaming";> + * no standard for storing bulk protobuf messages. This factory is present to prevent falling + * back to the {@link org.apache.flink.connector.file.table.DeserializationSchemaAdapter}, a + * line-based format which could silently succeed but write unrecoverable data to disk. + * + * If your use case requires storing bulk protobuf messages on disk, the parquet file format + * might be the appropriate container and has an API for mapping records to protobuf messages. 
+ */ +@Internal +public class PbFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory { + +@Override +public String factoryIdentifier() { +return PbFormatFactory.IDENTIFIER; +} + +@Override +public Set> requiredOptions() { +return Collections.emptySet(); +} + +@Override +public Set> optionalOptions() { +return Collections.emptySet(); +} + +@Override +public Set> forwardOptions() { +return Collections.emptySet(); +} + +@Override +public BulkDecodingFormat createDecodingFormat( +DynamicTableFactory.Context context, ReadableConfig formatOptions) { +throw new ValidationException( +"The 'protob
[flink] branch master updated (c0dab74002b -> a521f5a98a7)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from c0dab74002b [FLINK-32290][dist] Enable -XX:+IgnoreUnrecognizedVMOptions
     add a521f5a98a7 [FLINK-32343][jdbc-driver] Fix exception in jdbc driver for jdbc tools

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/jdbc/BaseConnection.java | 31
 .../org/apache/flink/table/jdbc/BaseStatement.java | 13 -
 .../apache/flink/table/jdbc/FlinkConnection.java | 33 ++
 .../apache/flink/table/jdbc/FlinkStatement.java | 13 +
 4 files changed, 46 insertions(+), 44 deletions(-)
[flink] branch master updated: [FLINK-31673][jdbc-driver] Add e2e test for jdbc driver
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 089e9edcb6f [FLINK-31673][jdbc-driver] Add e2e test for jdbc driver 089e9edcb6f is described below commit 089e9edcb6f250fbd18ae4cb0f285d47c629deb4 Author: Shammon FY AuthorDate: Wed Jun 14 18:19:55 2023 +0800 [FLINK-31673][jdbc-driver] Add e2e test for jdbc driver --- .../flink-end-to-end-tests-jdbc-driver/pom.xml | 74 ++ .../jdbc/driver/tests/FlinkDriverExample.java | 111 + flink-end-to-end-tests/pom.xml | 1 + flink-end-to-end-tests/run-nightly-tests.sh| 1 + flink-end-to-end-tests/test-scripts/common.sh | 14 +++ .../test-scripts/test_sql_jdbc_driver.sh | 44 .../org/apache/flink/table/gateway/SqlGateway.java | 2 - 7 files changed, 245 insertions(+), 2 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/pom.xml new file mode 100644 index 000..47b4eb3f43b --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/pom.xml @@ -0,0 +1,74 @@ + + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + 4.0.0 + + + flink-end-to-end-tests + org.apache.flink + 1.18-SNAPSHOT + + + flink-end-to-end-tests-jdbc-driver + Flink : E2E Tests : JDBC Driver + + jar + + + + org.apache.flink + flink-sql-jdbc-driver-bundle + ${project.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-jdbc-driver + package + + shade + + + JdbcDriverExample + + + com.google.code.findbugs:jsr305 + + + org.apache.flink:flink-sql-jdbc-driver-bundle + org.slf4j:slf4j-api + + + + + + + + + diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/driver/tests/FlinkDriverExample.java b/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/driver/tests/FlinkDriverExample.java new file mode 100644 index 000..4feb88a1e74 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/driver/tests/FlinkDriverExample.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.jdbc.driver.tests; + +import java.io.Buf
[flink] branch master updated (a6adbdda0cd -> c7afa323582)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from a6adbdda0cd [FLINK-31835][table-planner] Fix the array type that can't be converted from the external primitive array
     add c7afa323582 [FLINK-32300][jdbc-driver] Support getObject for FlinkResultSet

No new revisions were added by this update.

Summary of changes:
 flink-table/flink-sql-jdbc-driver-bundle/pom.xml | 18 ++-
 .../org/apache/flink/table/jdbc/BaseStatement.java | 5 -
 .../apache/flink/table/jdbc/FlinkResultSet.java | 87 +-
 .../apache/flink/table/jdbc/FlinkStatement.java | 14 +++
 .../flink/table/jdbc/utils/ArrayFieldGetter.java | 121 +++
 .../flink/table/jdbc/FlinkResultSetTest.java | 129 -
 .../flink/table/jdbc/FlinkStatementTest.java | 7 ++
 7 files changed, 318 insertions(+), 63 deletions(-)
 create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/ArrayFieldGetter.java
[flink-connector-jdbc] branch main updated: [FLINK-32235][docs] Translate CrateDB Docs to chinese
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git The following commit(s) were added to refs/heads/main by this push: new 1a2186c [FLINK-32235][docs] Translate CrateDB Docs to chinese 1a2186c is described below commit 1a2186c71fb765b85cc8b9371e6b5158b9ef10b4 Author: codenohup AuthorDate: Tue Jun 6 17:33:55 2023 +0800 [FLINK-32235][docs] Translate CrateDB Docs to chinese Signed-off-by: codenohup --- docs/content.zh/docs/connectors/table/jdbc.md | 36 ++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/docs/content.zh/docs/connectors/table/jdbc.md b/docs/content.zh/docs/connectors/table/jdbc.md index 063e0a3..ad2f1da 100644 --- a/docs/content.zh/docs/connectors/table/jdbc.md +++ b/docs/content.zh/docs/connectors/table/jdbc.md @@ -55,7 +55,7 @@ JDBC 连接器不是二进制发行版的一部分,请查阅[这里]({{< ref " | PostgreSQL | `org.postgresql` | `postgresql` | [下载](https://jdbc.postgresql.org/download/) | | Derby | `org.apache.derby` | `derby`| [下载](http://db.apache.org/derby/derby_downloads.html) | | | SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) | -| CrateDB| `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) | +| CrateDB| `io.crate` | `crate-jdbc` | [下载](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) | 当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅[这里]({{< ref "docs/dev/configuration/overview" >}})了解在集群上执行时何连接它们。 @@ -605,7 +605,41 @@ SELECT * FROM test_table; SELECT * FROM mysql_catalog.given_database.test_table2; SELECT * FROM given_database.test_table2; ``` + +### JDBC Catalog for CrateDB + + + + CrateDB 元空间映射 + +CrateDB 和 PostgreSQL 类似,但它只有一个默认名为 `crate` 的数据库。 此外它有一个额外的命名空间 `schema`,一个 CrateDB 实例可以有多个 schema,其中一个 schema 默认名为"doc",每个 schema 可以包含多张表。 在 Flink 中,当查询由 CrateDB catalog 
注册的表时,用户可以使用 `schema_name.table_name` 或者只有 `table_name`。其中 `schema_name` 是可选的,默认值为 "doc"。 + +因此,Flink Catalog 和 CrateDB catalog 之间的元空间映射如下: + +| Flink Catalog Metaspace Structure| CrateDB Metaspace Structure| +| :|:---| +| catalog name (defined in Flink only) | N/A| +| database name| database name (一直是 `crate`) | +| table name | [schema_name.]table_name | + +Flink 中的 CrateDB 表的完整路径应该是 ``"..``"``。如果指定了 schema,请注意转义 ``。 + +这里提供了一些访问 CrateDB 表的例子: + +```sql +-- 扫描 'doc' schema (即默认 schema)中的 'test_table' 表,schema 名称可以省略 +SELECT * FROM mycatalog.crate.doc.test_table; +SELECT * FROM crate.doc.test_table; +SELECT * FROM doc.test_table; +SELECT * FROM test_table; + +-- 扫描 'custom_schema' schema 中的 'test_table2' 表 +-- 自定义 schema 不能省略,并且必须与表一起转义 +SELECT * FROM mycatalog.crate.`custom_schema.test_table2` +SELECT * FROM crate.`custom_schema.test_table2`; +SELECT * FROM `custom_schema.test_table2`; +``` 数据类型映射
[flink] branch master updated: [FLINK-32211][sql-client] Supports row format in executor
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 81902e81c43 [FLINK-32211][sql-client] Supports row format in executor 81902e81c43 is described below commit 81902e81c43990c48d05ec81469b7d99c7922698 Author: Shammon FY AuthorDate: Mon May 29 15:01:29 2023 +0800 [FLINK-32211][sql-client] Supports row format in executor In this commit, we also removed DataConverter in flink-sql-jdbc-driver module. Close apache/flink#22671 --- .../flink/table/client/gateway/Executor.java | 9 ++ .../flink/table/client/gateway/ExecutorImpl.java | 37 +++- .../apache/flink/table/jdbc/FlinkConnection.java | 4 +- .../apache/flink/table/jdbc/FlinkResultSet.java| 50 +- .../apache/flink/table/jdbc/FlinkStatement.java| 5 +- .../flink/table/jdbc/utils/DataConverter.java | 88 - .../table/jdbc/utils/DatabaseMetaDataUtils.java| 6 +- .../table/jdbc/utils/DefaultDataConverter.java | 105 - .../table/jdbc/utils/StringDataConverter.java | 105 - .../flink/table/jdbc/FlinkResultSetTest.java | 47 - 10 files changed, 89 insertions(+), 367 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java index b636d326560..3be128e0708 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java @@ -19,6 +19,7 @@ package org.apache.flink.table.client.gateway; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.gateway.rest.util.RowFormat; import org.apache.flink.table.gateway.service.context.DefaultContext; import java.io.Closeable; @@ -36,6 +37,14 @@ public interface Executor extends 
Closeable { return new ExecutorImpl(defaultContext, address, sessionId); } +static Executor create( +DefaultContext defaultContext, +InetSocketAddress address, +String sessionId, +RowFormat rowFormat) { +return new ExecutorImpl(defaultContext, address, sessionId, rowFormat); +} + static Executor create(DefaultContext defaultContext, URL address, String sessionId) { return new ExecutorImpl(defaultContext, address, sessionId); } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java index 4ec7eb3e8eb..f4f6b40ed56 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java @@ -112,6 +112,7 @@ public class ExecutorImpl implements Executor { private final SqlGatewayRestAPIVersion connectionVersion; private final SessionHandle sessionHandle; +private final RowFormat rowFormat; public ExecutorImpl( DefaultContext defaultContext, InetSocketAddress gatewayAddress, String sessionId) { @@ -119,11 +120,30 @@ public class ExecutorImpl implements Executor { defaultContext, NetUtils.socketToUrl(gatewayAddress), sessionId, -HEARTBEAT_INTERVAL_MILLISECONDS); +HEARTBEAT_INTERVAL_MILLISECONDS, +RowFormat.PLAIN_TEXT); +} + +public ExecutorImpl( +DefaultContext defaultContext, +InetSocketAddress gatewayAddress, +String sessionId, +RowFormat rowFormat) { +this( +defaultContext, +NetUtils.socketToUrl(gatewayAddress), +sessionId, +HEARTBEAT_INTERVAL_MILLISECONDS, +rowFormat); } public ExecutorImpl(DefaultContext defaultContext, URL gatewayUrl, String sessionId) { -this(defaultContext, gatewayUrl, sessionId, HEARTBEAT_INTERVAL_MILLISECONDS); +this( +defaultContext, +gatewayUrl, +sessionId, +HEARTBEAT_INTERVAL_MILLISECONDS, +RowFormat.PLAIN_TEXT); } @VisibleForTesting @@ -132,7 
+152,12 @@ public class ExecutorImpl implements Executor { InetSocketAddress gatewayAddress, String sessionId, long heartbeatInterval) { -this(defaultContext, NetUtils.socketToUrl(gatewayAddress), sessionId, heartbeatInterval); +this
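The constructor changes in this diff follow a common pattern: existing overloads delegate to a new one and pass `RowFormat.PLAIN_TEXT` as the default, so current callers are unaffected. A minimal sketch of that pattern, assuming nothing about the real classes beyond what the diff shows; `RowFormatSketch`, `ExecutorSketch` and the `JSON` constant are hypothetical stand-ins, not Flink's `RowFormat` or `ExecutorImpl`:

```java
/** Hypothetical stand-in for the RowFormat enum; only PLAIN_TEXT appears in the diff. */
enum RowFormatSketch {
    JSON, // assumed second format, for illustration only
    PLAIN_TEXT
}

/** Hypothetical, heavily simplified stand-in for ExecutorImpl. */
class ExecutorSketch {
    private final String sessionId;
    private final RowFormatSketch rowFormat;

    /** Existing call sites keep working and implicitly get PLAIN_TEXT. */
    ExecutorSketch(String sessionId) {
        this(sessionId, RowFormatSketch.PLAIN_TEXT);
    }

    /** New overload for callers that want to choose the format themselves. */
    ExecutorSketch(String sessionId, RowFormatSketch rowFormat) {
        this.sessionId = sessionId;
        this.rowFormat = rowFormat;
    }

    String getSessionId() {
        return sessionId;
    }

    RowFormatSketch getRowFormat() {
        return rowFormat;
    }
}
```

Delegating through one full constructor keeps the default in a single place per entry point, which is presumably why the diff threads `rowFormat` through every overload.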
[flink] branch master updated: [FLINK-32129][fs-connector] Use string array in PartitionCommitInfo
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 3cbacbf26f0 [FLINK-32129][fs-connector] Use string array in PartitionCommitInfo 3cbacbf26f0 is described below commit 3cbacbf26f09b5301b280beed4f78fc03d573d76 Author: Shammon FY AuthorDate: Fri May 19 15:31:35 2023 +0800 [FLINK-32129][fs-connector] Use string array in PartitionCommitInfo Using generics will throw exception when 'pipeline.generic-types' is disabled Close apache/flink#22609 --- .../file/table/stream/PartitionCommitInfo.java | 9 +++--- .../file/table/stream/StreamingFileWriter.java | 3 +- .../file/table/stream/compact/CompactOperator.java | 2 +- .../file/table/stream/PartitionCommitInfoTest.java | 37 ++ .../file/table/stream/StreamingFileWriterTest.java | 3 +- .../table/stream/compact/CompactOperatorTest.java | 2 +- 6 files changed, 46 insertions(+), 10 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java index abc6ede1d51..5decc717c8e 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java @@ -21,7 +21,6 @@ package org.apache.flink.connector.file.table.stream; import org.apache.flink.annotation.Internal; import java.io.Serializable; -import java.util.List; /** * The message sent by upstream. 
@@ -37,12 +36,12 @@ public class PartitionCommitInfo implements Serializable { private long checkpointId; private int taskId; private int numberOfTasks; -private List partitions; +private String[] partitions; public PartitionCommitInfo() {} public PartitionCommitInfo( -long checkpointId, int taskId, int numberOfTasks, List partitions) { +long checkpointId, int taskId, int numberOfTasks, String[] partitions) { this.checkpointId = checkpointId; this.taskId = taskId; this.numberOfTasks = numberOfTasks; @@ -73,11 +72,11 @@ public class PartitionCommitInfo implements Serializable { this.numberOfTasks = numberOfTasks; } -public List getPartitions() { +public String[] getPartitions() { return partitions; } -public void setPartitions(List partitions) { +public void setPartitions(String[] partitions) { this.partitions = partitions; } } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java index eec6907bce3..7eb9faad119 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -152,6 +151,6 @@ public class StreamingFileWriter extends AbstractStreamingWriter(partitions; +partitions.toArray(new String[0]; } } diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java index a99f39b57a5..fa51d4aa767 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java @@ -158,7 +158,7 @@ public class CompactOperator extends AbstractStreamOperator(this.partitions; +this.partitions.toArray(new String[0]; this.partitions.clear(); } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfoTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink
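As a rough illustration of the change above: the commit replaces a list-typed `partitions` field with `String[]`, which, per the commit message, avoids the exception raised when 'pipeline.generic-types' is disabled. The sketch below is not the Flink class; `CommitInfoSketch` and `fromPartitions` are hypothetical names, and only the `toArray(new String[0])` conversion is taken from the diff.

```java
import java.util.Set;

/** Hypothetical stand-in for PartitionCommitInfo, reduced to two fields. */
class CommitInfoSketch {
    private final long checkpointId;
    // A plain array instead of a java.util.List, as in the commit.
    private final String[] partitions;

    CommitInfoSketch(long checkpointId, String[] partitions) {
        this.checkpointId = checkpointId;
        this.partitions = partitions;
    }

    long getCheckpointId() {
        return checkpointId;
    }

    String[] getPartitions() {
        return partitions;
    }

    /** Converts a collected partition set the same way the diff does at the call sites. */
    static CommitInfoSketch fromPartitions(long checkpointId, Set<String> partitions) {
        return new CommitInfoSketch(checkpointId, partitions.toArray(new String[0]));
    }
}
```

Callers that previously wrapped the set in a list would only need to change the conversion at the point where the message is built.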
[flink] branch master updated: [FLINK-31687][jdbc-driver] Get rid of flink-core for jdbc driver
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 06688f345f6 [FLINK-31687][jdbc-driver] Get rid of flink-core for jdbc driver 06688f345f6 is described below commit 06688f345f6793a8964ec2175f44cda13c33 Author: Shammon FY AuthorDate: Sat May 6 09:51:44 2023 +0800 [FLINK-31687][jdbc-driver] Get rid of flink-core for jdbc driver Close apache/flink#22533 --- .../client/cli/parser/SqlClientSyntaxHighlighter.java | 8 .../apache/flink/table/client/gateway/Executor.java | 8 .../flink/table/client/gateway/ExecutorImpl.java | 11 ++- .../apache/flink/table/client/cli/CliClientTest.java | 9 - .../table/gateway/service/context/DefaultContext.java | 5 + flink-table/flink-sql-jdbc-driver-bundle/pom.xml | 9 - .../org/apache/flink/table/jdbc/FlinkConnection.java | 19 ++- 7 files changed, 41 insertions(+), 28 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java index bdcfa5a707d..366cae3f57e 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlClientSyntaxHighlighter.java @@ -18,6 +18,7 @@ package org.apache.flink.table.client.cli.parser; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.config.TableConfigOptions; @@ -57,16 +58,15 @@ public class SqlClientSyntaxHighlighter extends DefaultHighlighter { @Override public AttributedString highlight(LineReader reader, String 
buffer) { +ReadableConfig configuration = executor.getSessionConfig(); final SyntaxHighlightStyle.BuiltInStyle style = SyntaxHighlightStyle.BuiltInStyle.fromString( -executor.getSessionConfig() - .get(SqlClientOptions.DISPLAY_DEFAULT_COLOR_SCHEMA)); + configuration.get(SqlClientOptions.DISPLAY_DEFAULT_COLOR_SCHEMA)); if (style == SyntaxHighlightStyle.BuiltInStyle.DEFAULT) { return super.highlight(reader, buffer); } -final String dialectName = - executor.getSessionConfig().get(TableConfigOptions.TABLE_SQL_DIALECT); +final String dialectName = configuration.get(TableConfigOptions.TABLE_SQL_DIALECT); final SqlDialect dialect = SqlDialect.HIVE.name().equalsIgnoreCase(dialectName) ? SqlDialect.HIVE diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java index 8414789c993..b636d326560 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java @@ -25,6 +25,7 @@ import java.io.Closeable; import java.net.InetSocketAddress; import java.net.URL; import java.util.List; +import java.util.Map; /** A gateway for communicating with Flink and other external systems. */ public interface Executor extends Closeable { @@ -53,6 +54,13 @@ public interface Executor extends Closeable { */ ReadableConfig getSessionConfig(); +/** + * Get the map configuration of the session. + * + * @return the map session configuration. + */ +Map getSessionConfigMap(); + /** * Execute statement. 
* diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java index fcdee3c779d..4ec7eb3e8eb 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java @@ -83,6 +83,7 @@ import java.net.URL; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -206,6 +207,14 @@ public class ExecutorImpl implements Executor { } public ReadableConfig getSessionConfig() { +try
[flink] branch master updated: [FLINK-31548][jdbc-driver] Introduce FlinkDataSource for flink jdbc driver
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 420b5e319e1 [FLINK-31548][jdbc-driver] Introduce FlinkDataSource for flink jdbc driver 420b5e319e1 is described below commit 420b5e319e1f8e917c099530360f51f73a070575 Author: Shammon FY AuthorDate: Sat May 6 09:00:38 2023 +0800 [FLINK-31548][jdbc-driver] Introduce FlinkDataSource for flink jdbc driver Close apache/flink#22532 --- .../apache/flink/table/jdbc/FlinkDataSource.java | 88 ++ .../flink/table/jdbc/FlinkDataSourceTest.java | 39 ++ 2 files changed, 127 insertions(+) diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDataSource.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDataSource.java new file mode 100644 index 000..d8a84e19742 --- /dev/null +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDataSource.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.jdbc; + +import javax.sql.DataSource; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Properties; +import java.util.logging.Logger; + +/** Basic flink data source which create {@link FlinkConnection}. */ +public class FlinkDataSource implements DataSource { +private final String url; +private final Properties properties; + +public FlinkDataSource(String url, Properties properties) { +this.url = url; +this.properties = properties; +} + +@Override +public Connection getConnection() throws SQLException { +return new FlinkConnection(DriverUri.create(url, properties)); +} + +@Override +public Connection getConnection(String username, String password) throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDataSource#getConnection with username and password is not supported"); +} + +@Override +public T unwrap(Class iface) throws SQLException { +throw new SQLFeatureNotSupportedException("FlinkDataSource#unwrap is not supported"); +} + +@Override +public boolean isWrapperFor(Class iface) throws SQLException { +throw new SQLFeatureNotSupportedException("FlinkDataSource#isWrapperFor is not supported"); +} + +@Override +public PrintWriter getLogWriter() throws SQLException { +throw new SQLFeatureNotSupportedException("FlinkDataSource#getLogWriter is not supported"); +} + +@Override +public void setLogWriter(PrintWriter out) throws SQLException { +throw new SQLFeatureNotSupportedException("FlinkDataSource#setLogWriter is not supported"); +} + +@Override +public void setLoginTimeout(int seconds) throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDataSource#setLoginTimeout is not supported"); +} + +@Override +public int getLoginTimeout() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDataSource#getLoginTimeout is not supported"); +} + +@Override +public Logger 
getParentLogger() throws SQLFeatureNotSupportedException { +throw new SQLFeatureNotSupportedException( +"FlinkDataSource#getParentLogger is not supported"); +} +} diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDataSourceTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDataSourceTest.java new file mode 100644 index 000..9d00c37d24d --- /dev/null +++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDataSourceTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache
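Since `FlinkDataSource#getConnection(String, String)` throws `SQLFeatureNotSupportedException` in the diff above, code written against the generic `javax.sql.DataSource` interface may want a fallback. A minimal sketch of one way to do that, using only standard JDBC types; `DataSourceProbe` and `connect` are hypothetical names and not part of the Flink driver:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import javax.sql.DataSource;

/** Hypothetical helper; not part of the Flink JDBC driver. */
final class DataSourceProbe {
    private DataSourceProbe() {}

    /**
     * Tries the credential overload first and falls back to the no-argument
     * overload if the data source reports that credentials are unsupported.
     */
    static Connection connect(DataSource dataSource, String user, String password)
            throws SQLException {
        try {
            return dataSource.getConnection(user, password);
        } catch (SQLFeatureNotSupportedException e) {
            // The data source does not accept credentials; use its default connection.
            return dataSource.getConnection();
        }
    }
}
```

Whether silently dropping the credentials is acceptable depends on the application; a stricter caller might rethrow instead.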
[flink] branch master updated (ed9ee279e50 -> 400530d30af)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from ed9ee279e50 [FLINK-30815][tests] Migrate BatchTestBase to junit5
     add 400530d30af [FLINK-31546][jdbc-driver] Close all statements when connection is closed

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/jdbc/FlinkConnection.java   | 48 --
 .../apache/flink/table/jdbc/FlinkStatement.java    |  5 ++-
 .../flink/table/jdbc/FlinkStatementTest.java       | 15 +++
 3 files changed, 63 insertions(+), 5 deletions(-)
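The summary above does not include the diff itself, so the following is only a sketch of the general technique named in the commit title: a connection tracks the statements it creates and closes them when it is closed. All names here (`TrackingConnection`, `TrackedStatement`, `openStatements`) are hypothetical and are not taken from `FlinkConnection` or `FlinkStatement`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical connection that remembers every statement it hands out. */
class TrackingConnection implements AutoCloseable {
    private final List<TrackedStatement> statements = new ArrayList<>();
    private boolean closed;

    synchronized TrackedStatement createStatement() {
        if (closed) {
            throw new IllegalStateException("Connection is closed");
        }
        TrackedStatement statement = new TrackedStatement(this);
        statements.add(statement);
        return statement;
    }

    /** Called by a statement when it is closed on its own. */
    synchronized void removeStatement(TrackedStatement statement) {
        statements.remove(statement);
    }

    synchronized int openStatements() {
        return statements.size();
    }

    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        // Iterate over a copy: each close() calls back into removeStatement.
        for (TrackedStatement statement : new ArrayList<>(statements)) {
            statement.close();
        }
        statements.clear();
    }
}

/** Hypothetical statement that unregisters itself from its connection on close. */
class TrackedStatement implements AutoCloseable {
    private final TrackingConnection connection;
    private boolean closed;

    TrackedStatement(TrackingConnection connection) {
        this.connection = connection;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        connection.removeStatement(this);
    }
}
```

Copying the list before iterating avoids a `ConcurrentModificationException`, because each statement removes itself from the connection while the connection is still looping.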
[flink] branch master updated: [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 2ff4e220945 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver 2ff4e220945 is described below commit 2ff4e220945680eba21065480385148ca8d034da Author: Shammon FY AuthorDate: Thu Apr 6 13:09:19 2023 +0800 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver Close apache/flink#22362 --- .../flink/table/jdbc/BaseDatabaseMetaData.java | 792 + .../org/apache/flink/table/jdbc/DriverUri.java | 4 + .../apache/flink/table/jdbc/FlinkConnection.java | 15 +- .../flink/table/jdbc/FlinkDatabaseMetaData.java| 384 ++ .../apache/flink/table/jdbc/FlinkResultSet.java| 29 +- .../flink/table/jdbc/FlinkResultSetMetaData.java | 10 +- .../table/jdbc/utils/CloseableResultIterator.java | 24 + .../table/jdbc/utils/CollectionResultIterator.java | 45 ++ .../table/jdbc/utils/DatabaseMetaDataUtils.java| 110 +++ .../table/jdbc/utils/StatementResultIterator.java | 46 ++ .../flink/table/jdbc/FlinkConnectionTest.java | 5 +- .../table/jdbc/FlinkDatabaseMetaDataTest.java | 121 12 files changed, 1567 insertions(+), 18 deletions(-) diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java new file mode 100644 index 000..75ec33aab67 --- /dev/null +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java @@ -0,0 +1,792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.jdbc; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.RowIdLifetime; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; + +/** Base {@link DatabaseMetaData} for flink driver with not supported features. */ +public abstract class BaseDatabaseMetaData implements DatabaseMetaData { +@Override +public String getUserName() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#getUserName is not supported"); +} + +@Override +public boolean nullsAreSortedHigh() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#nullsAreSortedHigh is not supported"); +} + +@Override +public boolean nullsAreSortedLow() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#nullsAreSortedLow is not supported"); +} + +@Override +public boolean nullsAreSortedAtStart() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#nullsAreSortedAtStart is not supported"); +} + +@Override +public boolean nullsAreSortedAtEnd() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#nullsAreSortedAtEnd is not supported"); +} + +@Override +public boolean supportsMixedCaseIdentifiers() throws SQLException { +throw new SQLFeatureNotSupportedException( 
+"FlinkDatabaseMetaData#supportsMixedCaseIdentifiers is not supported"); +} + +@Override +public boolean storesUpperCaseIdentifiers() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#storesUpperCaseIdentifiers is not supported"); +} + +@Override +public boolean storesLowerCaseIdentifiers() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#storesLowerCaseIdentifiers is not supported"); +} + +@Override +public boolean storesMixedCaseIdentifiers() throws SQLExc
[flink] branch master updated: [FLINK-31543][jdbc-driver] Introduce statement for flink jdbc driver
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 63166226f65 [FLINK-31543][jdbc-driver] Introduce statement for flink jdbc driver 63166226f65 is described below commit 63166226f65cc4da80cc7661d76eb90032ec Author: Shammon FY AuthorDate: Tue Apr 18 19:13:24 2023 +0800 [FLINK-31543][jdbc-driver] Introduce statement for flink jdbc driver Close apache/flink#22417 --- flink-table/flink-sql-jdbc-driver/pom.xml | 12 ++ .../org/apache/flink/table/jdbc/BaseStatement.java | 238 + .../apache/flink/table/jdbc/FlinkConnection.java | 2 +- .../apache/flink/table/jdbc/FlinkResultSet.java| 5 +- .../flink/table/jdbc/FlinkResultSetMetaData.java | 19 +- .../apache/flink/table/jdbc/FlinkStatement.java| 157 ++ .../flink/table/jdbc/FlinkConnectionTest.java | 48 + .../flink/table/jdbc/FlinkJdbcDriverTestBase.java | 66 ++ .../table/jdbc/FlinkResultSetMetaDataTest.java | 7 +- .../flink/table/jdbc/FlinkStatementTest.java | 123 +++ 10 files changed, 619 insertions(+), 58 deletions(-) diff --git a/flink-table/flink-sql-jdbc-driver/pom.xml b/flink-table/flink-sql-jdbc-driver/pom.xml index 3d6968fa8e4..fc6b7b285c8 100644 --- a/flink-table/flink-sql-jdbc-driver/pom.xml +++ b/flink-table/flink-sql-jdbc-driver/pom.xml @@ -72,6 +72,18 @@ ${project.version} test + + org.apache.flink + flink-connector-files + ${project.version} + test + + + org.apache.flink + flink-csv + ${project.version} + test + diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseStatement.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseStatement.java new file mode 100644 index 000..08bf3107f28 --- /dev/null +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseStatement.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.jdbc; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLWarning; +import java.sql.Statement; + +/** Base statement in flink driver. 
*/ +public abstract class BaseStatement implements Statement { +@Override +public int executeUpdate(String sql) throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkStatement#executeUpdate is not supported yet."); +} + +@Override +public int getMaxFieldSize() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkStatement#getMaxFieldSize is not supported yet."); +} + +@Override +public void setMaxFieldSize(int max) throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkStatement#setMaxFieldSize is not supported yet."); +} + +@Override +public int getMaxRows() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkStatement#getMaxRows is not supported yet."); +} + +@Override +public void setMaxRows(int max) throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkStatement#setMaxRows is not supported yet."); +} + +@Override +public void setEscapeProcessing(boolean enable) throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkStatement#setEscapeProcessing is not supported yet."); +} + +@Override +public int getQueryTimeout() throws SQLException { +throw new S
[flink] branch master updated: [FLINK-31787][docs] Add explicit ROW constructor to sql functions doc
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new fc7d821dfa1 [FLINK-31787][docs] Add explicit ROW constructor to sql functions doc fc7d821dfa1 is described below commit fc7d821dfa16bad8ca58cb716b1b429d55b8317a Author: Aitozi AuthorDate: Thu Apr 13 10:06:24 2023 +0800 [FLINK-31787][docs] Add explicit ROW constructor to sql functions doc Close apache/flink#22386 --- docs/data/sql_functions.yml | 9 +++-- docs/data/sql_functions_zh.yml| 11 --- docs/layouts/shortcodes/sql_functions.html| 2 +- docs/layouts/shortcodes/sql_functions_zh.html | 2 +- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 471b3bcfea1..1aa2ca0872e 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -931,13 +931,18 @@ json: valueconstruction: - sql: | - -- implicit constructor with parenthesis + **implicit** constructor with parenthesis + (value1 [, value2]*) + + **explicit** ROW constructor with + + ROW(value1 [, value2]*) table: row(ANY1, ANY2, ...) description: | Returns a row created from a list of values (value1, value2,...). - The implicit row constructor supports arbitrary expressions as fields but requires at least two fields. The explicit row constructor can deal with an arbitrary number of fields but does not support all kinds of field expressions well currently. + The implicit row constructor requires at least two fields. The explicit row constructor can deal with an arbitrary number of fields. Both of them support arbitrary expressions as fields. - sql: ARRAY ‘[’ value1 [, value2 ]* ‘]’ table: array(ANY1, ANY2, ...) description: Returns an array created from a list of values (value1, value2, ...). 
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index c3c24d555b2..c0e7a1995ef 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -985,12 +985,17 @@ json: valueconstruction: - sql: | - -- implicit constructor with parenthesis + **implicit** constructor with parenthesis + (value1 [, value2]*) + + **explicit** ROW constructor with + + ROW(value1 [, value2]*) table: row(ANY1, ANY2, ...) description: | - 返回从值列表 (value1, value2, ...) 创建的行。隐式行构造函数支持任意表达式作为字段,但至少需要两个字段。 - 显式行构造函数可以处理任意数量的字段,但目前还不能很好地支持所有类型的字段表达式。 + 返回从值列表 (value1, value2, ...) 创建的行。隐式行构造函数至少需要两个字段。显式行构造函数可以处理任意数量的字段。 + 两者都支持任意表达式作为字段 - sql: ARRAY ‘[’ value1 [, value2 ]* ‘]’ table: array(ANY1, ANY2, ...) description: 返回从值列表 (value1, value2, ...) 创建的数组。 diff --git a/docs/layouts/shortcodes/sql_functions.html b/docs/layouts/shortcodes/sql_functions.html index ec8667ee1c7..a8e3557778c 100644 --- a/docs/layouts/shortcodes/sql_functions.html +++ b/docs/layouts/shortcodes/sql_functions.html @@ -35,7 +35,7 @@ under the License. {{ range $category }} -{{ .sql | default "N/A" }} +{{ .sql | $.Page.RenderString | default "N/A" }} {{ .table | default "N/A" }} diff --git a/docs/layouts/shortcodes/sql_functions_zh.html b/docs/layouts/shortcodes/sql_functions_zh.html index 4047517c30c..949fa00d363 100644 --- a/docs/layouts/shortcodes/sql_functions_zh.html +++ b/docs/layouts/shortcodes/sql_functions_zh.html @@ -35,7 +35,7 @@ under the License. {{ range $category }} -{{ .sql | default "不适用" }} +{{ .sql | $.Page.RenderString | default "不适用" }} {{ .table | default "不适用" }}
[flink] branch master updated: [FLINK-31741][jdbc-driver] Support data converter for value in statement result
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9538fdaab29 [FLINK-31741][jdbc-driver] Support data converter for value in statement result 9538fdaab29 is described below commit 9538fdaab2948a2e3dd068925d936ac0777301de Author: shammon FY AuthorDate: Thu Apr 6 11:34:12 2023 +0800 [FLINK-31741][jdbc-driver] Support data converter for value in statement result Close apache/flink#22360 --- .../apache/flink/table/jdbc/FlinkResultSet.java| 35 +-- .../flink/table/jdbc/utils/DataConverter.java | 88 .../table/jdbc/utils/DefaultDataConverter.java | 105 + .../table/jdbc/utils/StringDataConverter.java | 105 + .../flink/table/jdbc/FlinkResultSetTest.java | 238 + 5 files changed, 463 insertions(+), 108 deletions(-) diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java index cfb7ebc3f09..6c8072b60ba 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java @@ -21,7 +21,7 @@ package org.apache.flink.table.jdbc; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.client.gateway.StatementResult; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; +import org.apache.flink.table.jdbc.utils.DataConverter; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.DecimalType; @@ -50,14 +50,17 @@ public class FlinkResultSet extends BaseResultSet { private final List columnNameList; private final Statement statement; private final StatementResult result; +private final DataConverter 
dataConverter; private RowData currentRow; private boolean wasNull; private volatile boolean closed; -public FlinkResultSet(Statement statement, StatementResult result) { +public FlinkResultSet( +Statement statement, StatementResult result, DataConverter dataConverter) { this.statement = checkNotNull(statement, "Statement cannot be null"); this.result = checkNotNull(result, "Statement result cannot be null"); +this.dataConverter = checkNotNull(dataConverter, "Data converter cannot be null"); this.currentRow = null; this.wasNull = false; @@ -133,9 +136,8 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); -StringData stringData = currentRow.getString(columnIndex - 1); try { -return stringData == null ? null : stringData.toString(); +return dataConverter.getString(currentRow, columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -147,8 +149,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { -return currentRow.getBoolean(columnIndex - 1); - +return dataConverter.getBoolean(currentRow, columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -160,7 +161,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { -return currentRow.getByte(columnIndex - 1); +return dataConverter.getByte(currentRow, columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -172,7 +173,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { -return currentRow.getShort(columnIndex - 1); +return dataConverter.getShort(currentRow, columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -184,7 +185,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidRow(); checkValidColumn(columnIndex); try { -return currentRow.getInt(columnIndex - 1); +return dataConverter.getInt(currentRow, 
columnIndex - 1); } catch (Exception e) { throw new SQLDataException(e); } @@ -197,7 +198,7 @@ public class FlinkResultSet extends BaseResultSet { checkValidColumn(columnIndex); try { -return currentRow.getLong(columnIndex - 1); +return dataConverter.getLong(currentRow, columnIndex - 1);
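Note on the change above: the diff moves per-type reads out of the result set and behind a converter, and the result set only translates the 1-based JDBC column index into a 0-based row position (`columnIndex - 1`). A minimal sketch of that delegation follows, assuming rows are plain `Object[]` arrays. `ValueConverter`, `DirectConverter`, `ParsingConverter` and `SketchResultSet` are hypothetical names for illustration and are not the Flink classes touched by the commit.

```java
import java.util.Objects;

// Hypothetical stand-ins for illustration; these are not the Flink classes from the commit.
interface ValueConverter {
    String getString(Object[] row, int pos);

    int getInt(Object[] row, int pos);
}

/** Reads values that already have the requested Java type. */
final class DirectConverter implements ValueConverter {
    @Override
    public String getString(Object[] row, int pos) {
        Object value = row[pos];
        return value == null ? null : value.toString();
    }

    @Override
    public int getInt(Object[] row, int pos) {
        return (Integer) row[pos];
    }
}

/** Reads values that arrive as text and parses them on access. */
final class ParsingConverter implements ValueConverter {
    @Override
    public String getString(Object[] row, int pos) {
        Object value = row[pos];
        return value == null ? null : value.toString();
    }

    @Override
    public int getInt(Object[] row, int pos) {
        return Integer.parseInt((String) row[pos]);
    }
}

/** Result-set-like wrapper: it only maps 1-based column indexes and delegates the read. */
final class SketchResultSet {
    private final Object[] currentRow;
    private final ValueConverter converter;

    SketchResultSet(Object[] currentRow, ValueConverter converter) {
        this.currentRow = Objects.requireNonNull(currentRow, "Row cannot be null");
        this.converter = Objects.requireNonNull(converter, "Data converter cannot be null");
    }

    int getInt(int columnIndex) {
        return converter.getInt(currentRow, columnIndex - 1);
    }

    String getString(int columnIndex) {
        return converter.getString(currentRow, columnIndex - 1);
    }
}
```

With this split, the same wrapper can serve typed rows and all-text rows by swapping the converter passed to the constructor.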
[flink-connector-jdbc] branch hotfix/update_copyright_year_to_2023 created (now 4ad108d)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch hotfix/update_copyright_year_to_2023 in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git at 4ad108d [hotfix] Update the copyright year to 2012-2023 in NOTICE This branch includes the following new commits: new 4ad108d [hotfix] Update the copyright year to 2012-2023 in NOTICE The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[flink-connector-jdbc] 01/01: [hotfix] Update the copyright year to 2012-2023 in NOTICE
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch hotfix/update_copyright_year_to_2023
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 4ad108db42527f646d86a442f1585db2727e5541
Author: Benchao Li
AuthorDate: Sun Apr 16 17:53:28 2023 +0800

    [hotfix] Update the copyright year to 2012-2023 in NOTICE
---
 NOTICE | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/NOTICE b/NOTICE
index 1553115..0161a58 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
 Apache Flink JDBC Connector
-Copyright 2014-2022 The Apache Software Foundation
+Copyright 2014-2023 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
[flink] branch master updated: [FLINK-31808][docs] Fix wrong examples of how to set operator name in documents
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 97dee4bd2ad [FLINK-31808][docs] Fix wrong examples of how to set operator name in documents
97dee4bd2ad is described below

commit 97dee4bd2ade278805241a245385df3ceeb90150
Author: Weihua Hu
AuthorDate: Fri Apr 14 17:31:07 2023 +0800

    [FLINK-31808][docs] Fix wrong examples of how to set operator name in documents

    Close apache/flink#22401
---
 docs/content.zh/docs/dev/datastream/operators/overview.md | 4 ++--
 docs/content/docs/dev/datastream/operators/overview.md    | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/operators/overview.md b/docs/content.zh/docs/dev/datastream/operators/overview.md
index fbccbbdbc10..a8c1b901d46 100644
--- a/docs/content.zh/docs/dev/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/datastream/operators/overview.md
@@ -851,12 +851,12 @@ Flink里的算子和作业节点会有一个名字和一个描述。名字和描
 {{< tabs namedescription>}}
 {{< tab "Java" >}}
 ```java
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
+someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
+someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
diff --git a/docs/content/docs/dev/datastream/operators/overview.md b/docs/content/docs/dev/datastream/operators/overview.md
index d471f485fe5..5e04da86aea 100644
--- a/docs/content/docs/dev/datastream/operators/overview.md
+++ b/docs/content/docs/dev/datastream/operators/overview.md
@@ -855,12 +855,12 @@ The description can contain detail information about operators to facilitate deb
 {{< tabs namedescription >}}
 {{< tab "Java" >}}
 ```java
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
+someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-someStream.filter(...).setName("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
+someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1")
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
[flink] branch master updated: [FLINK-31545][jdbc-driver] Create executor in flink connection
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 8c44d58c4c4 [FLINK-31545][jdbc-driver] Create executor in flink connection 8c44d58c4c4 is described below commit 8c44d58c4c4aeb36c91d8b27f4128891970dc47d Author: shammon FY AuthorDate: Wed Mar 29 10:47:11 2023 +0800 [FLINK-31545][jdbc-driver] Create executor in flink connection Close apache/flink#22289 --- flink-table/flink-sql-jdbc-driver-bundle/pom.xml | 28 + flink-table/flink-sql-jdbc-driver/pom.xml | 23 +++- .../org/apache/flink/table/jdbc/DriverUri.java | 15 +-- .../apache/flink/table/jdbc/FlinkConnection.java | 96 +-- .../flink/table/jdbc/FlinkConnectionTest.java | 136 + .../apache/flink/table/jdbc/FlinkDriverTest.java | 8 +- 6 files changed, 279 insertions(+), 27 deletions(-) diff --git a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml index 9ad99ffee9f..9d43bdb9be4 100644 --- a/flink-table/flink-sql-jdbc-driver-bundle/pom.xml +++ b/flink-table/flink-sql-jdbc-driver-bundle/pom.xml @@ -50,15 +50,40 @@ flink-sql-gateway-api ${project.version} + + org.apache.flink + flink-sql-gateway + ${project.version} + org.apache.flink flink-table-common ${project.version} + + org.apache.flink + flink-annotations + ${project.version} + + + + + org.apache.flink + flink-core + ${project.version} + + + io.github.zentol.japicmp + japicmp-maven-plugin + + + true + + org.apache.maven.plugins @@ -77,7 +102,10 @@ org.apache.flink:flink-sql-jdbc-driver org.apache.flink:flink-sql-client org.apache.flink:flink-sql-gateway-api + org.apache.flink:flink-sql-gateway org.apache.flink:flink-table-common + org.apache.flink:flink-annotations + org.apache.flink:flink-core diff --git a/flink-table/flink-sql-jdbc-driver/pom.xml b/flink-table/flink-sql-jdbc-driver/pom.xml index 267bca001ac..3d6968fa8e4 
100644 --- a/flink-table/flink-sql-jdbc-driver/pom.xml +++ b/flink-table/flink-sql-jdbc-driver/pom.xml @@ -48,9 +48,30 @@ org.apache.flink - flink-table-api-java-bridge + flink-sql-gateway ${project.version} + + + + org.apache.flink + flink-sql-gateway + ${project.version} + test-jar + test + + + org.apache.flink + flink-test-utils + ${project.version} + test + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${project.version} + test + diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java index bb7952a2417..a13650b45e7 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java +++ b/flink-table/flink-sql-jdbc
[flink] branch master updated (cfb213040a0 -> b81291864e6)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from cfb213040a0 [FLINK-31604][table] Reduce usage of CatalogTableImpl in table-planner (#22263) add b81291864e6 [FLINK-31547][jdbc-driver] Introduce FlinkResultSetMetaData for jdbc driver No new revisions were added by this update. Summary of changes: flink-table/flink-sql-jdbc-driver-bundle/pom.xml | 6 + .../org/apache/flink/table/jdbc/ColumnInfo.java| 287 + .../flink/table/jdbc/FlinkResultSetMetaData.java | 257 ++ .../table/jdbc/FlinkResultSetMetaDataTest.java | 124 + 4 files changed, 674 insertions(+) create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/ColumnInfo.java create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSetMetaData.java create mode 100644 flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetMetaDataTest.java
[flink] branch master updated (0dd9bcb0a9c -> 3814fe8ddf5)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 0dd9bcb0a9c [FLINK-31399] AdaptiveScheduler should free excessive slots that are no longer needed after down scaling. add 3814fe8ddf5 [FLINK-31522][jdbc-driver] Introduce FlinkResultSet for jdbc driver No new revisions were added by this update. Summary of changes: flink-table/flink-sql-jdbc-driver-bundle/pom.xml | 6 + flink-table/flink-sql-jdbc-driver/pom.xml | 5 + .../org/apache/flink/table/jdbc/BaseResultSet.java | 821 + .../apache/flink/table/jdbc/FlinkResultSet.java| 445 +++ .../flink/table/jdbc/FlinkResultSetTest.java | 176 + .../apache/flink/table/jdbc/TestingStatement.java | 220 ++ 6 files changed, 1673 insertions(+) create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseResultSet.java create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java create mode 100644 flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java create mode 100644 flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/TestingStatement.java
[flink] branch master updated (de27786eba5 -> f14a02ecac3)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from de27786eba5 [FLINK-31541][ui] encode request params to support special characters in metrics name. add f14a02ecac3 [FLINK-31538][jdbc-driver] Support parsing catalog/database and properties for uri No new revisions were added by this update. Summary of changes: flink-table/flink-sql-jdbc-driver/pom.xml | 19 ++ .../apache/flink/table/jdbc/BaseConnection.java| 297 + .../org/apache/flink/table/jdbc/DriverInfo.java| 86 ++ .../org/apache/flink/table/jdbc/DriverUri.java | 203 ++ .../apache/flink/table/jdbc/FlinkConnection.java | 95 +++ .../org/apache/flink/table/jdbc/FlinkDriver.java | 32 ++- .../apache/flink/table/jdbc/utils/DriverUtils.java | 102 +++ .../org/apache/flink/table/jdbc/driver.properties | 3 +- .../apache/flink/table/jdbc/FlinkDriverTest.java | 103 +++ 9 files changed, 931 insertions(+), 9 deletions(-) create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseConnection.java create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverInfo.java create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DriverUtils.java copy flink-clients/src/main/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory => flink-table/flink-sql-jdbc-driver/src/main/resources/org/apache/flink/table/jdbc/driver.properties (91%) create mode 100644 flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java
[flink] branch master updated (1d33773e6b7 -> 25798e8b2ed)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 1d33773e6b7 [FLINK-25509][connector-base] Add RecordEvaluator to dynamically stop source based on de-serialized records add 25798e8b2ed [FLINK-31521][jdbc-driver] Initialize jdbc driver modules in flink-table No new revisions were added by this update. Summary of changes: .../pom.xml| 54 --- .../flink-sql-jdbc-driver}/pom.xml | 46 +++- .../org/apache/flink/table/jdbc/FlinkDriver.java | 81 ++ .../resources/META-INF/services/java.sql.Driver| 2 +- flink-table/pom.xml| 2 + 5 files changed, 108 insertions(+), 77 deletions(-) copy flink-table/{flink-table-planner-loader-bundle => flink-sql-jdbc-driver-bundle}/pom.xml (60%) copy {flink-connectors/flink-connector-datagen => flink-table/flink-sql-jdbc-driver}/pom.xml (59%) create mode 100644 flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkDriver.java copy flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory => flink-table/flink-sql-jdbc-driver/src/main/resources/META-INF/services/java.sql.Driver (95%)
[flink-connector-jdbc] branch main updated: [FLINK-31181] Support LIKE operator pushdown
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

The following commit(s) were added to refs/heads/main by this push:
     new aa75d64 [FLINK-31181] Support LIKE operator pushdown
aa75d64 is described below

commit aa75d6476a381c4c80bb39364944891c514b8002
Author: Grzegorz Kołakowski
AuthorDate: Wed Feb 22 09:44:11 2023 +0100

    [FLINK-31181] Support LIKE operator pushdown
---
 .../jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java     | 3 +++
 .../jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java | 3 ++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
index f9622a8..a4e31ee 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java
@@ -79,6 +79,9 @@ public class JdbcFilterPushdownPreparedStatementVisitor
         if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
             return renderBinaryOperator("AND", call.getResolvedChildren());
         }
+        if (BuiltInFunctionDefinitions.LIKE.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("LIKE", call.getResolvedChildren());
+        }
         if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
             return renderUnaryOperator("IS NULL", call.getResolvedChildren().get(0), true);
         }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
index 8bdfc67..57a9e0d 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
@@ -144,7 +144,8 @@ class JdbcFilterPushdownPreparedStatementVisitorTest {
                         new Object[] {"real_col > 0.5", "real_col > ?", new BigDecimal("0.5")},
                         new Object[] {
                             "double_col <= -0.3", "double_col <= ?", new BigDecimal("-0.3")
-                        })
+                        },
+                        new Object[] {"description LIKE '_bcd%'", "description LIKE ?", "_bcd%"})
                 .forEach(
                         inputs ->
                                 assertSimpleInputExprEqualsOutExpr(
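Note on the change above: the new test case expects `description LIKE '_bcd%'` to become the parameterized text `description LIKE ?` with `_bcd%` bound as a parameter. A minimal sketch of that idea follows, assuming a predicate is just SQL text plus an ordered parameter list. `SketchPredicate` and its methods are hypothetical and do not reproduce the connector's own classes or its exact output formatting.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; not the connector's visitor or predicate classes.
final class SketchPredicate {
    final String sql;
    final List<Object> params;

    private SketchPredicate(String sql, List<Object> params) {
        this.sql = sql;
        this.params = Collections.unmodifiableList(params);
    }

    /** A column reference renders as its name and binds nothing. */
    static SketchPredicate column(String name) {
        return new SketchPredicate(name, new ArrayList<Object>());
    }

    /** A literal renders as a placeholder and binds its value. */
    static SketchPredicate literal(Object value) {
        List<Object> bound = new ArrayList<Object>();
        bound.add(value);
        return new SketchPredicate("?", bound);
    }

    /** Joins two operands with an operator, keeping parameters in left-to-right order. */
    static SketchPredicate binary(String operator, SketchPredicate left, SketchPredicate right) {
        List<Object> bound = new ArrayList<Object>(left.params);
        bound.addAll(right.params);
        return new SketchPredicate(left.sql + " " + operator + " " + right.sql, bound);
    }
}
```

Because `LIKE` has the same two-operand shape as `AND` or a comparison, one rendering routine can cover it, which is consistent with the three-line change in the diff.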
[flink] branch master updated: [FLINK-31113][orc] Support AND filter push down in orc
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 5cda70d873c [FLINK-31113][orc] Support AND filter push down in orc 5cda70d873c is described below commit 5cda70d873c9630c898d765633ec7a6cfe53e3c6 Author: shammon AuthorDate: Tue Feb 21 17:18:59 2023 +0800 [FLINK-31113][orc] Support AND filter push down in orc This close #21956 --- .../main/java/org/apache/flink/orc/OrcFilters.java | 45 ++ .../apache/flink/orc/OrcFileSystemFilterTest.java | 11 ++ 2 files changed, 56 insertions(+) diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java index a33b79d18f9..4393356fc30 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFilters.java @@ -62,6 +62,7 @@ public class OrcFilters { OrcFilters::convertIsNotNull) .put(BuiltInFunctionDefinitions.NOT, OrcFilters::convertNot) .put(BuiltInFunctionDefinitions.OR, OrcFilters::convertOr) +.put(BuiltInFunctionDefinitions.AND, OrcFilters::convertAnd) .put( BuiltInFunctionDefinitions.EQUALS, call -> @@ -186,6 +187,22 @@ public class OrcFilters { } } +private static Predicate convertAnd(CallExpression callExp) { +if (callExp.getChildren().size() < 2) { +return null; +} +Expression left = callExp.getChildren().get(0); +Expression right = callExp.getChildren().get(1); + +Predicate c1 = toOrcPredicate(left); +Predicate c2 = toOrcPredicate(right); +if (c1 == null || c2 == null) { +return null; +} else { +return new And(c1, c2); +} +} + public static Predicate convertBinary( CallExpression callExp, TriFunction func, @@ -719,4 +736,32 @@ public class OrcFilters { return "OR(" + Arrays.toString(preds) + ")"; } } + +/** An AND predicate that can 
be evaluated by the OrcInputFormat. */ +public static class And extends Predicate { +private final Predicate[] preds; + +/** + * Creates an AND predicate. + * + * @param predicates The conjunctive predicates. + */ +public And(Predicate... predicates) { +this.preds = predicates; +} + +@Override +public SearchArgument.Builder add(SearchArgument.Builder builder) { +SearchArgument.Builder withAnd = builder.startAnd(); +for (Predicate pred : preds) { +withAnd = pred.add(withAnd); +} +return withAnd.end(); +} + +@Override +public String toString() { +return "AND(" + Arrays.toString(preds) + ")"; +} +} } diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java index 8137d78ac38..cdee400723d 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -74,5 +75,15 @@ class OrcFileSystemFilterTest { OrcFilters.Predicate predicate6 = new OrcFilters.LessThan("long1", PredicateLeaf.Type.LONG, 10); assertThat(predicate5).hasToString(predicate6.toString()); + +// and +CallExpression andExpression = +CallExpression.permanent( +BuiltInFunctionDefinitions.AND, +Arrays.asList(greaterExpression, lessExpression), +DataTypes.BOOLEAN()); +OrcFilters.Predicate predicate7 = OrcFilters.toOrcPredicate(andExpression); +OrcFilters.Predicate predicate8 = new OrcFilters.And(predicate4, predicate6); +assertThat(predicate7).hasToString(predicate8.toString()); } }
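Note on the change above: `convertAnd` in the diff returns `null` unless both children convert, so an `AND` is pushed down only as a whole. A minimal sketch of that rule follows, assuming expressions are plain strings and that anything starting with `udf(` stands for an expression that cannot be converted. All names here (`SketchPred`, `Leaf`, `AndPred`, `SketchFilters`) are hypothetical and are not the `OrcFilters` types.

```java
import java.util.Arrays;

// Hypothetical sketch; not the OrcFilters predicate hierarchy.
abstract class SketchPred {}

final class Leaf extends SketchPred {
    private final String text;

    Leaf(String text) {
        this.text = text;
    }

    @Override
    public String toString() {
        return text;
    }
}

final class AndPred extends SketchPred {
    private final SketchPred[] preds;

    AndPred(SketchPred... preds) {
        this.preds = preds;
    }

    @Override
    public String toString() {
        return "AND(" + Arrays.toString(preds) + ")";
    }
}

final class SketchFilters {
    private SketchFilters() {}

    /** Returns null for expressions this sketch treats as not convertible. */
    static SketchPred convertLeaf(String expr) {
        return expr.startsWith("udf(") ? null : new Leaf(expr);
    }

    /** Converts a conjunction only when both sides convert, as in the diff. */
    static SketchPred convertAnd(String left, String right) {
        SketchPred c1 = convertLeaf(left);
        SketchPred c2 = convertLeaf(right);
        if (c1 == null || c2 == null) {
            return null;
        }
        return new AndPred(c1, c2);
    }
}
```

Returning `null` for a half-convertible conjunction is the conservative choice shown in the commit; the caller then simply skips the pushdown for that filter.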
[flink] branch master updated: [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 7ea4476c054 [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type 7ea4476c054 is described below commit 7ea4476c0544e17798cbb1e39609827954f6c266 Author: laughingman7743 AuthorDate: Wed Jan 4 23:12:19 2023 +0900 [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type Close #21613 --- .../docs/connectors/table/formats/protobuf.md | 5 ++ flink-formats/flink-protobuf/pom.xml | 1 + .../apache/flink/formats/protobuf/PbConstant.java | 1 + .../flink/formats/protobuf/PbFormatContext.java| 8 +-- .../deserialize/PbCodegenArrayDeserializer.java| 3 +- .../deserialize/PbCodegenMapDeserializer.java | 6 +- .../deserialize/PbCodegenRowDeserializer.java | 3 +- .../protobuf/deserialize/ProtoToRowConverter.java | 6 +- .../serialize/PbCodegenArraySerializer.java| 3 +- .../protobuf/serialize/PbCodegenMapSerializer.java | 6 +- .../protobuf/serialize/PbCodegenRowSerializer.java | 11 +--- .../serialize/PbCodegenSimpleSerializer.java | 8 +-- .../protobuf/serialize/RowToProtoConverter.java| 4 +- .../formats/protobuf/util/PbCodegenUtils.java | 19 +++ .../flink/formats/protobuf/util/PbFormatUtils.java | 58 +++- ...RowTest.java => MetaNoMultiProtoToRowTest.java} | 18 ++ .../formats/protobuf/MetaOuterNoMultiTest.java | 4 +- .../protobuf/SameOuterClassNameProtoToRowTest.java | 61 + .../protobuf/SameOuterClassNameRowToProtoTest.java | 56 +++ .../formats/protobuf/SimpleProtoToRowTest.java | 45 +-- .../formats/protobuf/SimpleRowToProtoTest.java | 49 ++--- .../protobuf/TimestampMultiProtoToRowTest.java | 46 .../protobuf/TimestampMultiRowToProtoTest.java}| 32 ++- .../protobuf/TimestampNoMultiProtoToRowTest.java | 47 .../protobuf/TimestampNoMultiRowToProtoTest.java | 44 +++ .../TimestampOuterMultiProtoToRowTest.java 
| 49 + .../TimestampOuterMultiRowToProtoTest.java | 44 +++ .../TimestampOuterNoMultiProtoToRowTest.java | 47 .../TimestampOuterNoMultiRowToProtoTest.java | 47 .../flink-protobuf/src/test/proto/test_map.proto | 18 +++--- .../test/proto/test_multiple_level_message.proto | 26 - .../flink-protobuf/src/test/proto/test_null.proto | 64 +++--- .../flink-protobuf/src/test/proto/test_oneof.proto | 8 +-- .../flink-protobuf/src/test/proto/test_pb3.proto | 48 .../src/test/proto/test_repeated.proto | 13 ++--- .../src/test/proto/test_repeated_message.proto | 12 ++-- ...neof.proto => test_same_outer_class_name.proto} | 16 +++--- .../{test_simple.proto => test_simple_multi.proto} | 51 + .../test/proto/test_simple_no_java_package.proto | 40 +++--- ...ter_nomulti.proto => test_simple_nomulti.proto} | 30 ++ .../src/test/proto/test_simple_outer_multi.proto | 23 .../src/test/proto/test_simple_outer_nomulti.proto | 26 - ...test_oneof.proto => test_timestamp_multi.proto} | 10 ++-- ...st_oneof.proto => test_timestamp_nomulti.proto} | 12 ++-- ...sage.proto => test_timestamp_outer_multi.proto} | 14 ++--- ...ge.proto => test_timestamp_outer_nomulti.proto} | 16 ++ 46 files changed, 806 insertions(+), 352 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/formats/protobuf.md b/docs/content.zh/docs/connectors/table/formats/protobuf.md index 5cbafc8d911..b28cf49f9d3 100644 --- a/docs/content.zh/docs/connectors/table/formats/protobuf.md +++ b/docs/content.zh/docs/connectors/table/formats/protobuf.md @@ -236,6 +236,11 @@ The following table lists the type mapping from Flink type to Protobuf type. enum The enum value of protobuf can be mapped to string or number of flink row accordingly. + + ROW<seconds BIGINT, nanos INT> + google.protobuf.timestamp + The google.protobuf.timestamp type can be mapped to seconds and fractions of seconds at nanosecond resolution in UTC epoch time using the row type as well as the protobuf definition. 
+ diff --git a/flink-formats/flink-protobuf/pom.xml b/flink-formats/flink-protobuf/pom.xml index be38a1a4242..da593b8bc4a 100644 --- a/flink-formats/flink-protobuf/pom.xml +++ b/flink-formats/flink-protobuf/pom.xml @@ -109,6 +109,7 @@ under the License. com.google.pr
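Note on the change above: the added documentation row maps `google.protobuf.timestamp` to `ROW<seconds BIGINT, nanos INT>`, that is, whole seconds since the UTC epoch plus a nanosecond fraction. A minimal sketch of that split using only `java.time.Instant` follows. `TimestampRowSketch` is a hypothetical helper, and the `long[]` pair stands in for the two row fields; it is not code from the Flink protobuf format.

```java
import java.time.Instant;

// Hypothetical helper; illustrates the seconds/nanos split only.
final class TimestampRowSketch {
    private TimestampRowSketch() {}

    /** Splits an instant into {seconds, nanos}, mirroring ROW<seconds BIGINT, nanos INT>. */
    static long[] toSecondsAndNanos(Instant instant) {
        return new long[] {instant.getEpochSecond(), instant.getNano()};
    }

    /** Rebuilds the instant from the two fields. */
    static Instant fromSecondsAndNanos(long seconds, int nanos) {
        return Instant.ofEpochSecond(seconds, nanos);
    }
}
```

The round trip is lossless because `Instant` itself stores exactly these two components.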
[flink] branch master updated: [FLINK-29990][sql-parser] Fix unparsed sql for SqlTableLike with no options
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new dd768443424 [FLINK-29990][sql-parser] Fix unparsed sql for SqlTableLike with no options
dd768443424 is described below

commit dd76844342489a252f8b76417090f137028af0bc
Author: chengshuo.cs
AuthorDate: Fri Nov 11 15:21:24 2022 +0800

    [FLINK-29990][sql-parser] Fix unparsed sql for SqlTableLike with no options
---
 .../org/apache/flink/sql/parser/ddl/SqlTableLike.java |  3 +++
 .../flink/sql/parser/FlinkSqlParserImplTest.java      | 19 +++
 2 files changed, 22 insertions(+)

diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
index 634eb1b0b55..eb1651cc2e0 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
@@ -223,6 +223,9 @@ public class SqlTableLike extends SqlCall implements ExtendedSqlNode {
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         writer.keyword("LIKE");
         sourceTable.unparse(writer, leftPrec, rightPrec);
+        if (options == null || options.isEmpty()) {
+            return;
+        }
         SqlWriter.Frame frame = writer.startList("(", ")");
         for (SqlTableLikeOption option : options) {
             writer.newlineAndIndent();
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 741afa6d010..2dd968993f0 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1078,6 +1078,25 @@ class FlinkSqlParserImplTest extends SqlParserTest {
         sql(sql).fails("(?s).*Encountered \"a\" at line 6, column 3.\n.*");
     }
 
+    @Test
+    void testCreateTableLikeWithoutOption() {
+        final String sql =
+                "create table source_table(\n"
+                        + " a int,\n"
+                        + " b bigint,\n"
+                        + " c string\n"
+                        + ")\n"
+                        + "LIKE parent_table";
+        final String expected =
+                "CREATE TABLE `SOURCE_TABLE` (\n"
+                        + " `A` INTEGER,\n"
+                        + " `B` BIGINT,\n"
+                        + " `C` STRING\n"
+                        + ")\n"
+                        + "LIKE `PARENT_TABLE`";
+        sql(sql).ok(expected);
+    }
+
     @Test
     void testCreateTableWithLikeClause() {
         final String sql =
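Note on the change above: the fix adds an early return so that a `LIKE` clause without options is written without an empty parenthesized list. A minimal sketch of that guard follows, assuming the clause is built as a plain string. `LikeClauseSketch` and its output layout are hypothetical and do not reproduce Calcite's `SqlWriter` formatting.

```java
import java.util.List;

// Hypothetical sketch; the layout of the option list is an assumption for illustration.
final class LikeClauseSketch {
    private LikeClauseSketch() {}

    static String unparse(String sourceTable, List<String> options) {
        StringBuilder sql = new StringBuilder("LIKE ").append(sourceTable);
        // Same guard as in the diff: no options means no parenthesized list at all.
        if (options == null || options.isEmpty()) {
            return sql.toString();
        }
        sql.append(" (");
        for (String option : options) {
            sql.append("\n  ").append(option);
        }
        sql.append("\n)");
        return sql.toString();
    }
}
```

Without the guard, the loop body never runs for an empty list, but the opening and closing parentheses would still be emitted.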
[flink] branch master updated: [FLINK-16024][connector/jdbc] Support filter pushdown
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 496700197b9 [FLINK-16024][connector/jdbc] Support filter pushdown 496700197b9 is described below commit 496700197b9e2ca60f6cdd3eb01fa4a12227548a Author: Qing Lim AuthorDate: Wed Jun 22 12:55:20 2022 +0100 [FLINK-16024][connector/jdbc] Support filter pushdown This closes #20140 --- .../CompositeJdbcParameterValuesProvider.java | 58 .../jdbc/table/JdbcDynamicTableSource.java | 113 +++- ...JdbcFilterPushdownPreparedStatementVisitor.java | 201 ++ .../jdbc/table/ParameterizedPredicate.java | 59 .../jdbc/table/JdbcDynamicTableSourceITCase.java | 140 ++ ...FilterPushdownPreparedStatementVisitorTest.java | 297 + .../connector/jdbc/table/JdbcTablePlanTest.java| 6 + .../connector/jdbc/table/JdbcTablePlanTest.xml | 17 ++ 8 files changed, 880 insertions(+), 11 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java new file mode 100644 index 000..eebeac5abe0 --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/CompositeJdbcParameterValuesProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.split; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** Combine 2 {@link JdbcParameterValuesProvider} into 1. */ +@Internal +public class CompositeJdbcParameterValuesProvider implements JdbcParameterValuesProvider { +JdbcParameterValuesProvider a; +JdbcParameterValuesProvider b; + +public CompositeJdbcParameterValuesProvider( +JdbcParameterValuesProvider a, JdbcParameterValuesProvider b) { +Preconditions.checkArgument( +a.getParameterValues().length == b.getParameterValues().length, +"Both JdbcParameterValuesProvider should have the same length."); +this.a = a; +this.b = b; +} + +@Override +public Serializable[][] getParameterValues() { +int batchNum = this.a.getParameterValues().length; +Serializable[][] parameters = new Serializable[batchNum][]; +for (int i = 0; i < batchNum; i++) { +Serializable[] aSlice = a.getParameterValues()[i]; +Serializable[] bSlice = b.getParameterValues()[i]; +int totalLen = aSlice.length + bSlice.length; + +Serializable[] batchParams = new Serializable[totalLen]; + +System.arraycopy(aSlice, 0, batchParams, 0, aSlice.length); +System.arraycopy(bSlice, 0, batchParams, aSlice.length, bSlice.length); +parameters[i] = batchParams; +} +return parameters; +} +} diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java index 0bc7ae04792..e27cf63df50 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java @@ -22,25 +22,41 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; +import org.apache.flink.connector.jdbc.split.CompositeJdb
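For readers skimming the diff above: the per-batch concatenation done by `CompositeJdbcParameterValuesProvider#getParameterValues` can be tried out without any Flink dependency. The following is only a sketch of that loop. The class name `CompositeParametersSketch`, the `combine` helper, and the sample values are hypothetical and not part of the commit.

```java
import java.io.Serializable;
import java.util.Arrays;

/** Standalone sketch of the per-batch concatenation; this is not the Flink class. */
class CompositeParametersSketch {

    /** Appends row i of {@code b} to row i of {@code a}, mirroring the loop in the commit. */
    public static Serializable[][] combine(Serializable[][] a, Serializable[][] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException(
                    "Both parameter arrays should have the same number of batches.");
        }
        Serializable[][] parameters = new Serializable[a.length][];
        for (int i = 0; i < a.length; i++) {
            Serializable[] batchParams = new Serializable[a[i].length + b[i].length];
            System.arraycopy(a[i], 0, batchParams, 0, a[i].length);
            System.arraycopy(b[i], 0, batchParams, a[i].length, b[i].length);
            parameters[i] = batchParams;
        }
        return parameters;
    }

    public static void main(String[] args) {
        // Hypothetical data: two partition ranges plus one filter value per batch.
        Serializable[][] partitionBounds = {{0L, 99L}, {100L, 199L}};
        Serializable[][] filterValues = {{"books"}, {"books"}};
        // prints [[0, 99, books], [100, 199, books]]
        System.out.println(Arrays.deepToString(combine(partitionBounds, filterValues)));
    }
}
```

Note that the second provider's values are appended after the first provider's in every batch, so the order in which the two providers are passed matters for whatever consumes the combined array.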
[flink] branch master updated: [FLINK-28765][docs] Add documentation for protobuf format
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 92ec61a54df [FLINK-28765][docs] Add documentation for protobuf format 92ec61a54df is described below commit 92ec61a54df2a66183633c57944eac774dd48906 Author: Suhan Mao AuthorDate: Mon Aug 1 18:37:35 2022 +0800 [FLINK-28765][docs] Add documentation for protobuf format This closes #20408 --- .../docs/connectors/table/formats/protobuf.md | 286 + .../docs/connectors/table/formats/protobuf.md | 286 + docs/data/sql_connectors.yml | 6 + 3 files changed, 578 insertions(+) diff --git a/docs/content.zh/docs/connectors/table/formats/protobuf.md b/docs/content.zh/docs/connectors/table/formats/protobuf.md new file mode 100644 index 000..5cbafc8d911 --- /dev/null +++ b/docs/content.zh/docs/connectors/table/formats/protobuf.md @@ -0,0 +1,286 @@ +--- +title: Protobuf +weight: 4 +type: docs +aliases: +- /dev/table/connectors/formats/protobuf.html +--- + + +# Protobuf Format + +{{< label "Format: Serialization Schema" >}} +{{< label "Format: Deserialization Schema" >}} + +The Protocol Buffers [Protobuf](https://developers.google.com/protocol-buffers) format allows you to read and write Protobuf data, based on Protobuf generated classes. + +Dependencies + + +{{< sql_download_table "protobuf" >}} + +How to create a table with Protobuf format + + +Here is an example to create a table using the Kafka connector and Protobuf format. + +Below is the proto definition file. 
+ +``` +syntax = "proto2"; +package com.example; +option java_package = "com.example"; +option java_multiple_files = true; + +message SimpleTest { +optional int64 uid = 1; +optional string name = 2; +optional int32 category_type = 3; +optional bytes content = 4; +optional double price = 5; +map value_map = 6; +repeated InnerMessageTest value_arr = 7; +optional Corpus corpus_int = 8; +optional Corpus corpus_str = 9; + +message InnerMessageTest{ + optional int64 v1 =1; + optional int32 v2 =2; +} + +enum Corpus { +UNIVERSAL = 0; +WEB = 1; +IMAGES = 2; +LOCAL = 3; +NEWS = 4; +PRODUCTS = 5; +VIDEO = 7; + } +} +``` + +1. Use [`protoc`](https://developers.google.com/protocol-buffers/docs/javatutorial#compiling-your-protocol-buffers) command to compile the `.proto` file to java classes +2. Then compile and package the classes (there is no need to package proto-java into the jar) +3. Finally you should provide the `jar` in your classpath, e.g. pass it using `-j` in sql-client + +```sql +CREATE TABLE simple_test ( + uid BIGINT, + name STRING, + category_type INT, + content BINARY, + price DOUBLE, + value_map map>, + value_arr array>, + corpus_int INT, + corpus_str STRING +) WITH ( + 'connector' = 'kafka', + 'topic' = 'user_behavior', + 'properties.bootstrap.servers' = 'localhost:9092', + 'properties.group.id' = 'testGroup', + 'format' = 'protobuf', + 'protobuf.message-class-name' = 'com.example.SimpleTest', + 'protobuf.ignore-parse-errors' = 'true' +) +``` + +Format Options + + + + + +Option +Required +Forwarded +Default +Type +Description + + + + + format + required + no + (none) + String + Specify what format to use, here should be 'protobuf'. + + + protobuf.message-class-name + required + no + (none) + String + The full name of a Protobuf generated class. The name must match the message name in the proto definition file. 
$ is supported for inner class names, like 'com.example.OuterClass$MessageClass' + + + protobuf.ignore-parse-errors + optional + no + false + Boolean + Optional flag to skip rows with parse errors instead of failing. + + + protobuf.read-default-values + optional + yes + false + Boolean + + This option only works if the generated class's version is proto2. If this value is set to true, the format will read empty values as the default values defined in the proto file. + If the value is set to false, the format will generate null values if the data element does not exist in the binary protobuf message. + If the proto syntax is proto3, this value will forcibly be set to true, because proto3's standard i
[flink] branch master updated (be4e0fe050b -> 5c87b69b530)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from be4e0fe050b [FLINK-28577][web] Fix the null error of reading checkpointed_size in checkpoint tab add 5c87b69b530 [FLINK-18202][protobuf] Introduce protobuf format No new revisions were added by this update. Summary of changes: flink-formats/flink-protobuf/pom.xml | 146 + .../com/google/protobuf/ProtobufInternalUtils.java | 27 ++ .../flink/formats/protobuf/PbCodegenException.java | 36 +++ .../apache/flink/formats/protobuf/PbConstant.java | 29 ++ .../flink/formats/protobuf/PbDecodingFormat.java | 52 .../flink/formats/protobuf/PbEncodingFormat.java | 49 +++ .../flink/formats/protobuf/PbFormatConfig.java | 120 .../flink/formats/protobuf/PbFormatContext.java| 38 +++ .../flink/formats/protobuf/PbFormatFactory.java| 94 ++ .../flink/formats/protobuf/PbFormatOptions.java| 55 .../deserialize/PbCodegenArrayDeserializer.java| 87 ++ .../deserialize/PbCodegenDeserializeFactory.java | 56 .../deserialize/PbCodegenDeserializer.java | 39 +++ .../deserialize/PbCodegenMapDeserializer.java | 115 +++ .../deserialize/PbCodegenRowDeserializer.java | 126 .../deserialize/PbCodegenSimpleDeserializer.java | 86 ++ .../PbRowDataDeserializationSchema.java| 107 +++ .../protobuf/deserialize/ProtoToRowConverter.java | 134 + .../serialize/PbCodegenArraySerializer.java| 89 ++ .../protobuf/serialize/PbCodegenMapSerializer.java | 126 .../protobuf/serialize/PbCodegenRowSerializer.java | 128 .../serialize/PbCodegenSerializeFactory.java | 54 .../protobuf/serialize/PbCodegenSerializer.java| 40 +++ .../serialize/PbCodegenSimpleSerializer.java | 121 .../serialize/PbRowDataSerializationSchema.java| 66 .../protobuf/serialize/RowToProtoConverter.java| 114 +++ .../formats/protobuf/util/PbCodegenAppender.java | 85 ++ .../formats/protobuf/util/PbCodegenUtils.java | 270 + .../formats/protobuf/util/PbCodegenVarId.java | 40 +++ 
.../flink/formats/protobuf/util/PbFormatUtils.java | 105 +++ .../protobuf/util/PbSchemaValidationUtils.java | 167 +++ .../formats/protobuf/util/PbToRowTypeUtil.java | 110 +++ .../org.apache.flink.table.factories.Factory | 15 + .../flink/formats/protobuf/MapProtoToRowTest.java | 64 .../flink/formats/protobuf/MapRowToProtoTest.java | 71 + .../protobuf/MetaNoOuterNoMultiProtoToRowTest.java | 77 + .../flink/formats/protobuf/MetaOuterMultiTest.java | 56 .../formats/protobuf/MetaOuterNoMultiTest.java | 61 .../protobuf/MultiLevelMessageProtoToRowTest.java | 58 .../protobuf/MultiLevelMessageRowToProtoTest.java | 58 .../protobuf/NoJavaPackageProtoToRowTest.java | 32 ++ .../formats/protobuf/NullValueToProtoTest.java | 219 ++ .../formats/protobuf/OneofProtoToRowTest.java | 38 +++ .../formats/protobuf/OneofRowToProtoTest.java | 41 +++ .../flink/formats/protobuf/Pb3ToRowTest.java | 125 .../formats/protobuf/ProtobufSQLITCaseTest.java| 331 + .../flink/formats/protobuf/ProtobufTestHelper.java | 132 .../protobuf/RepeatedMessageProtoToRowTest.java| 57 .../protobuf/RepeatedMessageRowToProtoTest.java| 50 .../formats/protobuf/RepeatedProtoToRowTest.java | 43 +++ .../formats/protobuf/RepeatedRowToProtoTest.java | 75 + .../formats/protobuf/SimpleProtoToRowTest.java | 117 .../formats/protobuf/SimpleRowToProtoTest.java | 96 ++ .../protobuf/table/TestProtobufSinkFunction.java | 45 +++ .../protobuf/table/TestProtobufSourceFunction.java | 52 .../protobuf/table/TestProtobufTableFactory.java | 84 ++ .../protobuf/table/TestProtobufTableSink.java | 61 .../protobuf/table/TestProtobufTableSource.java| 64 .../protobuf/table/TestProtobufTestStore.java | 28 ++ .../flink-protobuf/src/test/proto/test_map.proto | 37 +++ .../test/proto/test_multiple_level_message.proto | 42 +++ .../flink-protobuf/src/test/proto/test_null.proto | 61 .../flink-protobuf/src/test/proto/test_oneof.proto | 29 ++ .../flink-protobuf/src/test/proto/test_pb3.proto | 53 .../src/test/proto/test_repeated.proto | 32 ++ 
.../src/test/proto/test_repeated_message.proto | 32 ++ .../src/test/proto/test_simple.proto | 49 +++ .../test/proto/test_simple_no_java_package.proto | 46 +++ .../test/proto/test_simple_noouter_nomulti.proto | 38 +++ .../src/test/proto/test_simple_outer_multi.proto | 40 +++ .../src/test/proto/test_simple_outer_nomulti.proto | 39
[flink-web] branch asf-site updated: Add libenchao to the committer list
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new 9f3e4ee Add libenchao to the committer list 9f3e4ee is described below commit 9f3e4ee2506e73f2551adb0fba61d5cc70817d5e Author: Benchao Li AuthorDate: Wed Jan 26 17:56:32 2022 +0800 Add libenchao to the committer list --- community.md| 7 ++- community.zh.md | 6 ++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/community.md b/community.md index af5af0a..d8f0b87 100644 --- a/community.md +++ b/community.md @@ -562,7 +562,12 @@ Flink Forward is a conference happening yearly in different locations around the PMC, Committer igalshilman - + +https://avatars1.githubusercontent.com/u/4471524?s=50"; class="committer-avatar"> +Benchao Li +Committer +libenchao + You can reach committers directly at `@apache.org`. A list of all contributors can be found [here]({{ site.FLINK_CONTRIBUTORS_URL }}). diff --git a/community.zh.md b/community.zh.md index e563079..cefec26 100644 --- a/community.zh.md +++ b/community.zh.md @@ -547,6 +547,12 @@ Flink Forward 大会每年都会在世界的不同地方举办。关于大会最 PMC, Committer zhuzh + +https://avatars1.githubusercontent.com/u/4471524?s=50"; class="committer-avatar"> +Benchao Li +Committer +libenchao + 可以通过 `@apache.org` 直接联系 committer。可以在 [这里]({{ site.FLINK_CONTRIBUTORS_URL }}) 找到所有的贡献者。
[flink] branch release-1.11 updated: [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new b50fc62 [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters b50fc62 is described below commit b50fc62e5b08363932815e0c68fbea3f08f0011f Author: Benchao Li AuthorDate: Wed Nov 4 21:33:23 2020 +0800 [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters This closes 13926 --- .../json/JsonRowDataSerializationSchema.java | 1 + .../formats/json/JsonRowDataSerDeSchemaTest.java | 23 +- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java index 89b3b87..afc8ede 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java @@ -268,6 +268,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema
[flink] branch release-1.11 updated (f488e70 -> b50fc62)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from f488e70 [FLINK-19867][table-common] Validation fails for UDF that accepts var-args add b50fc62 [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters No new revisions were added by this update. Summary of changes: .../json/JsonRowDataSerializationSchema.java | 1 + .../formats/json/JsonRowDataSerDeSchemaTest.java | 23 +- 2 files changed, 23 insertions(+), 1 deletion(-)
[flink] branch master updated (cddb821 -> b7487bd)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from cddb821 [FLINK-19844][python][docs] Add documentation for Python UDAF add b7487bd [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters No new revisions were added by this update. Summary of changes: .../formats/json/RowDataToJsonConverters.java | 1 + .../formats/json/JsonRowDataSerDeSchemaTest.java | 23 +- 2 files changed, 23 insertions(+), 1 deletion(-)
[flink] branch master updated: [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new b7487bd [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters b7487bd is described below commit b7487bd85bedea8bd50e706c253a61ebbcf8a8bc Author: Benchao Li AuthorDate: Fri Oct 23 23:03:44 2020 +0800 [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters This closes #13777 --- .../formats/json/RowDataToJsonConverters.java | 1 + .../formats/json/JsonRowDataSerDeSchemaTest.java | 23 +- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java index e7530f1..132efc5 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java @@ -242,6 +242,7 @@ public class RowDataToJsonConverters implements Serializable { node = mapper.createObjectNode(); } else { node = (ObjectNode) reuse; + node.removeAll(); } MapData map = (MapData) object; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index e2a4762..bba9cc1 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -246,7 +246,12 @@ public class JsonRowDataSerDeSchemaTest { RowType rowType = (RowType) ROW( FIELD("f1", 
INT()), FIELD("f2", BOOLEAN()), - FIELD("f3", STRING()) + FIELD("f3", STRING()), + FIELD("f4", MAP(STRING(), STRING())), + FIELD("f5", ARRAY(STRING())), + FIELD("f6", ROW( + FIELD("f1", STRING()), + FIELD("f2", INT( ).getLogicalType(); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( @@ -261,6 +266,14 @@ public class JsonRowDataSerDeSchemaTest { root.put("f1", 1); root.put("f2", true); root.put("f3", "str"); + ObjectNode map = root.putObject("f4"); + map.put("hello1", "flink"); + ArrayNode array = root.putArray("f5"); + array.add("element1"); + array.add("element2"); + ObjectNode row = root.putObject("f6"); + row.put("f1", "this is row1"); + row.put("f2", 12); byte[] serializedJson = objectMapper.writeValueAsBytes(root); RowData rowData = deserializationSchema.deserialize(serializedJson); byte[] actual = serializationSchema.serialize(rowData); @@ -273,6 +286,14 @@ public class JsonRowDataSerDeSchemaTest { root.put("f1", 10); root.put("f2", false); root.put("f3", "newStr"); + ObjectNode map = root.putObject("f4"); + map.put("hello2", "json"); + ArrayNode array = root.putArray("f5"); + array.add("element3"); + array.add("element4"); + ObjectNode row = root.putObject("f6"); + row.put("f1", "this is row2"); + row.putNull("f2"); byte[] serializedJson = objectMapper.writeValueAsBytes(root); RowData rowData = deserializationSchema.deserialize(serializedJson); byte[] actual = serializationSchema.serialize(rowData);
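The one-line fix above (`node.removeAll()`) addresses a general hazard of reusing a mutable container across records: keys written for one record stay visible in the next unless the container is cleared first. The following is a rough sketch of that effect. It uses a plain `LinkedHashMap` as a stand-in for Jackson's `ObjectNode`, and the class name `ReusedNodeSketch`, the `convert` helper, and its `clearFirst` flag are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of the stale-key problem; a LinkedHashMap stands in for the reused ObjectNode. */
class ReusedNodeSketch {

    /** Copies {@code input} into the reused container; {@code clearFirst} toggles the fix. */
    public static Map<String, String> convert(
            Map<String, String> input, Map<String, String> reuse, boolean clearFirst) {
        Map<String, String> node;
        if (reuse == null) {
            node = new LinkedHashMap<>();
        } else {
            node = reuse;
            if (clearFirst) {
                node.clear(); // plays the role of node.removeAll() in the commit
            }
        }
        node.putAll(input);
        return node;
    }

    public static void main(String[] args) {
        Map<String, String> first = Collections.singletonMap("hello1", "flink");
        Map<String, String> second = Collections.singletonMap("hello2", "json");

        Map<String, String> buggy = convert(first, null, false);
        buggy = convert(second, buggy, false);
        System.out.println(buggy); // prints {hello1=flink, hello2=json}

        Map<String, String> fixed = convert(first, null, true);
        fixed = convert(second, fixed, true);
        System.out.println(fixed); // prints {hello2=json}
    }
}
```

The map keys `hello1` and `hello2` are borrowed from the test in the diff, which serializes two rows with different map keys for what appears to be the same reason.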
[flink] branch master updated (a8c7b50 -> b5b3065)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from a8c7b50 [FLINK-19590] Fix the compile issue of flink-streaming-java module add 29a5135 [FLINK-19336][table] EncodingUtils#encodeObjectToString should propagate inner exception add b5b3065 [FLINK-18850][table-runtime-blink] Add late records dropped metric for row time over windows No new revisions were added by this update. Summary of changes: .../apache/flink/table/utils/EncodingUtils.java| 2 +- .../AbstractRowTimeUnboundedPrecedingOver.java | 20 ++- .../over/RowTimeRangeBoundedPrecedingFunction.java | 18 +++ .../over/RowTimeRowsBoundedPrecedingFunction.java | 18 +++ ...ionTest.java => RowTimeOverWindowTestBase.java} | 57 .../RowTimeRangeBoundedPrecedingFunctionTest.java | 55 +-- ...RowTimeRangeUnboundedPrecedingFunctionTest.java | 61 + .../RowTimeRowsBoundedPrecedingFunctionTest.java | 62 ++ .../RowTimeRowsUnboundedPrecedingFunctionTest.java | 61 + 9 files changed, 275 insertions(+), 79 deletions(-) copy flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/{ProcTimeRangeBoundedPrecedingFunctionTest.java => RowTimeOverWindowTestBase.java} (50%) create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRangeUnboundedPrecedingFunctionTest.java create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRowsBoundedPrecedingFunctionTest.java create mode 100644 flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeRowsUnboundedPrecedingFunctionTest.java
[flink] branch master updated (798c004 -> 81598d4)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 798c004 [FLINK-16789][coordination][rest] Expose TaskExecutor JMX port add 81598d4 [hotfix][table-planner-blink] Improve exception of item operator No new revisions were added by this update. Summary of changes: .../org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink] branch release-1.11 updated: [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 383991a [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path 383991a is described below commit 383991adc7ca68a0e0e4b595e06e8c5edd853eb1 Author: zhushang AuthorDate: Sun Sep 20 22:46:14 2020 +0800 [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path This closes #13431 --- .../operations/SqlCreateTableConverter.java| 3 +- .../operations/SqlToOperationConverterTest.java| 33 ++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java index 2530897..cf8f62b 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java @@ -140,8 +140,7 @@ class SqlCreateTableConverter { } private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) { - UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlTableLike.getSourceTable() - .toString()); + UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names); ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); CatalogManager.TableLookupResult lookupResult = catalogManager.getTable(identifier) .orElseThrow(() -> new ValidationException(String.format( diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 1c582a3..2d5faad 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -529,6 +529,39 @@ public class SqlToOperationConverterTest { } @Test + public void testCreateTableLikeWithFullPath(){ + Map sourceProperties = new HashMap<>(); + sourceProperties.put("connector.type", "kafka"); + sourceProperties.put("format.type", "json"); + CatalogTableImpl catalogTable = new CatalogTableImpl( + TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.TIMESTAMP(3)) + .build(), + sourceProperties, + null + ); + catalogManager.createTable(catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); + final String sql = "create table mytable like `builtin`.`default`.sourceTable"; + Operation operation = parseAndConvert(sql); + + assertThat( + operation, + isCreateTableOperation( + withSchema( + TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.TIMESTAMP(3)) + .build() + ), + withOptions( + entry("connector.type", "kafka"), + entry("format.type", "json") + ) + )); + } + + @Test public void testMergingCreateTableLike() { Map sourceProperties = new HashMap<>(); sourceProperties.put("format.type", "json");
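A possible reading of the fix above, offered as a sketch only: if the source table's `toString()` yields the dot-joined path and the identifier factory treats each argument as one path component, then the old code produced a one-part identifier whose single component was the whole dotted string, while passing `names` keeps the three parts separate. Both of those behaviors are assumptions here, not confirmed by the diff. The class `LikePathSketch` and its `identifierOf` helper are hypothetical stand-ins and not Flink API.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of one-part versus three-part identifiers; not the Flink classes. */
class LikePathSketch {

    /** Hypothetical stand-in: every argument becomes exactly one path component. */
    public static List<String> identifierOf(String... parts) {
        return Arrays.asList(parts);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("builtin", "default", "sourceTable");

        // Assumed old behavior: the dot-joined string is passed as a single component.
        List<String> before = identifierOf(String.join(".", names));
        // Assumed new behavior: each name is passed as its own component.
        List<String> after = identifierOf(names.toArray(new String[0]));

        System.out.println(before.size()); // prints 1
        System.out.println(after.size());  // prints 3
    }
}
```

Under these assumptions, a one-part identifier would be qualified against the current catalog and database, which would explain why a fully qualified `LIKE` source could not be found before the change.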
[flink] branch master updated (ac776cb -> d0226d4)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ac776cb [FLINK-17910][e2e] Fix debug log output to investigate rare test failure add d0226d4 [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path No new revisions were added by this update. Summary of changes: .../operations/SqlCreateTableConverter.java| 3 +- .../operations/SqlToOperationConverterTest.java| 33 ++ 2 files changed, 34 insertions(+), 2 deletions(-)
[flink] branch master updated (378115f -> b5a459c)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 378115f [FLINK-19012][task] Check state of AsyncCheckpointRunnable before throwing an exception add b5a459c [FLINK-19050][Documentation] Doc of MAX_DECIMAL_PRECISION should be DECIMAL No new revisions were added by this update. Summary of changes: .../java/org/apache/flink/connector/jdbc/dialect/PostgresDialect.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink] branch release-1.11 updated (4efb3b7 -> 076a474)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from 4efb3b7 [FLINK-18666][build] Update japicmp configuration for 1.11.1 add 076a474 [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule. No new revisions were added by this update. Summary of changes: .../stream/StreamExecTemporalSortRule.scala| 15 ++-- .../table/planner/plan/stream/sql/SortTest.xml | 6 ++- .../runtime/stream/sql/TemporalSortITCase.scala| 43 ++ 3 files changed, 58 insertions(+), 6 deletions(-)
[flink] branch release-1.11 updated: [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's resultTerm is null
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 39b4778  [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's resultTerm is null
39b4778 is described below

commit 39b4778a3eaae81902203d558335ed4fbbd8e379
Author: libenchao
AuthorDate: Thu Feb 20 22:19:17 2020 +0800

    [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's resultTerm is null

    This closes #11161
---
 .../apache/flink/table/planner/codegen/calls/IfCallGen.scala | 12 +---
 .../table/planner/expressions/ScalarOperatorsTest.scala | 1 +
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
index 532f8fe..fcc62c3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
@@ -18,7 +18,7 @@

 package org.apache.flink.table.planner.codegen.calls

-import org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeTermForType
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{primitiveDefaultValue, primitiveTypeTermForType}
 import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.types.logical.LogicalType

@@ -44,6 +44,7 @@ class IfCallGen() extends CallGenerator {
     }

     val resultTypeTerm = primitiveTypeTermForType(returnType)
+    val resultDefault = primitiveDefaultValue(returnType)
     val Seq(resultTerm, nullTerm) = ctx.addReusableLocalVariables(
       (resultTypeTerm, "result"),
       ("boolean", "isNull"))
@@ -51,13 +52,18 @@ class IfCallGen() extends CallGenerator {
     val resultCode =
       s"""
          |${operands.head.code}
+         |$resultTerm = $resultDefault;
          |if (${operands.head.resultTerm}) {
          |  ${operands(1).code}
-         |  $resultTerm = $castedResultTerm1;
+         |  if (!${operands(1).nullTerm}) {
+         |    $resultTerm = $castedResultTerm1;
+         |  }
          |  $nullTerm = ${operands(1).nullTerm};
          |} else {
          |  ${operands(2).code}
-         |  $resultTerm = $castedResultTerm2;
+         |  if (!${operands(2).nullTerm}) {
+         |    $resultTerm = $castedResultTerm2;
+         |  }
          |  $nullTerm = ${operands(2).nullTerm};
          |}
        """.stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index 04b2002..4f6cbe4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -124,5 +124,6 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
     testSqlApi("CASE 1 WHEN 1 THEN true WHEN 2 THEN false ELSE NULL END", "true")

     testSqlApi("CASE WHEN f2 = 1 THEN CAST('' as INT) ELSE 0 END", "null")
+    testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null")
   }
 }
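Note on the FLINK-16181 change: the generated code now assigns a default to the result term first and copies the operand's value only when that operand is not null. The Java sketch below imitates that shape by hand so the effect can be seen in isolation. It is an illustration under assumptions, not Flink code: `IfNullGuardSketch`, `Operand`, `evalUnguarded` and `evalGuarded` are hypothetical names, and the `0L` default is a placeholder, since the diff does not show what `primitiveDefaultValue` returns.

```java
// Illustrative sketch only: mimics the shape of the code IfCallGen generates.
// All names here are hypothetical and are not part of Flink.
final class IfNullGuardSketch {

    /** Hypothetical stand-in for a generated operand: a value term plus its null flag. */
    static final class Operand {
        final Long resultTerm;   // may be null when nullTerm is true
        final boolean nullTerm;

        Operand(Long resultTerm, boolean nullTerm) {
            this.resultTerm = resultTerm;
            this.nullTerm = nullTerm;
        }
    }

    /** Pre-fix shape: assigns unconditionally, so unboxing a null term throws an NPE. */
    static Long evalUnguarded(boolean condition, Operand thenOp, Operand elseOp) {
        Operand chosen = condition ? thenOp : elseOp;
        long result = chosen.resultTerm; // NullPointerException if resultTerm is null
        boolean isNull = chosen.nullTerm;
        return isNull ? null : Long.valueOf(result);
    }

    /** Post-fix shape: default first, then assign only when the operand is not null. */
    static Long evalGuarded(boolean condition, Operand thenOp, Operand elseOp) {
        long result = 0L; // placeholder default (assumption, see note above)
        boolean isNull;
        if (condition) {
            if (!thenOp.nullTerm) {
                result = thenOp.resultTerm;
            }
            isNull = thenOp.nullTerm;
        } else {
            if (!elseOp.nullTerm) {
                result = elseOp.resultTerm;
            }
            isNull = elseOp.nullTerm;
        }
        return isNull ? null : Long.valueOf(result);
    }

    public static void main(String[] args) {
        // Roughly models IF(true, CAST('non-numeric' AS BIGINT), 0): the cast yields null.
        Operand failedCast = new Operand(null, true);
        Operand zero = new Operand(0L, false);

        System.out.println("guarded: " + evalGuarded(true, failedCast, zero));
        try {
            evalUnguarded(true, failedCast, zero);
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
    }
}
```

The guarded variant returns null for the failed cast, which matches the expectation of the test case added in the commit (`"null"`), while the unguarded variant fails before the null flag is ever consulted.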
[flink] branch master updated (07e43c9 -> d677f02)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 07e43c9 [hotfix][docs] Add 1.11 to the list of previous docs add d677f02 [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's resultTerm is null No new revisions were added by this update. Summary of changes: .../apache/flink/table/planner/codegen/calls/IfCallGen.scala | 12 +--- .../table/planner/expressions/ScalarOperatorsTest.scala | 1 + 2 files changed, 10 insertions(+), 3 deletions(-)
[flink] branch master updated: [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's resultTerm is null
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new d677f02  [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's resultTerm is null
d677f02 is described below

commit d677f02853ed7395730c6282cd33bb0f5bbf4bd3
Author: libenchao
AuthorDate: Thu Feb 20 22:19:17 2020 +0800

    [FLINK-16181][table-planner-blink] Fix IfCallGen throw NPE when operand's resultTerm is null

    This closes #11161
---
 .../apache/flink/table/planner/codegen/calls/IfCallGen.scala | 12 +---
 .../table/planner/expressions/ScalarOperatorsTest.scala | 1 +
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
index 532f8fe..fcc62c3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/IfCallGen.scala
@@ -18,7 +18,7 @@

 package org.apache.flink.table.planner.codegen.calls

-import org.apache.flink.table.planner.codegen.CodeGenUtils.primitiveTypeTermForType
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{primitiveDefaultValue, primitiveTypeTermForType}
 import org.apache.flink.table.planner.codegen.{CodeGenUtils, CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.types.logical.LogicalType

@@ -44,6 +44,7 @@ class IfCallGen() extends CallGenerator {
     }

     val resultTypeTerm = primitiveTypeTermForType(returnType)
+    val resultDefault = primitiveDefaultValue(returnType)
     val Seq(resultTerm, nullTerm) = ctx.addReusableLocalVariables(
       (resultTypeTerm, "result"),
       ("boolean", "isNull"))
@@ -51,13 +52,18 @@ class IfCallGen() extends CallGenerator {
     val resultCode =
       s"""
          |${operands.head.code}
+         |$resultTerm = $resultDefault;
          |if (${operands.head.resultTerm}) {
          |  ${operands(1).code}
-         |  $resultTerm = $castedResultTerm1;
+         |  if (!${operands(1).nullTerm}) {
+         |    $resultTerm = $castedResultTerm1;
+         |  }
          |  $nullTerm = ${operands(1).nullTerm};
          |} else {
          |  ${operands(2).code}
-         |  $resultTerm = $castedResultTerm2;
+         |  if (!${operands(2).nullTerm}) {
+         |    $resultTerm = $castedResultTerm2;
+         |  }
          |  $nullTerm = ${operands(2).nullTerm};
          |}
        """.stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index 04b2002..4f6cbe4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -124,5 +124,6 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
     testSqlApi("CASE 1 WHEN 1 THEN true WHEN 2 THEN false ELSE NULL END", "true")

     testSqlApi("CASE WHEN f2 = 1 THEN CAST('' as INT) ELSE 0 END", "null")
+    testSqlApi("IF(true, CAST('non-numeric' AS BIGINT), 0)", "null")
   }
 }
[flink] 01/02: [FLINK-18002][json] Correct the behavior for ContainerNode as varchar type
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 3865f7b23c656c74b85b0eb5dd8b73a0f5f88b03 Author: libenchao AuthorDate: Mon Jun 1 11:58:11 2020 +0800 [FLINK-18002][json] Correct the behavior for ContainerNode as varchar type This closes #12421 --- .../json/JsonRowDataDeserializationSchema.java | 6 +- .../formats/json/JsonRowDeserializationSchema.java | 10 +- .../formats/json/JsonRowDataSerDeSchemaTest.java | 107 ++--- .../json/JsonRowDeserializationSchemaTest.java | 30 ++ 4 files changed, 114 insertions(+), 39 deletions(-) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java index d66ecce..956ca5b 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java @@ -304,7 +304,11 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema } else if (simpleTypeInfo == Types.BOOLEAN) { return Optional.of(this::convertToBoolean); } else if (simpleTypeInfo == Types.STRING) { - return Optional.of((mapper, jsonNode) -> jsonNode.asText()); + return Optional.of(this::convertToString); } else if (simpleTypeInfo == Types.INT) { return Optional.of(this::convertToInt); } else if (simpleTypeInfo == Types.LONG) { @@ -381,6 +381,14 @@ public class JsonRowDeserializationSchema implements DeserializationSchema } } + private String convertToString(ObjectMapper mapper, JsonNode jsonNode) { + if (jsonNode.isContainerNode()) { + return jsonNode.toString(); + } else { + return jsonNode.asText(); + } + } + private boolean convertToBoolean(ObjectMapper mapper, JsonNode jsonNode) { if 
(jsonNode.isBoolean()) { // avoid redundant toString and parseBoolean, for better performance diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index cab427f..7b561aa 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -30,11 +30,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.junit.Rule; +import org.junit.Assert; import org.junit.Test; -import org.junit.rules.ExpectedException; -import java.io.IOException; import java.math.BigDecimal; import java.sql.Timestamp; import java.time.LocalDate; @@ -71,9 +69,6 @@ import static org.junit.Assert.assertEquals; */ public class JsonRowDataSerDeSchemaTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Test public void testSerDe() throws Exception { byte tinyint = 'c'; @@ -326,9 +321,13 @@ public class JsonRowDataSerDeSchemaTest { deserializationSchema = deserializationSchema = new JsonRowDataDeserializationSchema( schema, WrapperTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601); - thrown.expect(IOException.class); - thrown.expectMessage("Failed to deserialize JSON '{\"id\":123123123}'"); - deserializationSchema.deserialize(serializedJson); + String errorMessage = "Failed to deserialize JSON '{\"id\":123123123}'."; + try { + deserializationSchema.deserialize(serializedJson); + Assert.fail("expecting exception message: " + errorMessage); + } catch (Throwable t) { + assertEquals(errorMessage, t.getMessage()); + } // ignore on parse error 
deserializationSchema = new JsonRowDataDeserializationSchema( @@ -336,12 +335,15 @@ public class JsonRowDataSerDeSchemaTest { actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataTyp
[flink] branch master updated (f81f3a0 -> 66353f2)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f81f3a0 [FLINK-18477][examples-table] Fix packaging of ChangelogSocketExample new 3865f7b [FLINK-18002][json] Correct the behavior for ContainerNode as varchar type new 66353f2 [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../json/JsonRowDataDeserializationSchema.java | 6 +- .../formats/json/JsonRowDeserializationSchema.java | 10 +- .../formats/json/JsonRowDataSerDeSchemaTest.java | 107 ++--- .../json/JsonRowDeserializationSchemaTest.java | 30 ++ .../stream/StreamExecTemporalSortRule.scala| 15 ++- .../table/planner/plan/stream/sql/SortTest.xml | 6 +- .../runtime/stream/sql/TemporalSortITCase.scala| 43 + 7 files changed, 172 insertions(+), 45 deletions(-)
[flink] 02/02: [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule.
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 66353f27c4c6481443d1f04a8f23e7f98dd7beda Author: libenchao AuthorDate: Mon Apr 6 16:22:28 2020 +0800 [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule. This closes #11643 --- .../stream/StreamExecTemporalSortRule.scala| 15 ++-- .../table/planner/plan/stream/sql/SortTest.xml | 6 ++- .../runtime/stream/sql/TemporalSortITCase.scala| 43 ++ 3 files changed, 58 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala index cf17cc5..3490526 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala @@ -21,8 +21,9 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalSort +import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} import org.apache.calcite.rel.RelFieldCollation.Direction import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule @@ -46,12 +47,18 @@ class StreamExecTemporalSortRule override 
def convert(rel: RelNode): RelNode = { val sort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort] val input = sort.getInput() -val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL) -val convInput: RelNode = RelOptRule.convert(input, FlinkConventions.STREAM_PHYSICAL) +val requiredTraitSet = input.getTraitSet + .replace(FlinkRelDistribution.SINGLETON) + .replace(FlinkConventions.STREAM_PHYSICAL) +val providedTraitSet = sort.getTraitSet + .replace(FlinkRelDistribution.SINGLETON) + .replace(FlinkConventions.STREAM_PHYSICAL) + +val convInput: RelNode = RelOptRule.convert(input, requiredTraitSet) new StreamExecTemporalSort( rel.getCluster, - traitSet, + providedTraitSet, convInput, sort.collation) } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml index af6e441..a1d191e 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml @@ -32,7 +32,8 @@ LogicalProject(a=[$0]) @@ -136,7 +137,8 @@ LogicalProject(a=[$0]) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala index de90d7e..ed90898 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala @@ -79,6 +79,49 @@ class TemporalSortITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB } @Test + def testEventTimeOrderByWithParallelInput(): Unit = { +val data = List( + (3L, 2L, "Hello world", 3), + (2L, 2L, "Hello", 2), + (6L, 3L, "Luke Skywalker", 6), + (5L, 3L, "I am fine.", 5), + (7L, 4L, "Comment#1", 7), + (9L, 4L, "Comment#3", 9), + (10L, 4L, "Comment#4", 10), + (8L, 4L, "Comment#2", 8), + (1L, 1L, "Hi", 1), + (4L, 3L, "Helloworld, how are you?", 4)) + +val t = failingDataSource(data) + .assignTimestampsAndWatermarks( +new TimestampAndWatermarkWithOffset[(Long, Long, String, Int)](10L)) + .setParallelism(env.getParallelism) + .toTable(tEn
[flink] branch release-1.11 updated: [FLINK-18324][docs-zh] Translate updated data type into Chinese
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new f7cd463 [FLINK-18324][docs-zh] Translate updated data type into Chinese f7cd463 is described below commit f7cd46361ec705b8913950b2980c6a24efa8682c Author: Yubin Li AuthorDate: Sun Jun 28 15:32:38 2020 +0800 [FLINK-18324][docs-zh] Translate updated data type into Chinese This closes #12748 --- docs/dev/table/types.zh.md | 330 - 1 file changed, 266 insertions(+), 64 deletions(-) diff --git a/docs/dev/table/types.zh.md b/docs/dev/table/types.zh.md index 252ff02..819abc8 100644 --- a/docs/dev/table/types.zh.md +++ b/docs/dev/table/types.zh.md @@ -28,7 +28,7 @@ under the License. 从 Flink 1.9 开始,Table & SQL API 开始启用一种新的类型系统作为长期解决方案,用来保持 API 稳定性和 SQL 标准的兼容性。 -重新设计类型系统是一项涉及几乎所有的面向用户接口的重大工作。因此,它的引入跨越多个版本,社区的目标是在 Flink 1.10 完成这项工作。 +重新设计类型系统是一项涉及几乎所有的面向用户接口的重大工作。因此,它的引入跨越多个版本,社区的目标是在 Flink 1.12 完成这项工作。 同时由于为 Table 编程添加了新的 Planner 详见([FLINK-11439](https://issues.apache.org/jira/browse/FLINK-11439)), 并不是每种 Planner 都支持所有的数据类型。此外,Planner 对于数据类型的精度和参数化支持也可能是不完整的。 @@ -112,7 +112,7 @@ DataType t = DataTypes.ARRAY(DataTypes.INT().notNull()).bridgedTo(int[].class); // 而是使用 java.sql.Timestamp val t: DataType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[java.sql.Timestamp]); -// 告诉运行时不要产生或者消费装箱的整数数组 +// 告诉运行时不要产生或者消费装箱的整数数组 // 而是使用基本数据类型的整数数组 val t: DataType = DataTypes.ARRAY(DataTypes.INT().notNull()).bridgedTo(classOf[Array[Int]]); {% endhighlight %} @@ -184,23 +184,22 @@ Flink 1.9 之前引入的旧的 Planner 主要支持类型信息(Type Informat | `DOUBLE` | | | `DATE` | | | `TIME` | 支持的精度仅为 `0`。 | -| `TIMESTAMP` | 支持的精度仅为 `3`。 | -| `TIMESTAMP WITH LOCAL TIME ZONE` | 支持的精度仅为 `3`。 | +| `TIMESTAMP` | | +| `TIMESTAMP WITH LOCAL TIME ZONE` | | | `INTERVAL` | 仅支持 `MONTH` 和 `SECOND(3)` 区间。 | | `ARRAY` | | | `MULTISET` | | | `MAP` | | | `ROW` | | | `RAW` | | +| 
stuctured types | 暂只能在用户自定义函数里使用。 | 局限性 --- **Java 表达式字符串**:Table API 中的 Java 表达式字符串,例如 `table.select("field.cast(STRING)")`,尚未被更新到新的类型系统中,使用[旧的 Planner 章节](#旧的-planner)中声明的字符串来表示。 -**连接器描述符和 SQL 客户端**:描述符字符串的表示形式尚未更新到新的类型系统。使用在[连接到外部系统章节](./connect.html#type-strings)中声明的字符串表示。 - -**用户自定义函数**:用户自定义函数尚不能声明数据类型。 +**用户自定义函数**:用户自定义聚合函数尚不能声明数据类型,标量函数和表函数充分支持数据类型。 数据类型列表 -- @@ -236,10 +235,11 @@ DataTypes.CHAR(n) **JVM 类型** -| Java 类型 | 输入 | 输出 | 备注 | -|:---|:-:|:--:|:| -|`java.lang.String` | X | X | *缺省* | -|`byte[]`| X | X | 假设使用 UTF-8 编码。 | +| Java 类型 | 输入 | 输出 | 备注 | +|:|:-:|:--:|:| +|`java.lang.String` | X | X | *缺省* | +|`byte[]` | X | X | 假设使用 UTF-8 编码。 | +|`org.apache.flink.table.data.StringData` | X | X | 内部数据结构。 | `VARCHAR` / `STRING` @@ -274,10 +274,11 @@ DataTypes.STRING() **JVM 类型** -| Java 类型 | 输入 | 输出 | 备注 | -|:---|:-:|:--:|:| -|`java.lang.String` | X | X | *缺省* | -|`byte[]`| X | X | 假设使用 UTF-8 编码。 | +| Java 类型 | 输入 | 输出 | 备注 | +|:|:-:|:--:|:| +|`java.lang.String` | X | X | *缺省* | +|`byte[]` | X | X | 假设使用 UTF-8 编码。 | +|`org.apache.flink.table.data.StringData` | X | X | 内部数据结构。 | ### 二进制字符串 @@ -389,9 +390,10 @@ DataTypes.DECIMAL(p, s) **JVM 类型** -| Java 类型 | 输入 | 输出 | 备注 | -|:--|:-:|:--:|:| -|`java.math.BigDecimal` | X | X | *缺省* | +| Java 类型| 输入 | 输出 | 备注 | +|:-|:-:|:--:|:| +|`java.math.BigDecimal`| X | X | *缺省* | +|`org.apache.flink.table.data.DecimalData` | X | X | 内部数据结构。 | `TINYINT` @@ -690,12 +692,13 @@ DataTypes.TIMESTAMP(p) | Java 类型| 输入 | 输出 | 备注 | |:-|:-:|:--:|:| -|`java.time.LocalDateTime` | X | X | *缺省* | -|`java.sql.Timestamp`
[flink] branch release-1.11 updated: [FLINK-18324][docs-zh] Translate updated udf into Chinese
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 1917af3 [FLINK-18324][docs-zh] Translate updated udf into Chinese 1917af3 is described below commit 1917af3a1eba8450c00a7be8c11796e3db16ddb8 Author: Yubin Li AuthorDate: Tue Jun 30 19:16:34 2020 +0800 [FLINK-18324][docs-zh] Translate updated udf into Chinese This closes #12794 --- docs/dev/table/functions/udfs.zh.md | 1049 --- 1 file changed, 740 insertions(+), 309 deletions(-) diff --git a/docs/dev/table/functions/udfs.zh.md b/docs/dev/table/functions/udfs.zh.md index 995a67c..4e3a6be 100644 --- a/docs/dev/table/functions/udfs.zh.md +++ b/docs/dev/table/functions/udfs.zh.md @@ -22,322 +22,854 @@ specific language governing permissions and limitations under the License. --> -自定义函数是一个非常重要的功能,因为它极大的扩展了查询的表达能力。 +自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。 + +自定义函数可以用 JVM 语言(例如 Java 或 Scala)或 Python 实现,实现者可以在 UDF 中使用任意第三方库,本文聚焦于使用 JVM 语言开发自定义函数。 * This will be replaced by the TOC {:toc} -注册自定义函数 -在大多数情况下,自定义函数在使用之前都需要注册。在 Scala Table API 中可以不用注册。 +概述 + -通过调用 `registerFunction()` 把函数注册到 `TableEnvironment`。当一个函数注册之后,它就在 `TableEnvironment` 的函数 catalog 里面了,这样 Table API 或者 SQL 解析器就可以识别并使用它。 +当前 Flink 有如下几种函数: -关于如何注册和使用每种类型的自定义函数(标量函数、表值函数和聚合函数),更多示例可以看下面的部分。 +- *标量函数* 将标量值转换成一个新标量值; +- *表值函数* 将标量值转换成新的行数据; +- *聚合函数* 将多行数据里的标量值转换成一个新标量值; +- *表值聚合函数* 将多行数据里的标量值转换成新的行数据; +- *异步表值函数* 是异步查询外部数据系统的特殊函数。 -{% top %} +注意 标量和表值函数已经使用了新的基于[数据类型]({% link dev/table/types.zh.md %})的类型系统,聚合函数仍然使用基于 `TypeInformation` 的旧类型系统。 -标量函数 - +以下示例展示了如何创建一个基本的标量函数,以及如何在 Table API 和 SQL 里调用这个函数。 -如果需要的标量函数没有被内置函数覆盖,就可以在自定义一个标量函数在 Table API 和 SQL 中使用。自定义标量函数可以把 0 到多个标量值映射成 1 个标量值。 +函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 _内联_ 后直接使用。 - -想要实现自定义标量函数,你需要扩展 `org.apache.flink.table.functions` 里面的 `ScalarFunction` 
并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法并须是 `public` 的,而且名字必须是 `eval`。求值方法的参数类型以及返回值类型就决定了标量函数的参数类型和返回值类型。可以通过实现多个名为 `eval` 的方法对求值方法进行重载。求值方法也支持可变参数,例如 `eval(String... strs)`。 - -下面的示例展示了如何实现一个求哈希值的函数。先把它注册到 `TableEnvironment` 里,然后在查询的时候就可以直接使用了。需要注意的是,你可以在注册之前通过构造方法来配置你的标量函数: + {% highlight java %} -public class HashCode extends ScalarFunction { - private int factor = 12; - - public HashCode(int factor) { - this.factor = factor; +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.ScalarFunction; +import static org.apache.flink.table.api.Expressions.*; + +// 定义函数逻辑 +public static class SubstringFunction extends ScalarFunction { + public String eval(String s, Integer begin, Integer end) { +return s.substring(begin, end); } - - public int eval(String s) { - return s.hashCode() * factor; +} + +TableEnvironment env = TableEnvironment.create(...); + +// 在 Table API 里不经注册直接“内联”调用函数 +env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12)); + +// 注册函数 +env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class); + +// 在 Table API 里调用注册好的函数 +env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12)); + +// 在 SQL 里调用注册好的函数 +env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable"); + +{% endhighlight %} + + + +{% highlight scala %} +import org.apache.flink.table.api._ +import org.apache.flink.table.functions.ScalarFunction + +// define function logic +class SubstringFunction extends ScalarFunction { + def eval(s: String, begin: Integer, end: Integer): String = { +s.substring(begin, end) } } -BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env); +val env = TableEnvironment.create(...) 
+ +// 在 Table API 里不经注册直接“内联”调用函数 +env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12)) // 注册函数 -tableEnv.registerFunction("hashCode", new HashCode(10)); +env.createTemporarySystemFunction("SubstringFunction", classOf[SubstringFunction]) + +// 在 Table API 里调用注册好的函数 +env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12)) -// 在 Java Table API 中使用函数 -myTable.select("string, string.hashCode(), hashCode(string)"); +// 在 SQL 里调用注册好的函数 +env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable") -// 在 SQL API 中使用函数 -tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable"); {% endhighlight %} + -求值方法的返回值类型默认是由 Flink 的类型推导来决定的。类型推导可以推导出基本数据类型以及简单的 POJO,但是对于更复杂的、自定义的、或者组合类型,可能会推导出错误的结果。在这种情况下,可以通过覆盖 `ScalarFunction#getResultType()`,并且返回 `TypeInformation` 来定义复杂类型。 + -下面的示例展示了一个高级一点的自定义标量函数用法,它接收一个内部的时间戳参数,并且以 `long` 的形式返回该内部的时间戳。通过覆盖 `ScalarFunction#getResultType
[flink] branch master updated (32f3eeb -> f0eeaec)
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 32f3eeb [FLINK-18526][python][docs] Add documentation for Python UDF on how to use managed memory add f0eeaec [FLINK-18324][docs-zh] Translate updated udf into Chinese No new revisions were added by this update. Summary of changes: docs/dev/table/functions/udfs.zh.md | 1049 --- 1 file changed, 740 insertions(+), 309 deletions(-)
[flink] branch master updated (ee65377 -> 1b2d1c7)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from ee65377  [FLINK-18361][es][table] Support username and password options for new Elasticsearch connector
     add 1b2d1c7  [hotfix][doc] Fix temporal_tables correlate with a changing dimension table section

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/streaming/temporal_tables.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[flink] branch master updated: [FLINK-18453][tests] Fix overflow of AggregateITCase#testAggregationCodeSplit
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new e162bd7  [FLINK-18453][tests] Fix overflow of AggregateITCase#testAggregationCodeSplit
e162bd7 is described below

commit e162bd7e9064032c0fd5f2035e1a23249f3ca5c2
Author: Benchao Li
AuthorDate: Thu Jul 9 22:39:14 2020 +0800

    [FLINK-18453][tests] Fix overflow of AggregateITCase#testAggregationCodeSplit

    This closes #12861
---
 .../flink/table/planner/runtime/stream/sql/AggregateITCase.scala | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index d3c6b77..c920bdc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1262,7 +1262,10 @@ class AggregateITCase(
       .toTable(tEnv, 'a, 'b, 'c)
     tEnv.createTemporaryView("MyTable", t)
-    val columnNumber = 500
+    tEnv.getConfig.setMaxGeneratedCodeLength(2048)
+
+    // 50 can make sure all generated methods of [Namespace]AggsHandleFunction is longer than 2048
+    val columnNumber = 50
     val selectList = Stream.range(3, columnNumber)
       .map(i => s"SUM(CASE WHEN a IS NOT NULL AND a > $i THEN 0 WHEN a < 0 THEN 0 ELSE $i END)")
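For readers who want to see what the select list in this test expands to, here is a minimal Java sketch of the same expression generation. It is an illustration only, not part of the commit: the class name `SelectListSketch` and its helper methods are hypothetical, and joining the expressions with ", " is an assumption, since the diff above is cut off before the list is assembled. Like Scala's `Stream.range(3, columnNumber)`, `IntStream.range(3, columnNumber)` excludes the upper bound.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper, not Flink code: mirrors the Scala expression
// Stream.range(3, columnNumber).map(i => s"SUM(CASE WHEN ... $i ... END)").
class SelectListSketch {

    // Builds one SUM(CASE ...) expression per i in [3, columnNumber).
    static List<String> sumExpressions(int columnNumber) {
        return IntStream.range(3, columnNumber)
            .mapToObj(i -> "SUM(CASE WHEN a IS NOT NULL AND a > " + i
                + " THEN 0 WHEN a < 0 THEN 0 ELSE " + i + " END)")
            .collect(Collectors.toList());
    }

    // Assumption: the expressions are joined with ", " to form the select list.
    static String selectList(int columnNumber) {
        return String.join(", ", sumExpressions(columnNumber));
    }
}
```

With `columnNumber = 50` this yields 47 aggregate expressions, versus 497 with the previous value of 500.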
[flink] branch master updated (d0d8037 -> 4b9f9fe)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from d0d8037  [FLINK-18470] Ensure rocksdb is loaded in RocksKeyGroupsRocksSingleStateIteratorTest
     add 4b9f9fe  [FLINK-18240][table-planner-blink] Allow to use % as modulus function

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java | 2 +-
 .../apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java | 1 +
 .../org/apache/flink/table/planner/expressions/SqlExpressionTest.scala | 1 +
 3 files changed, 3 insertions(+), 1 deletion(-)
[flink] branch master updated (02e5977 -> 74502f7)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 02e5977  [FLINK-17465][doc-zh] Update translations for memory configurations.
     add 74502f7  [FLINK-18324][docs-zh] Translate updated data type into Chinese

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/types.zh.md | 334 -
 1 file changed, 268 insertions(+), 66 deletions(-)
[flink] branch release-1.11 updated: [FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 334b7b8  [FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator
334b7b8 is described below

commit 334b7b8b00885135b0682756f970b4e440f0f189
Author: Benchao Li
AuthorDate: Fri Jun 19 16:15:40 2020 +0800

    [FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator

    This closes #12710
---
 .../planner/codegen/CodeGeneratorContext.scala | 31 +-
 .../table/planner/codegen/ExprCodeGenerator.scala | 18 --
 .../table/planner/codegen/GenerateUtils.scala | 29 ++
 .../planner/codegen/ProjectionCodeGenerator.scala | 3 +-
 .../codegen/agg/AggsHandlerCodeGenerator.scala | 66 --
 .../planner/codegen/agg/DistinctAggCodeGen.scala | 3 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala | 3 +-
 .../runtime/stream/sql/AggregateITCase.scala | 26 +
 8 files changed, 129 insertions(+), 50 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 5ef90a2..b9bc6fc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -18,7 +18,6 @@
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{Function, RuntimeContext}
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.table.api.TableConfig
@@ -109,9 +108,9 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   private var currentMethodNameForLocalVariables = "DEFAULT"
 
   /**
-   * Flag that indicates whether the generated code is split into several methods.
+   * Flag map that indicates whether the generated code for method is split into several methods.
    */
-  private var isCodeSplit = false
+  private val isCodeSplitMap = mutable.Map[String, Boolean]()
 
   // map of local variable statements. It will be placed in method if method code not excess
   // max code length, otherwise will be placed in member area of the class. The statements
@@ -149,11 +148,12 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   }
 
   /**
-   * Set the flag [[isCodeSplit]] to be true, which indicates the generated code is split into
-   * several methods.
+   * Set the flag [[isCodeSplitMap]] to be true for methodName, which indicates
+   * the generated code is split into several methods.
+   * @param methodName the method which will be split.
    */
-  def setCodeSplit(): Unit = {
-    isCodeSplit = true
+  def setCodeSplit(methodName: String = currentMethodNameForLocalVariables): Unit = {
+    isCodeSplitMap(methodName) = true
   }
 
   /**
@@ -210,10 +210,14 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
    */
   def reuseMemberCode(): String = {
     val result = reusableMemberStatements.mkString("\n")
-    if (isCodeSplit) {
+    if (isCodeSplitMap.nonEmpty) {
       val localVariableAsMember = reusableLocalVariableStatements.map(
-        statements => statements._2.map("private " + _).mkString("\n")
-      ).mkString("\n")
+        statements => if (isCodeSplitMap.getOrElse(statements._1, false)) {
+          statements._2.map("private " + _).mkString("\n")
+        } else {
+          ""
+        }
+      ).filter(_.length > 0).mkString("\n")
       result + "\n" + localVariableAsMember
     } else {
       result
@@ -224,8 +228,8 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
    * @return code block of statements that will be placed in the member area of the class
    * if generated code is split or in local variables of method
    */
-  def reuseLocalVariableCode(methodName: String = null): String = {
-    if (isCodeSplit) {
+  def reuseLocalVariableCode(methodName: String = currentMethodNameForLocalVariables): String = {
+    if (isCodeSplitMap.getOrElse(methodName, false)) {
       GeneratedExpression.NO_CODE
     } else if (methodName == null) {
       reusableLocalVariableStatements(currentMethodNameForLocalVariables).mkString("\n")
@@ -375,8 +379,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
       clazz: Class[_],
       outRecordTerm: String,
       outRecordWriterTerm: Option[String] = None): Unit = {
-val stateme
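The core idea of the CodeGeneratorContext change above is that the split flag is tracked per generated method instead of once per class, so only the local variables of methods that were actually split get promoted to class members. The following is a simplified Java model of that bookkeeping, offered as a sketch under stated assumptions: `SplitTrackingContext` and `addLocalVariable` are hypothetical names, the regular member statements that the real `reuseMemberCode()` also emits are left out, and this is not the Flink class itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, simplified model of per-method split tracking; not Flink's CodeGeneratorContext.
class SplitTrackingContext {

    // Method name -> whether the generated code of that method was split.
    private final Map<String, Boolean> isCodeSplitMap = new LinkedHashMap<>();

    // Method name -> local variable declarations collected for that method.
    private final Map<String, List<String>> localVariables = new LinkedHashMap<>();

    // Hypothetical helper for registering a declaration under a method name.
    void addLocalVariable(String methodName, String declaration) {
        localVariables.computeIfAbsent(methodName, k -> new ArrayList<>()).add(declaration);
    }

    // Marks a single method as split, leaving all other methods untouched.
    void setCodeSplit(String methodName) {
        isCodeSplitMap.put(methodName, true);
    }

    // Declarations promoted to class members: only those of methods that were split.
    String reuseMemberCode() {
        return localVariables.entrySet().stream()
            .filter(e -> isCodeSplitMap.getOrDefault(e.getKey(), false))
            .flatMap(e -> e.getValue().stream())
            .map(declaration -> "private " + declaration)
            .collect(Collectors.joining("\n"));
    }

    // Declarations kept local to a method: empty once that method has been split.
    String reuseLocalVariableCode(String methodName) {
        if (isCodeSplitMap.getOrDefault(methodName, false)) {
            return "";
        }
        return String.join("\n",
            localVariables.getOrDefault(methodName, Collections.<String>emptyList()));
    }
}
```

In this model, splitting one method (say a hypothetical "accumulate") promotes only its declarations to members, while an unsplit method keeps its declarations as locals. With the single boolean flag used before the change, splitting any method would have promoted the locals of every method.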
[flink] branch master updated: [FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 99fca58  [FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator
99fca58 is described below

commit 99fca58fe60199b93f962e5fbf0cb0314b9d3b99
Author: libenchao
AuthorDate: Wed Mar 25 23:37:44 2020 +0800

    [FLINK-16589][table-planner-blink] Split code for AggsHandlerCodeGenerator

    This closes 11512
---
 .../planner/codegen/CodeGeneratorContext.scala | 31 +-
 .../table/planner/codegen/ExprCodeGenerator.scala | 18 --
 .../table/planner/codegen/GenerateUtils.scala | 29 ++
 .../planner/codegen/ProjectionCodeGenerator.scala | 3 +-
 .../codegen/agg/AggsHandlerCodeGenerator.scala | 66 --
 .../planner/codegen/agg/DistinctAggCodeGen.scala | 3 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala | 3 +-
 .../runtime/stream/sql/AggregateITCase.scala | 26 +
 8 files changed, 129 insertions(+), 50 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 5ef90a2..b9bc6fc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -18,7 +18,6 @@
 package org.apache.flink.table.planner.codegen
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{Function, RuntimeContext}
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.table.api.TableConfig
@@ -109,9 +108,9 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   private var currentMethodNameForLocalVariables = "DEFAULT"
 
   /**
-   * Flag that indicates whether the generated code is split into several methods.
+   * Flag map that indicates whether the generated code for method is split into several methods.
    */
-  private var isCodeSplit = false
+  private val isCodeSplitMap = mutable.Map[String, Boolean]()
 
   // map of local variable statements. It will be placed in method if method code not excess
   // max code length, otherwise will be placed in member area of the class. The statements
@@ -149,11 +148,12 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   }
 
   /**
-   * Set the flag [[isCodeSplit]] to be true, which indicates the generated code is split into
-   * several methods.
+   * Set the flag [[isCodeSplitMap]] to be true for methodName, which indicates
+   * the generated code is split into several methods.
+   * @param methodName the method which will be split.
    */
-  def setCodeSplit(): Unit = {
-    isCodeSplit = true
+  def setCodeSplit(methodName: String = currentMethodNameForLocalVariables): Unit = {
+    isCodeSplitMap(methodName) = true
   }
 
   /**
@@ -210,10 +210,14 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
    */
   def reuseMemberCode(): String = {
     val result = reusableMemberStatements.mkString("\n")
-    if (isCodeSplit) {
+    if (isCodeSplitMap.nonEmpty) {
       val localVariableAsMember = reusableLocalVariableStatements.map(
-        statements => statements._2.map("private " + _).mkString("\n")
-      ).mkString("\n")
+        statements => if (isCodeSplitMap.getOrElse(statements._1, false)) {
+          statements._2.map("private " + _).mkString("\n")
+        } else {
+          ""
+        }
+      ).filter(_.length > 0).mkString("\n")
       result + "\n" + localVariableAsMember
     } else {
       result
@@ -224,8 +228,8 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
    * @return code block of statements that will be placed in the member area of the class
    * if generated code is split or in local variables of method
    */
-  def reuseLocalVariableCode(methodName: String = null): String = {
-    if (isCodeSplit) {
+  def reuseLocalVariableCode(methodName: String = currentMethodNameForLocalVariables): String = {
+    if (isCodeSplitMap.getOrElse(methodName, false)) {
       GeneratedExpression.NO_CODE
     } else if (methodName == null) {
       reusableLocalVariableStatements(currentMethodNameForLocalVariables).mkString("\n")
@@ -375,8 +379,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
       clazz: Class[_],
       outRecordTerm: String,
       outRecordWriterTerm: Option[String] = None): Unit = {
-val statement = generateRecordStatement(t, clazz,
[flink] branch release-1.11 updated: [FLINK-18282][docs-zh] Retranslate the home page document
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 2f8aa59 [FLINK-18282][docs-zh] Retranslate the home page document 2f8aa59 is described below commit 2f8aa5950bc696c9420a892c39cd83bffb8faa4f Author: wangsong2 AuthorDate: Sat Jun 13 20:58:45 2020 +0800 [FLINK-18282][docs-zh] Retranslate the home page document This closes #12642 --- docs/index.zh.md | 82 ++-- 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/docs/index.zh.md b/docs/index.zh.md index d21ca73..0f83e89 100644 --- a/docs/index.zh.md +++ b/docs/index.zh.md @@ -23,53 +23,71 @@ specific language governing permissions and limitations under the License. --> + +Apache Flink 是一个在无界和有界数据流上进行状态计算的框架和分布式处理引擎。 Flink 已经可以在所有常见的集群环境中运行,并以 in-memory 的速度和任意的规模进行计算。 + -本文档适用于 Apache Flink {{ site.version_title}} 版本。本页面最近更新于 {% build_time %}. 
+ + -Apache Flink 是一个分布式流批一体化的开源平台。Flink 的核心是一个提供数据分发、通信以及自动容错的流计算引擎。Flink 在流计算之上构建批处理,并且原生的支持迭代计算,内存管理以及程序优化。 +### 试用 Flink -## 初步印象 +如果您有兴趣使用 Flink, 可以试试我们的教程: -* **代码练习**: 跟随分步指南通过 Flink API 实现简单应用或查询。 - * [实现 DataStream 应用]({% link try-flink/datastream_api.zh.md %}) - * [书写 Table API 查询]({% link try-flink/table_api.zh.md %}) +* [DataStream API 进行欺诈检测]({% link try-flink/datastream_api.zh.md %}) +* [Table API 构建实时报表]({% link try-flink/table_api.zh.md %}) +* [Python API 教程]({% link try-flink/python_table_api.zh.md %}) +* [Flink 游乐场]({% link try-flink/flink-operations-playground.zh.md %}) -* **Docker 游乐场**: 你只需花几分钟搭建 Flink 沙盒环境,就可以探索和使用 Flink 了。 - * [运行与管理 Flink 流处理应用]({% link try-flink/flink-operations-playground.zh.md %}) +### 学习 Flink -* **概念**: 学习 Flink 的基本概念能更好地理解文档。 - * [有状态流处理](concepts/stateful-stream-processing.html) - * [实时流处理](concepts/timely-stream-processing.html) - * [Flink 架构](concepts/flink-architecture.html) - * [术语表](concepts/glossary.html) +* [操作培训]({% link learn-flink/index.zh.md %}) 包含了一系列的课程和练习,提供了对 Flink 的逐一介绍。 -## API 参考 +* [概念透析]({% link concepts/index.zh.md %}) 介绍了在浏览参考文档之前你需要了解的 Flink 知识。 -API 参考列举并解释了 Flink API 的所有功能。 +### 获取 Flink 帮助 -* [DataStream API](dev/datastream_api.html) -* [DataSet API](dev/batch/index.html) -* [Table API & SQL](dev/table/index.html) +如果你遇到困难了, 可以在 [社区](https://flink.apache.org/zh/community.html)寻求帮助。值得一提的是,Apache Flink 的用户邮件列表一直是 Apache 项目里面最活跃的之一,也是一个快速获得帮助的好途径。 -## 部署 + + -在线上环境运行你的 Flink 作业之前,请阅读 [生产环境注意事项检查清单](ops/production_ready.html)。 +### 探索 Flink -## 发布日志 +参考文档包含了 Flink 所有内容。 你可以从以下几点开始学习: -发布日志包含了 Flink 版本之间的重大更新。请在你升级 Flink 之前仔细阅读相应的发布日志。 + + -* [Flink 1.10 的发布日志](release-notes/flink-1.10.html). 
-* [Flink 1.9 的发布日志](release-notes/flink-1.9.html)。 -* [Flink 1.8 的发布日志](release-notes/flink-1.8.html)。 -* [Flink 1.7 的发布日志](release-notes/flink-1.7.html)。 -* [Flink 1.6 的发布日志](release-notes/flink-1.6.html)。 -* [Flink 1.5 的发布日志](release-notes/flink-1.5.html)。 +* [DataStream API]({% link dev/datastream_api.zh.md %}) +* [Table API & SQL]({% link dev/table/index.zh.md %}) +* [状态方法]({% if site.is_stable %} {{ site.statefundocs_stable_baseurl }} {% else %} {{ site.statefundocs_baseurl }} {% endif %}) -## 外部资源 + + -- **Flink Forward**: 已举办的所有大会演讲均可在 [Flink Forward](http://flink-forward.org/) 官网以及 [YouTube](https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA)找到。[使用 Apache Flink 进行高可靠的流处理](http://2016.flink-forward.org/kb_sessions/robust-stream-processing-with-apache-flink/) 可以作为你第一个学习的资源。 +* [配置参数]({% link ops/config.zh.md %}) +* [Rest API]({% link monitoring/rest_api.zh.md %}) +* [CLI]({% link ops/cli.zh.md %}) -- **培训**: [培训资料](https://training.ververica.com/) 包含讲义,练习以及示例程序。 + + -- **博客**: [Apache Flink](https://flink.apache.org/blog/) 以及 [Ververica](https://www.ververica.com/blog) 的博客会经常更新一些有关 Flink 的技术文章。 +### 部署 Flink + +在线上环境运行你的 Flink 作业之前,请阅读 [生产环境注意事项检查清单]({% link ops/production_ready.zh.md %}). 各种部署环境一览,详见 [集群与部署]({% link ops/deployment/index.zh.md %}). + +### 升级 Flink + +release notes 包含了 Flink 版本之间的重大更新。请在你升级 Flink 之前仔细阅读相应的 release notes。 + +请阅读 release notes [Flink 1.10]({% link release-notes/flink-1.10.zh.md %}), [Flink 1.9]({% link release-notes/flink-1.9.zh.md %}), [Flink 1.8]({% link release-notes/flink-1.8.zh.md %}), or [Flink 1.7]({% link release-notes/flink-1.7.zh.md %}). + + + + + + +本文档适用于 Apache Flink {{ site.version_title }} 版本。本页面最近更新于: {% build_time %}. + +
[flink] branch master updated (fea20ad -> f4aaf8c)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from fea20ad  [FLINK-18268][docs] Correct Table API in Temporal table docs
     add f4aaf8c  [FLINK-18282][docs-zh] Retranslate the home page document

No new revisions were added by this update.

Summary of changes:
 docs/index.zh.md | 82 ++--
 1 file changed, 50 insertions(+), 32 deletions(-)
[flink] branch master updated: Fix typo in KafkaResource (#12564)
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 4db3680  Fix typo in KafkaResource (#12564)
4db3680 is described below

commit 4db368035f00ef7b990a86f1285921e9834e4f6c
Author: Da(Dash)Shen
AuthorDate: Wed Jun 10 11:23:52 2020 +0800

    Fix typo in KafkaResource (#12564)

    This closes #12564
---
 .../src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
index 0157ad2..bd5ba46 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
@@ -102,7 +102,7 @@ public interface KafkaResource extends ExternalResource {
     /**
      * Returns the configured KafkaResource implementation, or a {@link LocalStandaloneKafkaResource} if none is configured.
      *
-     * @return configured KafkaResource, or {@link LocalStandaloneKafkaResource} is none is configured
+     * @return configured KafkaResource, or {@link LocalStandaloneKafkaResource} if none is configured
      */
     static KafkaResource get(final String version) {
         return FactoryUtils.loadAndInvokeFactory(