[flink] branch master updated: [FLINK-26474][hive] Fold exprNode to fix the issue of failing to call some hive udf required constant parameters with implicit constant passed
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new fc5730ab2de [FLINK-26474][hive] Fold exprNode to fix the issue of failing to call some hive udf required constant parameters with implicit constant passed fc5730ab2de is described below commit fc5730ab2dee219a4875e78312174c0364579013 Author: yuxia Luo AuthorDate: Wed Aug 31 13:39:34 2022 +0800 [FLINK-26474][hive] Fold exprNode to fix the issue of failing to call some hive udf required constant parameters with implicit constant passed This closes #18975 --- .../delegation/hive/HiveParserCalcitePlanner.java | 3 +- .../hive/HiveParserRexNodeConverter.java | 48 ++ .../hive/HiveParserTypeCheckProcFactory.java | 21 ++ .../connectors/hive/HiveDialectQueryITCase.java| 2 +- .../src/test/resources/query-test/udf.q| 17 5 files changed, 81 insertions(+), 10 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java index e1e5a452d93..4244dbde980 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java @@ -2307,7 +2307,8 @@ public class HiveParserCalcitePlanner { } else { // Case when this is an expression HiveParserTypeCheckCtx typeCheckCtx = -new HiveParserTypeCheckCtx(inputRR, frameworkConfig, cluster); +new HiveParserTypeCheckCtx( +inputRR, true, true, frameworkConfig, cluster); // We allow stateful functions in the SELECT list (but nowhere else) typeCheckCtx.setAllowStatefulFunctions(true); if 
(!qbp.getDestToGroupBy().isEmpty()) { diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java index 17ea24a6e4a..54775ed6c55 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java @@ -60,6 +60,8 @@ import org.apache.calcite.util.TimestampString; import org.apache.hadoop.hive.common.type.Decimal128; import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; +import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; @@ -456,9 +458,21 @@ public class HiveParserRexNodeConverter { default: if (hiveShim.isIntervalYearMonthType(hiveTypeCategory)) { // Calcite year-month literal value is months as BigDecimal -BigDecimal totalMonths = -BigDecimal.valueOf( -((HiveParserIntervalYearMonth) value).getTotalMonths()); +BigDecimal totalMonths; +if (value instanceof HiveParserIntervalYearMonth) { +totalMonths = +BigDecimal.valueOf( +((HiveParserIntervalYearMonth) value).getTotalMonths()); +} else if (value instanceof HiveIntervalYearMonth) { +totalMonths = +BigDecimal.valueOf( +((HiveIntervalYearMonth) value).getTotalMonths()); +} else { +throw new SemanticException( +String.format( +"Unexpected class %s for Hive's interval day time type", +value.getClass().getName())); +} calciteLiteral = rexBuilder.makeIntervalLiteral( totalMonths, @@ -467,12 +481,30 @@ public class 
HiveParserRexNodeCon
[flink-docker] branch master updated: Clarifying instructions for gaining access to flink-docker via INFRA
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git The following commit(s) were added to refs/heads/master by this push: new 17ee051 Clarifying instructions for gaining access to flink-docker via INFRA 17ee051 is described below commit 17ee0514e82f9d4ed4e9f69dc10f46c1f6632786 Author: Danny Cranmer AuthorDate: Tue Aug 30 20:56:05 2022 +0100 Clarifying instructions for gaining access to flink-docker via INFRA --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e56d8d9..1d3cb46 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,9 @@ Once the pull request has been merged, we can release the new docker images: For **publishing to DockerHub: apache/flink** , you need to perform the following steps: -1. Make sure that you are authenticated with your Docker ID, and that your Docker ID has access to `apache/flink`. If not, request access by INFRA (see [also](https://issues.apache.org/jira/browse/INFRA-21276): `docker login -u <username>`. +1. Make sure that you are authenticated with your Docker ID, and that your Docker ID has access to `apache/flink`: `docker login -u <username>` + 1. If you do not have access, you should seek help via the mailing list. + We have a limited number of seats which are full, see [INFRA-23623](https://issues.apache.org/jira/browse/INFRA-23623) for more information. See also [INFRA-21276](https://issues.apache.org/jira/browse/INFRA-21276). 2. Generate and upload the new images: `./publish-to-dockerhub.sh`. (Do not execute on the arm platform machine, such as Apple Silicon) For **publishing as an official image**, a new manifest should be generated and a pull request opened
[flink] branch master updated: [FLINK-29005][parquet] Parquet row type reader should not return null value when some child fields is null
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 00f585234f8 [FLINK-29005][parquet] Parquet row type reader should not return null value when some child fields is null 00f585234f8 is described below commit 00f585234f8db8fe1e2bfec5c6c323ca99d9b775 Author: Kai Chen AuthorDate: Wed Aug 31 12:09:19 2022 +0800 [FLINK-29005][parquet] Parquet row type reader should not return null value when some child fields is null This closes #20616 --- .../connectors/hive/HiveTableSourceITCase.java | 119 + .../parquet/vector/reader/RowColumnReader.java | 15 ++- .../data/columnar/vector/heap/HeapRowVector.java | 8 ++ 3 files changed, 137 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java index 99540c947c6..f3cb5e1e874 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java @@ -18,12 +18,18 @@ package org.apache.flink.connectors.hive; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.FiniteTestSource; import org.apache.flink.table.HiveVersionTestUtil; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; @@ -73,6 +79,8 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -963,6 +971,117 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase { result.getJobClient().get().cancel(); } +@Test(timeout = 12) +public void testReadParquetWithNullableComplexType() throws Exception { +final String catalogName = "hive"; +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(3); +env.enableCheckpointing(100); +StreamTableEnvironment tEnv = +HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE); +tEnv.registerCatalog(catalogName, hiveCatalog); +tEnv.useCatalog(catalogName); + +List rows = generateRows(); +List expectedRows = generateExpectedRows(rows); +DataStream stream = +env.addSource( +new FiniteTestSource<>(rows), +new RowTypeInfo( +new TypeInformation[] { +Types.INT, +Types.STRING, +new RowTypeInfo( +new TypeInformation[] { +Types.STRING, Types.INT, Types.INT +}, +new String[] {"c1", "c2", "c3"}), +new MapTypeInfo<>(Types.STRING, Types.STRING), +Types.OBJECT_ARRAY(Types.STRING), +Types.STRING +}, +new String[] {"a", "b", "c", "d", "e", "f"})) +.filter((FilterFunction) value -> true) +.setParallelism(3); // to parallel tasks + +tEnv.createTemporaryView("my_table", stream); +assertResults(executeAndGetResult(tEnv), expectedRows); +} + +private static List generateRows() { +List rows = new ArrayList<>(); +for (int i = 0; i < 1; i++) { +Map e = new HashMap<>(); +e.put(i + "", i % 2 == 0 ? null : i + ""); +
[flink] branch master updated: [FLINK-29087][connector/jdbc] Change dependencies order to avoid compile failure while running in idea
This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d55be6850dc [FLINK-29087][connector/jdbc] Change dependencies order to avoid compile failure while running in idea d55be6850dc is described below commit d55be6850dc2a4e0291c0a4853fa5aa7a51a1d10 Author: Xuyang AuthorDate: Wed Aug 31 09:43:25 2022 +0800 [FLINK-29087][connector/jdbc] Change dependencies order to avoid compile failure while running in idea This closes #20670. --- flink-connectors/flink-connector-jdbc/pom.xml | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml index a226543ffa5..e7d7db4ccfe 100644 --- a/flink-connectors/flink-connector-jdbc/pom.xml +++ b/flink-connectors/flink-connector-jdbc/pom.xml @@ -105,19 +105,20 @@ under the License. org.apache.flink flink-table-planner_${scala.binary.version} ${project.version} - test-jar test org.apache.flink - flink-table-api-scala-bridge_${scala.binary.version} + flink-table-planner_${scala.binary.version} ${project.version} + test-jar test + org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-api-scala-bridge_${scala.binary.version} ${project.version} test
[flink-kubernetes-operator] branch main updated: [FLINK-29140] Bump Flink version to 1.15.2
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new 1b87903b [FLINK-29140] Bump Flink version to 1.15.2 1b87903b is described below commit 1b87903b8a48faf1531c99c6cdb73878224a27e7 Author: Gabor Somogyi AuthorDate: Tue Aug 30 15:19:21 2022 +0200 [FLINK-29140] Bump Flink version to 1.15.2 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 08df25fb..eb152774 100644 --- a/pom.xml +++ b/pom.xml @@ -74,7 +74,7 @@ under the License. 5.12.3 1.18.22 -1.15.1 +1.15.2 1.7.36 2.17.1
[flink-web] branch asf-site updated: [FLINK-29141] Update scala suffixes
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new ee10ce51d [FLINK-29141] Update scala suffixes ee10ce51d is described below commit ee10ce51dc83eb69164bba28c2ccd4c87797f211 Author: Chesnay Schepler AuthorDate: Tue Aug 30 11:52:30 2022 +0200 [FLINK-29141] Update scala suffixes --- downloads.md| 18 +- downloads.zh.md | 18 +- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/downloads.md b/downloads.md index 9f131a0f2..a4a98f266 100644 --- a/downloads.md +++ b/downloads.md @@ -207,7 +207,7 @@ Along with our releases, we also provide sha512 hashes in `*.sha512` files and c You can add the following dependencies to your `pom.xml` to include Apache Flink in your project. These dependencies include a local execution environment and thus support local testing. -- **Scala API**: To use the Scala API, replace the `flink-java` artifact id with `flink-scala_2.11` and `flink-streaming-java_2.11` with `flink-streaming-scala_2.11`. +- **Scala API**: To use the Scala API, replace the `flink-java` artifact id with `flink-scala_2.12` and `flink-streaming-java` with `flink-streaming-scala_2.12`. 
```xml @@ -217,12 +217,12 @@ You can add the following dependencies to your `pom.xml` to include Apache Flink org.apache.flink - flink-streaming-java_2.11 + flink-streaming-java {{ site.FLINK_VERSION_STABLE }} org.apache.flink - flink-clients_2.11 + flink-clients {{ site.FLINK_VERSION_STABLE }} ``` @@ -254,26 +254,26 @@ You can add the following dependencies to your `pom.xml` to include Apache Flink ```xml org.apache.flink - flink-ml-core_2.12 + flink-ml-core {{ site.FLINK_ML_VERSION_STABLE }} org.apache.flink - flink-ml-iteration_2.12 + flink-ml-iteration {{ site.FLINK_ML_VERSION_STABLE }} org.apache.flink - flink-ml-lib_2.12 + flink-ml-lib {{ site.FLINK_ML_VERSION_STABLE }} ``` Advanced users could only import a minimal set of Flink ML dependencies for their target use-cases: -- Use artifact `flink-ml-core_2.12` in order to develop custom ML algorithms. -- Use artifacts `flink-ml-core_2.12` and `flink-ml-iteration_2.12` in order to develop custom ML algorithms which require iteration. -- Use artifact `flink-ml-lib_2.12` in order to use the off-the-shelf ML algorithms from Flink ML. +- Use artifact `flink-ml-core` in order to develop custom ML algorithms. +- Use artifacts `flink-ml-core` and `flink-ml-iteration` in order to develop custom ML algorithms which require iteration. +- Use artifact `flink-ml-lib` in order to use the off-the-shelf ML algorithms from Flink ML. 
### Apache Flink Kubernetes Operator diff --git a/downloads.zh.md b/downloads.zh.md index 5977636df..7be268512 100644 --- a/downloads.zh.md +++ b/downloads.zh.md @@ -204,7 +204,7 @@ Apache Flink® Table Store {{ site.FLINK_TABLE_STORE_VERSION_STABLE }} is the la 你只要将以下依赖项添加到 `pom.xml` 中,就能在项目中引入 Apache Flink 。这些依赖项包含了本地执行环境,因此支持本地测试。 -- **Scala API**: 为了使用 Scala API,将 `flink-java` 的 artifact id 替换为 `flink-scala_2.11`,同时将 `flink-streaming-java_2.11` 替换为 `flink-streaming-scala_2.11`。 +- **Scala API**: 为了使用 Scala API,将 `flink-java` 的 artifact id 替换为 `flink-scala_2.12`,同时将 `flink-streaming-java` 替换为 `flink-streaming-scala_2.12`。 ```xml @@ -214,12 +214,12 @@ Apache Flink® Table Store {{ site.FLINK_TABLE_STORE_VERSION_STABLE }} is the la org.apache.flink - flink-streaming-java_2.11 + flink-streaming-java {{ site.FLINK_VERSION_STABLE }} org.apache.flink - flink-clients_2.11 + flink-clients {{ site.FLINK_VERSION_STABLE }} ``` @@ -251,26 +251,26 @@ Apache Flink® Table Store {{ site.FLINK_TABLE_STORE_VERSION_STABLE }} is the la ```xml org.apache.flink - flink-ml-core_2.12 + flink-ml-core {{ site.FLINK_ML_VERSION_STABLE }} org.apache.flink - flink-ml-iteration_2.12 + flink-ml-iteration {{ site.FLINK_ML_VERSION_STABLE }} org.apache.flink - flink-ml-lib_2.12 + flink-ml-lib {{ site.FLINK_ML_VERSION_STABLE }} ``` 高级用户可以根据使用场景来只包含最小集合的依赖: -- 依赖组件 `flink-ml-core_2.12` 来开发不使用迭代的自定义机器学习算法。 -- 依赖组件 `flink-ml-core_2.12` 与 `flink-ml-iteration_2.12` 来开发使用迭代的自定义机器学习算法。 -- 依赖组件 `flink-ml-lib_2.12` 来使用 Flink ML 提供的机器学习算法。 +- 依赖组件 `flink-ml-core` 来开发不使用迭代的自定义机器学习算法。 +- 依赖组件 `flink-ml-core` 与 `flink-ml-iteration` 来开发使用迭代的自定义机器学习算法。 +- 依赖组件 `flink-ml-lib` 来使用 Flink ML 提供的机器学习算法。 ## 旧版本的更新策略 截至2017年3月,Flink 社区[决定](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Time-based-releases-in-Flink-tp15386p15394.html)使用 bugfix 来支持当前和之前的次要版本。如果 1.2.x 是当前的正式版本,则 1.1.y 是之前的次要支持版本。这两个版本都将收到关键问题的 bugfix。
[flink] branch master updated: [FLINK-28751][table] Improve the performance of JSON functions with json path (#20397)
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 2220f24925a [FLINK-28751][table] Improve the performance of JSON functions with json path (#20397) 2220f24925a is described below commit 2220f24925ab5146d5771c3782ed8c0837bb0bc4 Author: Aitozi AuthorDate: Tue Aug 30 20:30:42 2022 +0800 [FLINK-28751][table] Improve the performance of JSON functions with json path (#20397) --- .../table/runtime/functions/JsonPathCache.java | 43 ++ .../table/runtime/functions/SqlJsonUtils.java | 5 +++ 2 files changed, 48 insertions(+) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/JsonPathCache.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/JsonPathCache.java new file mode 100644 index 000..097419459c9 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/JsonPathCache.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions; + +import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder; + +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.spi.cache.Cache; + +/** The default cache for the jsonpath {@link com.jayway.jsonpath.spi.cache.CacheProvider}. */ +public class JsonPathCache implements Cache { + +private static final long DEFAULT_CACHE_MAXIMUM_SIZE = 400; + +private final org.apache.flink.shaded.guava30.com.google.common.cache.Cache +jsonPathCache = + CacheBuilder.newBuilder().maximumSize(DEFAULT_CACHE_MAXIMUM_SIZE).build(); + +@Override +public JsonPath get(String s) { +return jsonPathCache.getIfPresent(s); +} + +@Override +public void put(String s, JsonPath jsonPath) { +jsonPathCache.put(s, jsonPath); +} +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java index 7602a1946e3..2d7dec14288 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java @@ -41,6 +41,7 @@ import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.InvalidPathException; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.spi.cache.CacheProvider; import com.jayway.jsonpath.spi.json.JacksonJsonProvider; import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; import com.jayway.jsonpath.spi.mapper.MappingProvider; @@ -74,6 +75,10 @@ public class SqlJsonUtils { private SqlJsonUtils() {} +static { +CacheProvider.setCache(new JsonPathCache()); +} + /** Returns the {@link JsonNodeFactory} for creating nodes. */ public static JsonNodeFactory getNodeFactory() { return MAPPER.getNodeFactory();
[flink] branch master updated (b3dcafa9db2 -> c37643031dc)
This is an automated email from the ASF dual-hosted git repository. gyfora pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from b3dcafa9db2 [FLINK-28121][docs-zh]Translate "Extension Points" and "Full Stack Example" in "User-defined Sources & Sinks" page add c37643031dc [FLINK-29123][k8s] Dynamic paramters are not pushed to working with kubernetes No new revisions were added by this update. Summary of changes: .../generated/kubernetes_config_configuration.html | 12 .../kubernetes/configuration/KubernetesConfigOptions.java| 12 .../kubeclient/decorators/CmdJobManagerDecorator.java| 6 +- .../kubeclient/decorators/CmdTaskManagerDecorator.java | 4 +++- .../parameters/KubernetesJobManagerParameters.java | 4 .../parameters/KubernetesTaskManagerParameters.java | 5 + .../kubernetes/kubeclient/KubernetesJobManagerTestBase.java | 3 +++ .../kubernetes/kubeclient/KubernetesTaskManagerTestBase.java | 3 +++ .../kubeclient/decorators/CmdJobManagerDecoratorTest.java| 6 +- .../kubeclient/decorators/CmdTaskManagerDecoratorTest.java | 4 +++- 10 files changed, 55 insertions(+), 4 deletions(-)
[flink] branch master updated: [FLINK-28121][docs-zh]Translate "Extension Points" and "Full Stack Example" in "User-defined Sources & Sinks" page
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new b3dcafa9db2 [FLINK-28121][docs-zh]Translate "Extension Points" and "Full Stack Example" in "User-defined Sources & Sinks" page b3dcafa9db2 is described below commit b3dcafa9db278fc02945c7bc5c32765c99d00bb1 Author: Chengkai Yang AuthorDate: Sun Jul 3 22:44:47 2022 +0800 [FLINK-28121][docs-zh]Translate "Extension Points" and "Full Stack Example" in "User-defined Sources & Sinks" page --- docs/content.zh/docs/dev/table/sourcesSinks.md | 329 +++-- 1 file changed, 144 insertions(+), 185 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sourcesSinks.md b/docs/content.zh/docs/dev/table/sourcesSinks.md index 36fcf912e7f..e897f15a1f2 100644 --- a/docs/content.zh/docs/dev/table/sourcesSinks.md +++ b/docs/content.zh/docs/dev/table/sourcesSinks.md @@ -133,242 +133,209 @@ If you need a feature available only internally within the `org.apache.flink.tab To learn more, check out [Anatomy of Table Dependencies]({{< ref "docs/dev/configuration/advanced" >}}#anatomy-of-table-dependencies). {{< /hint >}} -Extension Points + + +扩展点 -This section explains the available interfaces for extending Flink's table connectors. +这一部分主要介绍扩展 Flink table connector 时可能用到的接口。 + + -### Dynamic Table Factories +### 动态表的工厂类 -Dynamic table factories are used to configure a dynamic table connector for an external storage system from catalog -and session information. +在根据 catalog 与 Flink 运行时上下文信息,为某个外部存储系统配置动态表连接器时,需要用到动态表的工厂类。 -`org.apache.flink.table.factories.DynamicTableSourceFactory` can be implemented to construct a `DynamicTableSource`. 
+比如,通过实现 `org.apache.flink.table.factories.DynamicTableSourceFactory` 接口完成一个工厂类,来生产 `DynamicTableSource` 类。 -`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented to construct a `DynamicTableSink`. +通过实现 `org.apache.flink.table.factories.DynamicTableSinkFactory` 接口完成一个工厂类,来生产 `DynamicTableSink` 类。 -By default, the factory is discovered using the value of the `connector` option as the factory identifier -and Java's Service Provider Interface. +默认情况下,Java 的 SPI 机制会自动识别这些工厂类,同时将 `connector` 配置项作为工厂类的”标识符“。 -In JAR files, references to new implementations can be added to the service file: +在 JAR 文件中,需要将实现的工厂类路径放入到下面这个配置文件: `META-INF/services/org.apache.flink.table.factories.Factory` -The framework will check for a single matching factory that is uniquely identified by factory identifier -and requested base class (e.g. `DynamicTableSourceFactory`). +Flink 会对工厂类逐个进行检查,确保其“标识符”是全局唯一的,并且按照要求实现了上面提到的接口 (比如 `DynamicTableSourceFactory`)。 + +如果必要的话,也可以在实现 catalog 时绕过上述 SPI 机制识别工厂类的过程。即在实现 catalog 接口时,在`org.apache.flink.table.catalog.Catalog#getFactory` 方法中直接返回工厂类的实例。 + + -The factory discovery process can be bypassed by the catalog implementation if necessary. For this, a -catalog needs to return an instance that implements the requested base class in `org.apache.flink.table.catalog.Catalog#getFactory`. +### 动态表的 source 端 -### Dynamic Table Source +按照定义,动态表是随时间变化的。 -By definition, a dynamic table can change over time. +在读取动态表时,表中数据可以是以下情况之一: +- changelog 流(支持有界或无界),在 changelog 流结束前,所有的改变都会被源源不断地消费,由 `ScanTableSource` 接口表示。 +- 处于一直变换或数据量很大的外部表,其中的数据一般不会被全量读取,除非是在查询某个值时,由 `LookupTableSource` 接口表示。 -When reading a dynamic table, the content can either be considered as: -- A changelog (finite or infinite) for which all changes are consumed continuously until the changelog - is exhausted. This is represented by the `ScanTableSource` interface. 
-- A continuously changing or very large external table whose content is usually never read entirely - but queried for individual values when necessary. This is represented by the `LookupTableSource` - interface. +一个类可以同时实现这两个接口,Planner 会根据查询的 Query 选择相应接口中的方法。 -A class can implement both of these interfaces at the same time. The planner decides about their usage depending -on the specified query. + Scan Table Source -A `ScanTableSource` scans all rows from an external storage system during runtime. +在运行期间,`ScanTableSource` 接口会按行扫描外部存储系统中所有数据。 -The scanned rows don't have to contain only insertions but can also contain updates and deletions. Thus, -the table source can be used to read a (finite or infinite) changelog. The returned _changelog mode_ indicates -the set of changes that the planner can expect during runtime. +被扫描的数据可以是 insert、update、delete 三种操作类型,因此数据源可以用作读取 changelog (支持有界或无界)。在运行时,返回的 **_changelog mode_** 表示 Planner 要处理的操作类型。 -For regular batch scenarios, the source can emit a bounded stream of insert-only rows. +在常规批处理的场景下,数据源可以处理 insert-only 操作类型的有界数据流。 -For regular streaming scenarios, the source can emit an unbounded stream of insert-only rows. +在常规流处理的场景下,数据
[flink-web] branch asf-site updated: Rebuild website
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new 09053f542 Rebuild website 09053f542 is described below commit 09053f5420d4918b1f72cd7a1db9b14eeda8c536 Author: Chesnay Schepler AuthorDate: Tue Aug 30 11:48:56 2022 +0200 Rebuild website --- content/downloads.html| 66 -- content/zh/downloads.html | 91 +-- 2 files changed, 8 insertions(+), 149 deletions(-) diff --git a/content/downloads.html b/content/downloads.html index ea0669306..fbae13732 100644 --- a/content/downloads.html +++ b/content/downloads.html @@ -242,9 +242,6 @@ Apache Flink 1.15.2 Apache Flink 1.14.5 - Apache Flink 1.13.6 - Apache Flink 1.12.7 - Apache Flink 1.11.6 Apache Flink Stateful Functions 3.2.0 Apache Flink ML 2.1.0 Apache Flink ML 2.0.0 @@ -315,69 +312,6 @@ -Apache Flink 1.13.6 - - -https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz"; id="1136-download_211">Apache Flink 1.13.6 for Scala 2.11 (https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz.asc";>asc, https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz.sha512";>sha512) - - - -https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz"; id="1136-download_212">Apache Flink 1.13.6 for Scala 2.12 (https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz.asc";>asc, https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz.sha512";>sha512) - - - -https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-src.tgz"; id="1135-download-source">Apache Flink 1.13.6 Source Release -(https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-src.tgz.asc";>asc, https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-src.tgz.sha512";>sha512) - - -Release Notes - -Please have 
a look at the https://nightlies.apache.org/flink/flink-docs-release-1.13/release-notes/flink-1.13";>Release Notes for Flink 1.13 if you plan to upgrade your Flink setup from a previous version. - - - -Apache Flink 1.12.7 - - -https://www.apache.org/dyn/closer.lua/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.11.tgz"; id="1127-download_211">Apache Flink 1.12.7 for Scala 2.11 (https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.11.tgz.asc";>asc, https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.11.tgz.sha512";>sha512) - - - -https://www.apache.org/dyn/closer.lua/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz"; id="1127-download_212">Apache Flink 1.12.7 for Scala 2.12 (https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz.asc";>asc, https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz.sha512";>sha512) - - - -https://www.apache.org/dyn/closer.lua/flink/flink-1.12.7/flink-1.12.7-src.tgz"; id="1127-download-source">Apache Flink 1.12.7 Source Release -(https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-src.tgz.asc";>asc, https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-src.tgz.sha512";>sha512) - - -Release Notes - -Please have a look at the https://nightlies.apache.org/flink/flink-docs-release-1.12/release-notes/flink-1.12.html";>Release Notes for Flink 1.12 if you plan to upgrade your Flink setup from a previous version. 
- - - -Apache Flink 1.11.6 - - -https://www.apache.org/dyn/closer.lua/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.11.tgz"; id="1116-download_211">Apache Flink 1.11.6 for Scala 2.11 (https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.11.tgz.asc";>asc, https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.11.tgz.sha512";>sha512) - - - -https://www.apache.org/dyn/closer.lua/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.12.tgz"; id="1116-download_212">Apache Flink 1.11.6 for Scala 2.12 (https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.12.tgz.asc";>asc, https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.12.tgz.sha512";>sha512) - - - -https://www.apache.org/dyn/closer.lua/flink/flink-1.11.6/flink-1.11.6-src.tgz"; id="1116-download-source">Apache Flink 1.11.6 Source Release -(https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-src.tgz.asc";>asc, https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-src.tgz.sha512";>sha512) - - -Release Notes - -Please have a look at the https://nightlies.apache.org/flink/flink-docs-release-1.11/release-notes/flink-1.11.html";>Release Notes for Flink 1.11 if you plan to upgrade your Flink setup from a previous version. - - - Apache Flink® Stateful Functions 3.2.0 is the latest stable release for the https://flink.apache.org/stateful-funct
[flink-docker] branch master updated: [hotfix] Remove 1.11-1.13
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git The following commit(s) were added to refs/heads/master by this push: new cb711ef [hotfix] Remove 1.11-1.13 cb711ef is described below commit cb711efe5a545db69fbee01e83f590400875107f Author: Chesnay Schepler AuthorDate: Tue Aug 30 10:32:23 2022 +0200 [hotfix] Remove 1.11-1.13 --- 1.11/scala_2.11-java11-debian/Dockerfile | 87 1.11/scala_2.11-java11-debian/docker-entrypoint.sh | 125 - 1.11/scala_2.11-java11-debian/release.metadata | 2 - 1.11/scala_2.11-java8-debian/Dockerfile| 87 1.11/scala_2.11-java8-debian/docker-entrypoint.sh | 125 - 1.11/scala_2.11-java8-debian/release.metadata | 2 - 1.11/scala_2.12-java11-debian/Dockerfile | 87 1.11/scala_2.12-java11-debian/docker-entrypoint.sh | 125 - 1.11/scala_2.12-java11-debian/release.metadata | 2 - 1.11/scala_2.12-java8-debian/Dockerfile| 87 1.11/scala_2.12-java8-debian/docker-entrypoint.sh | 125 - 1.11/scala_2.12-java8-debian/release.metadata | 2 - 1.12/scala_2.11-java11-debian/Dockerfile | 87 1.12/scala_2.11-java11-debian/docker-entrypoint.sh | 154 - 1.12/scala_2.11-java11-debian/release.metadata | 2 - 1.12/scala_2.11-java8-debian/Dockerfile| 87 1.12/scala_2.11-java8-debian/docker-entrypoint.sh | 154 - 1.12/scala_2.11-java8-debian/release.metadata | 2 - 1.12/scala_2.12-java11-debian/Dockerfile | 87 1.12/scala_2.12-java11-debian/docker-entrypoint.sh | 154 - 1.12/scala_2.12-java11-debian/release.metadata | 2 - 1.12/scala_2.12-java8-debian/Dockerfile| 87 1.12/scala_2.12-java8-debian/docker-entrypoint.sh | 154 - 1.12/scala_2.12-java8-debian/release.metadata | 2 - 1.13/scala_2.11-java11-debian/Dockerfile | 87 1.13/scala_2.11-java11-debian/docker-entrypoint.sh | 138 -- 1.13/scala_2.11-java11-debian/release.metadata | 2 - 1.13/scala_2.11-java8-debian/Dockerfile| 87 1.13/scala_2.11-java8-debian/docker-entrypoint.sh | 138 -- 
1.13/scala_2.11-java8-debian/release.metadata | 2 - 1.13/scala_2.12-java11-debian/Dockerfile | 87 1.13/scala_2.12-java11-debian/docker-entrypoint.sh | 138 -- 1.13/scala_2.12-java11-debian/release.metadata | 2 - 1.13/scala_2.12-java8-debian/Dockerfile| 87 1.13/scala_2.12-java8-debian/docker-entrypoint.sh | 138 -- 1.13/scala_2.12-java8-debian/release.metadata | 2 - 36 files changed, 2736 deletions(-) diff --git a/1.11/scala_2.11-java11-debian/Dockerfile b/1.11/scala_2.11-java11-debian/Dockerfile deleted file mode 100644 index bee0d0e..000 --- a/1.11/scala_2.11-java11-debian/Dockerfile +++ /dev/null @@ -1,87 +0,0 @@ -### -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-### - -FROM openjdk:11-jre - -# Install dependencies -RUN set -ex; \ - apt-get update; \ - apt-get -y install libsnappy1v5 gettext-base; \ - rm -rf /var/lib/apt/lists/* - -# Grab gosu for easy step-down from root -ENV GOSU_VERSION 1.11 -RUN set -ex; \ - wget -nv -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture)"; \ - wget -nv -O /usr/local/bin/gosu.asc "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture).asc"; \ - export GNUPGHOME="$(mktemp -d)"; \ - for server in ha.pool.sks-keyservers.net $(shuf -e \ - hkp://p80.pool.sks-keyservers.net:80 \ - keyserver.ubuntu.com \ - hkp://keyserv
[flink-web] branch asf-site updated: [hotfix] Remove 1.11-1.13
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new dff7a673a [hotfix] Remove 1.11-1.13 dff7a673a is described below commit dff7a673a185866d07d5b2cf840a1f5ebe2e9dca Author: Chesnay Schepler AuthorDate: Tue Aug 30 10:30:10 2022 +0200 [hotfix] Remove 1.11-1.13 --- _config.yml | 63 - 1 file changed, 63 deletions(-) diff --git a/_config.yml b/_config.yml index 965f6b121..8d09c4133 100644 --- a/_config.yml +++ b/_config.yml @@ -109,69 +109,6 @@ flink_releases: asc_url: "https://downloads.apache.org/flink/flink-1.14.5/flink-1.14.5-src.tgz.asc"; sha512_url: "https://downloads.apache.org/flink/flink-1.14.5/flink-1.14.5-src.tgz.sha512"; release_notes_url: "https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14"; - - -version_short: "1.13" -binary_release: - name: "Apache Flink 1.13.6" - scala_211: -id: "1136-download_211" -url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz"; -asc_url: "https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz.asc"; -sha512_url: "https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz.sha512"; - scala_212: -id: "1136-download_212" -url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz"; -asc_url: "https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz.asc"; -sha512_url: "https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz.sha512"; -source_release: - name: "Apache Flink 1.13.6" - id: "1135-download-source" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-src.tgz"; - asc_url: "https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-src.tgz.asc"; - sha512_url: 
"https://downloads.apache.org/flink/flink-1.13.6/flink-1.13.6-src.tgz.sha512"; -release_notes_url: "https://nightlies.apache.org/flink/flink-docs-release-1.13/release-notes/flink-1.13"; - - -version_short: "1.12" -binary_release: - name: "Apache Flink 1.12.7" - scala_211: -id: "1127-download_211" -url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.11.tgz"; -asc_url: "https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.11.tgz.asc"; -sha512_url: "https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.11.tgz.sha512"; - scala_212: -id: "1127-download_212" -url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz"; -asc_url: "https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz.asc"; -sha512_url: "https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz.sha512"; -source_release: - name: "Apache Flink 1.12.7" - id: "1127-download-source" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.12.7/flink-1.12.7-src.tgz"; - asc_url: "https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-src.tgz.asc"; - sha512_url: "https://downloads.apache.org/flink/flink-1.12.7/flink-1.12.7-src.tgz.sha512"; -release_notes_url: "https://nightlies.apache.org/flink/flink-docs-release-1.12/release-notes/flink-1.12.html"; - - -version_short: "1.11" -binary_release: - name: "Apache Flink 1.11.6" - scala_211: -id: "1116-download_211" -url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.11.tgz"; -asc_url: "https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.11.tgz.asc"; -sha512_url: "https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.11.tgz.sha512"; - scala_212: -id: "1116-download_212" -url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.12.tgz"; -asc_url: 
"https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.12.tgz.asc"; -sha512_url: "https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-bin-scala_2.12.tgz.sha512"; -source_release: - name: "Apache Flink 1.11.6" - id: "1116-download-source" - url: "https://www.apache.org/dyn/closer.lua/flink/flink-1.11.6/flink-1.11.6-src.tgz"; - asc_url: "https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-src.tgz.asc"; - sha512_url: "https://downloads.apache.org/flink/flink-1.11.6/flink-1.11.6-src.tgz.sha512"; -release_notes_url: "https://nightlies.apache.org/flink/flink-docs-release-1
[flink] branch master updated (a1d74c131b0 -> 7669daffdc5)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from a1d74c131b0 [FLINK-29038][runtime] Fix unstable case AsyncWaitOperatorTest#testProcessingTimeRepeatedCompleteOrderedWithRetry add 7669daffdc5 [FLINK-28814][Connectors][JDBC] Update org.postgresql:postgresql to 42.4.1 No new revisions were added by this update. Summary of changes: flink-connectors/flink-connector-jdbc/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink-ml] branch master updated: [FLINK-29011] Add Transformer for Binarizer
This is an automated email from the ASF dual-hosted git repository. zhangzp pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git The following commit(s) were added to refs/heads/master by this push: new 5f99ce8 [FLINK-29011] Add Transformer for Binarizer 5f99ce8 is described below commit 5f99ce8687b00c0cd0392a67c677f56a4f121a91 Author: weibo AuthorDate: Tue Aug 30 15:19:40 2022 +0800 [FLINK-29011] Add Transformer for Binarizer This closes #146. --- .../ml/examples/feature/BinarizerExample.java | 85 +++ .../flink/ml/feature/binarizer/Binarizer.java | 160 + .../ml/feature/binarizer/BinarizerParams.java | 50 +++ .../org/apache/flink/ml/feature/BinarizerTest.java | 159 .../examples/ml/feature/binarizer_example.py | 69 + .../pyflink/ml/lib/feature/binarizer.py| 70 + .../pyflink/ml/lib/feature/tests/test_binarizer.py | 95 7 files changed, 688 insertions(+) diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/BinarizerExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/BinarizerExample.java new file mode 100644 index 000..0b51363 --- /dev/null +++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/BinarizerExample.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.examples.feature; + +import org.apache.flink.ml.feature.binarizer.Binarizer; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import java.util.Arrays; + +/** Simple program that creates a Binarizer instance and uses it for feature engineering. */ +public class BinarizerExample { +public static void main(String[] args) { +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + +// Generates input data. +DataStream inputStream = +env.fromElements( +Row.of( +1, +Vectors.dense(1, 2), +Vectors.sparse( +17, new int[] {0, 3, 9}, new double[] {1.0, 2.0, 7.0})), +Row.of( +2, +Vectors.dense(2, 1), +Vectors.sparse( +17, new int[] {0, 2, 14}, new double[] {5.0, 4.0, 1.0})), +Row.of( +3, +Vectors.dense(5, 18), +Vectors.sparse( +17, new int[] {0, 11, 12}, new double[] {2.0, 4.0, 4.0}))); + +Table inputTable = tEnv.fromDataStream(inputStream).as("f0", "f1", "f2"); + +// Creates a Binarizer object and initializes its parameters. +Binarizer binarizer = +new Binarizer() +.setInputCols("f0", "f1", "f2") +.setOutputCols("of0", "of1", "of2") +.setThresholds(0.0, 0.0, 0.0); + +// Transforms input data. +Table outputTable = binarizer.transform(inputTable)[0]; + +// Extracts and displays the results. 
+for (CloseableIterator it = outputTable.execute().collect(); it.hasNext(); ) { +Row row = it.next(); + +Object[] inputValues = new Object[binarizer.getInputCols().length]; +Object[] outputValues = new Object[binarizer.getInputCols().length]; +for (int i = 0; i < inputValues.length; i++) { +inputValues[i] = row.getField(binarizer