[flink] branch release-1.11 updated: [FLINK-18224][docs] Add document about sql client's tableau result mode
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new a76a7cc [FLINK-18224][docs] Add document about sql client's tableau result mode a76a7cc is described below commit a76a7cc2f8113f39ab18d7114b9680dabad3a49d Author: Kurt Young AuthorDate: Thu Jun 11 13:27:39 2020 +0800 [FLINK-18224][docs] Add document about sql client's tableau result mode This closes #12569 (cherry picked from commit 6fed1a136382c294a84ea29a278f33d7976e55d3) --- docs/dev/table/sqlClient.md| 45 -- docs/dev/table/sqlClient.zh.md | 45 -- .../flink-sql-client/conf/sql-client-defaults.yaml | 2 +- 3 files changed, 83 insertions(+), 9 deletions(-) diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md index 8331efe..4a229be 100644 --- a/docs/dev/table/sqlClient.md +++ b/docs/dev/table/sqlClient.md @@ -66,7 +66,7 @@ SELECT 'Hello World'; This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. -The CLI supports **two modes** for maintaining and visualizing results. +The CLI supports **three modes** for maintaining and visualizing results. The **table mode** materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: @@ -80,7 +80,18 @@ The **changelog mode** does not materialize results and visualizes the result st SET execution.result-mode=changelog; {% endhighlight %} -You can use the following query to see both result modes in action: +The **tableau mode** is closer to a traditional terminal display: it prints results directly to the screen in a tableau format. +The displayed content depends on the query execution type (`execution.type`).
+
+{% highlight text %}
+SET execution.result-mode=tableau;
+{% endhighlight %}
+
+Note that when you use this mode with a streaming query, the result is continuously printed on the console. If the input data of
+this query is bounded, the job will terminate after Flink has processed all input data, and the printing will also stop automatically.
+Otherwise, if you want to terminate a running query, type `CTRL-C`; the job and the printing will be stopped.
+
+You can use the following query to see all the result modes in action:

{% highlight sql %}
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
{% endhighlight %}

@@ -106,9 +117,35 @@
Alice, 1
Greg, 1
{% endhighlight %}

-Both result modes can be useful during the prototyping of SQL queries. In both modes, results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive, the changelog mode only shows the latest 1000 changes. The table mode allows for navigating through bigger results that are only limited by the available main memory and the configured [maximum number of rows](sqlClient.html#configuration) (`max-table-result-rows`).
+In *tableau mode*, if you ran the query in streaming mode, the displayed result would be:
+{% highlight text %}
++-----+-------+-----+
+| +/- |  name | cnt |
++-----+-------+-----+
+|  +  |   Bob |   1 |
+|  +  | Alice |   1 |
+|  +  |  Greg |   1 |
+|  -  |   Bob |   1 |
+|  +  |   Bob |   2 |
++-----+-------+-----+
+Received a total of 5 rows
+{% endhighlight %}
+
+And if you ran the query in batch mode, the displayed result would be:
+{% highlight text %}
++-------+-----+
+|  name | cnt |
++-------+-----+
+| Alice |   1 |
+|   Bob |   2 |
+|  Greg |   1 |
++-------+-----+
+3 rows in set
+{% endhighlight %}
+
+All these result modes can be useful during the prototyping of SQL queries. In all these modes, results are stored in the Java heap memory of the SQL Client. In order to keep the CLI interface responsive, the changelog mode only shows the latest 1000 changes.
The table mode allows for navigating through bigger results that are only limited by the available main memory and the configured [maximum number of rows](sqlClient.html#configuration) (`max-table-result-rows`). -Attention Queries that are executed in a batch environment, can only be retrieved using the `table` result mode. +Attention Queries that are executed in a batch environment can only be retrieved using the `table` or `tableau` result mode.
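The streaming (tableau) output above is a changelog: `+` rows insert, `-` rows retract a previous version of a row, and replaying all changes yields exactly the batch result. The following is a minimal Python sketch of that replay — an illustration of the semantics, not Flink code:

```python
from collections import Counter

# Changelog rows as printed by tableau mode for the example streaming query:
# ("+", row) inserts a row, ("-", row) retracts a previously emitted row.
changelog = [
    ("+", ("Bob", 1)),
    ("+", ("Alice", 1)),
    ("+", ("Greg", 1)),
    ("-", ("Bob", 1)),   # retraction: Bob's count is about to be updated
    ("+", ("Bob", 2)),
]

def materialize(changes):
    """Replay a changelog into the final table, as table mode would."""
    table = Counter()
    for op, row in changes:
        table[row] += 1 if op == "+" else -1
    # Keep only rows that are still present after all retractions.
    return sorted(row for row, n in table.items() if n > 0)

print(materialize(changelog))
# [('Alice', 1), ('Bob', 2), ('Greg', 1)]
```

Note the replayed result matches the batch-mode table (`Alice 1`, `Bob 2`, `Greg 1`): the changelog and the materialized table are two views of the same query result.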
[flink] branch master updated (88cc44a -> 6fed1a1)
This is an automated email from the ASF dual-hosted git repository. kurt pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 88cc44a [hotfix][avro] Link to Hadoop Integration in Avro format documentation add 6fed1a1 [FLINK-18224][docs] Add document about sql client's tableau result mode No new revisions were added by this update. Summary of changes: docs/dev/table/sqlClient.md| 45 -- docs/dev/table/sqlClient.zh.md | 45 -- .../flink-sql-client/conf/sql-client-defaults.yaml | 2 +- 3 files changed, 83 insertions(+), 9 deletions(-)
[flink] branch release-1.11 updated: [hotfix][avro] Link to Hadoop Integration in Avro format documentation
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new a8ed790 [hotfix][avro] Link to Hadoop Integration in Avro format documentation a8ed790 is described below commit a8ed7903e62fbaadf6760502ed1b66efe7ec3f4c Author: Jark Wu AuthorDate: Thu Jun 11 11:11:08 2020 +0800 [hotfix][avro] Link to Hadoop Integration in Avro format documentation This closes #12580 --- docs/dev/table/connectors/formats/avro.md| 19 +++ docs/dev/table/connectors/formats/avro.zh.md | 19 +++ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/docs/dev/table/connectors/formats/avro.md b/docs/dev/table/connectors/formats/avro.md index 8870235..93f758a 100644 --- a/docs/dev/table/connectors/formats/avro.md +++ b/docs/dev/table/connectors/formats/avro.md @@ -36,9 +36,20 @@ Dependencies In order to setup the Avro format, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. -| Maven dependency | SQL Client JAR | -| :- | :--| -| `flink-avro` | [Pre-bundled Hadoop](https://flink.apache.org/downloads.html#additional-components) | + + +Avro format is part of the binary distribution, but requires additional [Hadoop dependency]({% link ops/deployment/hadoop.md %}) for cluster execution. 
+ + +{% highlight xml %} + + org.apache.flink + flink-avro + {{ site.version }} + +{% endhighlight %} + + How to create a table with Avro format @@ -53,7 +64,7 @@ CREATE TABLE user_behavior ( item_id BIGINT, category_id BIGINT, behavior STRING, - ts TIMESTAMP(3), + ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', diff --git a/docs/dev/table/connectors/formats/avro.zh.md b/docs/dev/table/connectors/formats/avro.zh.md index 8870235..01c9ace 100644 --- a/docs/dev/table/connectors/formats/avro.zh.md +++ b/docs/dev/table/connectors/formats/avro.zh.md @@ -36,9 +36,20 @@ Dependencies In order to setup the Avro format, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. -| Maven dependency | SQL Client JAR | -| :- | :--| -| `flink-avro` | [Pre-bundled Hadoop](https://flink.apache.org/downloads.html#additional-components) | + + +Avro format is part of the binary distribution, but requires additional [Hadoop dependency]({% link ops/deployment/hadoop.zh.md %}) for cluster execution. + + +{% highlight xml %} + + org.apache.flink + flink-avro + {{ site.version }} + +{% endhighlight %} + + How to create a table with Avro format @@ -53,7 +64,7 @@ CREATE TABLE user_behavior ( item_id BIGINT, category_id BIGINT, behavior STRING, - ts TIMESTAMP(3), + ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior',
[flink] branch master updated (f59b8b4 -> 88cc44a)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f59b8b4 [FLINK-18232][hive] Fix Hive streaming source bugs add 88cc44a [hotfix][avro] Link to Hadoop Integration in Avro format documentation No new revisions were added by this update. Summary of changes: docs/dev/table/connectors/formats/avro.md| 19 +++ docs/dev/table/connectors/formats/avro.zh.md | 19 +++ 2 files changed, 30 insertions(+), 8 deletions(-)
[flink-web] 01/02: Syncing the ZH version of the Docs Style Guide with recent changes and correcting an additional point.
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit d60d1296a8e02917ecb70f03fa57d97d2d0f7005 Author: Marta Paes Moreira AuthorDate: Wed Jun 10 10:07:18 2020 +0200 Syncing the ZH version of the Docs Style Guide with recent changes and correcting an additional point. --- contributing/docs-style.md| 2 +- contributing/docs-style.zh.md | 8 +++- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/contributing/docs-style.md b/contributing/docs-style.md index a7c79be..d016d00 100644 --- a/contributing/docs-style.md +++ b/contributing/docs-style.md @@ -82,7 +82,7 @@ opening a pull-request. ## Repository Markdown files (.md) should have a short name that summarizes the topic -covered, spelled in **lowercase** and with **underscores** separating the +covered, spelled in **lowercase** and with **dashes (-)** separating the words. The Chinese version file should have the same name as the English version, but suffixed with **.zh.md**. diff --git a/contributing/docs-style.zh.md b/contributing/docs-style.zh.md index a39f6df..d016d00 100644 --- a/contributing/docs-style.zh.md +++ b/contributing/docs-style.zh.md @@ -82,7 +82,7 @@ opening a pull-request. ## Repository Markdown files (.md) should have a short name that summarizes the topic -covered, spelled in **lowercase** and with **underscores** separating the +covered, spelled in **lowercase** and with **dashes (-)** separating the words. The Chinese version file should have the same name as the English version, but suffixed with **.zh.md**. @@ -336,13 +336,11 @@ overwriting. [Link Text](#heading-title) ``` -* **Links to other pages of the Flink documentation.** The base relative path - to the domain of the documentation is available as a configuration variable - named `baseurl`. 
+* **Links to other pages of the Flink documentation.** {% raw %} ```liquid - [Link Text]({{ site.baseurl }}{% link path/to/link-page.md %}) + [Link Text]({% link path/to/link-page.md %}) ``` {% endraw %}
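The naming rule this style guide change corrects — short lowercase names with dashes separating words, and a `.zh.md` suffix for the Chinese version — can be stated as a small sketch. The checker below is hypothetical, not part of the Flink tooling:

```python
import re

# Hypothetical validator for the docs naming convention described above:
# lowercase dash-separated words, ".md" extension, ".zh.md" for Chinese docs.
NAME_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*\.(zh\.)?md$")

def follows_convention(filename: str) -> bool:
    return bool(NAME_RE.match(filename))

print(follows_convention("docs-style.md"))     # True
print(follows_convention("docs-style.zh.md"))  # True
print(follows_convention("docs_style.md"))     # False (underscores, the old rule)
```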
[flink-web] branch asf-site updated (985d9da -> 82c710d)
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 985d9da rebuild website new d60d129 Syncing the ZH version of the Docs Style Guide with recent changes and correcting an additional point. new 82c710d rebuild website The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: content/contributing/docs-style.html| 2 +- content/zh/contributing/docs-style.html | 8 +++- contributing/docs-style.md | 2 +- contributing/docs-style.zh.md | 8 +++- 4 files changed, 8 insertions(+), 12 deletions(-)
[flink-web] 02/02: rebuild website
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 82c710d04c98c9a3539ddca62bbd6e3cc5165589 Author: Seth Wiesman AuthorDate: Wed Jun 10 21:39:15 2020 -0500 rebuild website --- content/contributing/docs-style.html| 2 +- content/zh/contributing/docs-style.html | 8 +++- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/content/contributing/docs-style.html b/content/contributing/docs-style.html index 5c96a82..cf04a02 100644 --- a/content/contributing/docs-style.html +++ b/content/contributing/docs-style.html @@ -328,7 +328,7 @@ opening a pull-request. Repository Markdown files (.md) should have a short name that summarizes the topic -covered, spelled in lowercase and with underscores separating the +covered, spelled in lowercase and with dashes (-) separating the words. The Chinese version file should have the same name as the English version, but suffixed with .zh.md. diff --git a/content/zh/contributing/docs-style.html b/content/zh/contributing/docs-style.html index 0c52d6d..066dad6 100644 --- a/content/zh/contributing/docs-style.html +++ b/content/zh/contributing/docs-style.html @@ -326,7 +326,7 @@ opening a pull-request. Repository Markdown files (.md) should have a short name that summarizes the topic -covered, spelled in lowercase and with underscores separating the +covered, spelled in lowercase and with dashes (-) separating the words. The Chinese version file should have the same name as the English version, but suffixed with .zh.md. @@ -575,11 +575,9 @@ making the heading lowercase and replacing internal spaces with hyphens. [Link Text](#heading-title) -Links to other pages of the Flink documentation. The base relative path -to the domain of the documentation is available as a configuration variable -named baseurl. +Links to other pages of the Flink documentation. 
-[Link Text]({{ site.baseurl }}{% link path/to/link-page.md %}) +[Link Text]({% link path/to/link-page.md %}) Links to external pages
[flink] branch release-1.11 updated: [FLINK-18232][hive] Fix Hive streaming source bugs
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new a80f769 [FLINK-18232][hive] Fix Hive streaming source bugs a80f769 is described below commit a80f7690367e9a07b19321f3b34d8fdba17295b8 Author: Jingsong Lee AuthorDate: Thu Jun 11 10:30:46 2020 +0800 [FLINK-18232][hive] Fix Hive streaming source bugs This closes #12573 --- .../flink/connectors/hive/HiveTableSource.java | 7 +- .../hive/read/DirectoryMonitorDiscovery.java | 32 +++- .../hive/read/HiveMapredSplitReader.java | 49 +--- .../hive/read/HiveTableFileInputFormat.java| 19 - .../connectors/hive/HiveTableSourceITCase.java | 10 ++- .../hive/read/DirectoryMonitorDiscoveryTest.java | 90 ++ .../hive/read/HiveTableFileInputFormatTest.java| 50 7 files changed, 214 insertions(+), 43 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java index 080cea8..2be4443 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java @@ -46,6 +46,7 @@ import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.filesystem.FileSystemLookupFunction; import org.apache.flink.table.filesystem.FileSystemOptions; import org.apache.flink.table.functions.AsyncTableFunction; @@ -87,7 +88,7 @@ import java.util.Map; import 
java.util.Properties; import java.util.stream.Collectors; -import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills; +import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toLocalDateTime; import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS; import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND; import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN; @@ -283,7 +284,9 @@ public class HiveTableSource implements } String consumeOffset = configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET); - long currentReadTime = toMills(consumeOffset); + // to Local zone mills instead of UTC mills + long currentReadTime = TimestampData.fromLocalDateTime(toLocalDateTime(consumeOffset)) + .toTimestamp().getTime(); Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java index f027302..aa15c81 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java @@ -18,7 +18,9 @@ package org.apache.flink.connectors.hive.read; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.data.TimestampData; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -26,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Partition; import java.io.IOException; +import java.sql.Timestamp; import 
java.util.ArrayList; import java.util.List; @@ -41,18 +44,39 @@ public class DirectoryMonitorDiscovery implements PartitionDiscovery { Context context, long previousTimestamp) throws Exception { FileStatus[] statuses = getFileStatusRecurse( context.tableLocation(), context.partitionKeys().size(), context.fileSystem()); + List, Long>> partValueList = suitablePartitions(context, previousTimestamp, statuses); + List> partitions = new ArrayList<>(); + for (Tuple2, Long> tuple2 : partValueList) { + context.getPartition(tuple2.f0).ifPresent( +
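The core of the `HiveTableSource` fix above is interpreting the zone-less consume-start-offset string in the local time zone (via `TimestampData.fromLocalDateTime(...).toTimestamp()`) rather than as UTC millis (`toMills`). The difference can be illustrated outside Flink; this Python sketch is an analogy, not the actual Java code:

```python
from datetime import datetime, timezone

# A wall-clock timestamp string with no zone info, like the consume offset.
offset = "2020-06-11 10:30:46"
naive = datetime.strptime(offset, "%Y-%m-%d %H:%M:%S")

# Buggy behaviour (analogous to the old toMills): treat the naive
# timestamp as UTC when converting to epoch millis.
utc_millis = int(naive.replace(tzinfo=timezone.utc).timestamp() * 1000)

# Fixed behaviour: interpret the same wall-clock time in the local
# session time zone before converting to epoch millis.
local_millis = int(naive.astimezone().timestamp() * 1000)

# The two readings differ by exactly the local UTC offset
# (zero only when the local time zone is UTC itself).
print((utc_millis - local_millis) / 3600000.0, "hours")
```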
[flink] branch master updated (216f65f -> f59b8b4)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 216f65f [hotfix][runtime] Remove ExecutionSlotAllocator#stop() which is never used in production add f59b8b4 [FLINK-18232][hive] Fix Hive streaming source bugs No new revisions were added by this update. Summary of changes: .../flink/connectors/hive/HiveTableSource.java | 7 +- .../hive/read/DirectoryMonitorDiscovery.java | 32 +++- .../hive/read/HiveMapredSplitReader.java | 49 +--- .../hive/read/HiveTableFileInputFormat.java| 19 - .../connectors/hive/HiveTableSourceITCase.java | 10 ++- .../hive/read/DirectoryMonitorDiscoveryTest.java | 90 ++ .../hive/read/HiveTableFileInputFormatTest.java| 33 7 files changed, 183 insertions(+), 57 deletions(-) create mode 100644 flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscoveryTest.java copy flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemRecoverableWriterTest.java => flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormatTest.java (54%)
[flink] branch master updated (3dedb85 -> 216f65f)
This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 3dedb85 [FLINK-18239][e2e] Pin minikube version to v1.8.2 add e850760 [FLINK-18034][runtime] Introduce PreferredLocationsRetriever add 31282fe [FLINK-18034][runtime] ExecutionSlotAllocator uses PreferredLocationsRetriever to get preferred locations for tasks add 8ef1e7d [FLINK-18034][runtime] Remove unused preferredLocations from ExecutionVertexSchedulingRequirements add 216f65f [hotfix][runtime] Remove ExecutionSlotAllocator#stop() which is never used in production No new revisions were added by this update. Summary of changes: .../scheduler/DefaultExecutionSlotAllocator.java | 89 +- .../DefaultExecutionSlotAllocatorFactory.java | 4 +- .../DefaultPreferredLocationsRetriever.java| 142 +++ .../flink/runtime/scheduler/DefaultScheduler.java | 2 +- .../runtime/scheduler/ExecutionSlotAllocator.java | 6 - .../scheduler/ExecutionSlotAllocatorFactory.java | 2 +- .../ExecutionVertexSchedulingRequirements.java | 24 +-- ...xecutionVertexSchedulingRequirementsMapper.java | 13 +- ...iever.java => PreferredLocationsRetriever.java} | 27 ++- .../flink/runtime/scheduler/SchedulerBase.java | 11 +- .../runtime/scheduler/StateLocationRetriever.java | 18 +- ...ecutionSlotAllocatorPreferredLocationsTest.java | 116 - .../DefaultExecutionSlotAllocatorTest.java | 49 +++--- .../DefaultPreferredLocationsRetrieverTest.java| 190 + .../scheduler/TestExecutionSlotAllocator.java | 5 - .../TestExecutionSlotAllocatorFactory.java | 2 +- .../scheduler/TestingStateLocationRetriever.java} | 27 ++- 17 files changed, 407 insertions(+), 320 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java copy flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/{InputsLocationsRetriever.java => PreferredLocationsRetriever.java} (57%) copy 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResourceFactory.java => flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StateLocationRetriever.java (59%) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorPreferredLocationsTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java copy flink-runtime/src/{main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java => test/java/org/apache/flink/runtime/scheduler/TestingStateLocationRetriever.java} (57%)
[flink] branch release-1.11 updated: [FLINK-18239][e2e] Pin minikube version to v1.8.2
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new ea5aae2 [FLINK-18239][e2e] Pin minikube version to v1.8.2 ea5aae2 is described below commit ea5aae232865d7fe16041a5ecd448760ae4abecc Author: wangyang0918 AuthorDate: Wed Jun 10 20:19:06 2020 +0800 [FLINK-18239][e2e] Pin minikube version to v1.8.2 --- .../test-scripts/common_kubernetes.sh| 20 +++- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/common_kubernetes.sh b/flink-end-to-end-tests/test-scripts/common_kubernetes.sh index d6fcacf..e553bfe 100755 --- a/flink-end-to-end-tests/test-scripts/common_kubernetes.sh +++ b/flink-end-to-end-tests/test-scripts/common_kubernetes.sh @@ -24,6 +24,8 @@ CONTAINER_SCRIPTS=${END_TO_END_DIR}/test-scripts/container-scripts MINIKUBE_START_RETRIES=3 MINIKUBE_START_BACKOFF=5 RESULT_HASH="e682ec6622b5e83f2eb614617d5ab2cf" +MINIKUBE_VERSION="v1.8.2" +MINIKUBE_PATH="/usr/local/bin/minikube-$MINIKUBE_VERSION" NON_LINUX_ENV_NOTE="** Please start/stop minikube manually in non-linux environment. **" @@ -42,15 +44,15 @@ function setup_kubernetes_for_linux { chmod +x kubectl && sudo mv kubectl /usr/local/bin/ fi # Download minikube. -if ! [ -x "$(command -v minikube)" ]; then -echo "Installing minikube ..." -curl -Lo minikube https://storage.googleapis.com/minikube/releases/v1.8.2/minikube-linux-$arch && \ -chmod +x minikube && sudo mv minikube /usr/local/bin/ +if ! [ -x "$(command -v minikube)" ] || ! [[ $(minikube version) =~ "$MINIKUBE_VERSION" ]]; then +echo "Installing minikube to $MINIKUBE_PATH ..." 
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/$MINIKUBE_VERSION/minikube-linux-$arch && \ +chmod +x minikube && sudo mv minikube $MINIKUBE_PATH fi } function check_kubernetes_status { -minikube status +$MINIKUBE_PATH status return $? } @@ -73,7 +75,7 @@ function start_kubernetes_if_not_running { # here. # Similarly, the kubelets are marking themself as "low disk space", # causing Flink to avoid this node (again, failing the test) -sudo CHANGE_MINIKUBE_NONE_USER=true minikube start --vm-driver=none \ +sudo CHANGE_MINIKUBE_NONE_USER=true $MINIKUBE_PATH start --vm-driver=none \ --extra-config=kubelet.image-gc-high-threshold=99 \ --extra-config=kubelet.image-gc-low-threshold=98 \ --extra-config=kubelet.minimum-container-ttl-duration=120m \ @@ -81,7 +83,7 @@ function start_kubernetes_if_not_running { --extra-config=kubelet.eviction-soft="memory.available<5Mi,nodefs.available<2Mi,imagefs.available<2Mi" \ --extra-config=kubelet.eviction-soft-grace-period="memory.available=2h,nodefs.available=2h,imagefs.available=2h" # Fix the kubectl context, as it's often stale. -minikube update-context +$MINIKUBE_PATH update-context fi check_kubernetes_status @@ -101,7 +103,7 @@ function start_kubernetes { exit 1 fi fi -eval $(minikube docker-env) +eval $($MINIKUBE_PATH docker-env) } function stop_kubernetes { @@ -109,7 +111,7 @@ function stop_kubernetes { echo "$NON_LINUX_ENV_NOTE" else echo "Stopping minikube ..." -stop_command="sudo minikube stop" +stop_command="sudo $MINIKUBE_PATH stop" if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} "${stop_command}"; then echo "Could not stop minikube. Aborting..." exit 1
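The condition this script adds — reinstall unless an executable `minikube` is present *and* its reported version contains the pinned version string — boils down to a simple predicate. Here is a hypothetical Python restatement of the bash check, for clarity only:

```python
MINIKUBE_VERSION = "v1.8.2"

def needs_install(binary_exists: bool, version_output: str) -> bool:
    """Mirror of the script's check: install to the version-pinned path
    unless minikube is on PATH and reports the pinned version."""
    return not binary_exists or MINIKUBE_VERSION not in version_output

print(needs_install(True, "minikube version: v1.8.2"))   # False: pinned version present
print(needs_install(True, "minikube version: v1.11.0"))  # True: wrong version
print(needs_install(False, ""))                          # True: not installed
```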
[flink] branch master updated: [FLINK-18239][e2e] Pin minikube version to v1.8.2
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 3dedb85 [FLINK-18239][e2e] Pin minikube version to v1.8.2 3dedb85 is described below commit 3dedb85000d722620d65ef41e791a8e1d1a910e2 Author: wangyang0918 AuthorDate: Wed Jun 10 20:19:06 2020 +0800 [FLINK-18239][e2e] Pin minikube version to v1.8.2 --- .../test-scripts/common_kubernetes.sh| 20 +++- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/common_kubernetes.sh b/flink-end-to-end-tests/test-scripts/common_kubernetes.sh index d6fcacf..e553bfe 100755 --- a/flink-end-to-end-tests/test-scripts/common_kubernetes.sh +++ b/flink-end-to-end-tests/test-scripts/common_kubernetes.sh @@ -24,6 +24,8 @@ CONTAINER_SCRIPTS=${END_TO_END_DIR}/test-scripts/container-scripts MINIKUBE_START_RETRIES=3 MINIKUBE_START_BACKOFF=5 RESULT_HASH="e682ec6622b5e83f2eb614617d5ab2cf" +MINIKUBE_VERSION="v1.8.2" +MINIKUBE_PATH="/usr/local/bin/minikube-$MINIKUBE_VERSION" NON_LINUX_ENV_NOTE="** Please start/stop minikube manually in non-linux environment. **" @@ -42,15 +44,15 @@ function setup_kubernetes_for_linux { chmod +x kubectl && sudo mv kubectl /usr/local/bin/ fi # Download minikube. -if ! [ -x "$(command -v minikube)" ]; then -echo "Installing minikube ..." -curl -Lo minikube https://storage.googleapis.com/minikube/releases/v1.8.2/minikube-linux-$arch && \ -chmod +x minikube && sudo mv minikube /usr/local/bin/ +if ! [ -x "$(command -v minikube)" ] || ! [[ $(minikube version) =~ "$MINIKUBE_VERSION" ]]; then +echo "Installing minikube to $MINIKUBE_PATH ..." 
+curl -Lo minikube https://storage.googleapis.com/minikube/releases/$MINIKUBE_VERSION/minikube-linux-$arch && \ +chmod +x minikube && sudo mv minikube $MINIKUBE_PATH fi } function check_kubernetes_status { -minikube status +$MINIKUBE_PATH status return $? } @@ -73,7 +75,7 @@ function start_kubernetes_if_not_running { # here. # Similarly, the kubelets are marking themself as "low disk space", # causing Flink to avoid this node (again, failing the test) -sudo CHANGE_MINIKUBE_NONE_USER=true minikube start --vm-driver=none \ +sudo CHANGE_MINIKUBE_NONE_USER=true $MINIKUBE_PATH start --vm-driver=none \ --extra-config=kubelet.image-gc-high-threshold=99 \ --extra-config=kubelet.image-gc-low-threshold=98 \ --extra-config=kubelet.minimum-container-ttl-duration=120m \ @@ -81,7 +83,7 @@ function start_kubernetes_if_not_running { --extra-config=kubelet.eviction-soft="memory.available<5Mi,nodefs.available<2Mi,imagefs.available<2Mi" \ --extra-config=kubelet.eviction-soft-grace-period="memory.available=2h,nodefs.available=2h,imagefs.available=2h" # Fix the kubectl context, as it's often stale. -minikube update-context +$MINIKUBE_PATH update-context fi check_kubernetes_status @@ -101,7 +103,7 @@ function start_kubernetes { exit 1 fi fi -eval $(minikube docker-env) +eval $($MINIKUBE_PATH docker-env) } function stop_kubernetes { @@ -109,7 +111,7 @@ function stop_kubernetes { echo "$NON_LINUX_ENV_NOTE" else echo "Stopping minikube ..." -stop_command="sudo minikube stop" +stop_command="sudo $MINIKUBE_PATH stop" if ! retry_times ${MINIKUBE_START_RETRIES} ${MINIKUBE_START_BACKOFF} "${stop_command}"; then echo "Could not stop minikube. Aborting..." exit 1
[flink] branch master updated (4555ad9 -> fcab7b3)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 4555ad9 [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap add 6ca803f [hotfix][table-common] Implement first type strategy add 0a0b4a4 [hotfix][table-common] Add a common type strategy add fcab7b3 [FLINK-13784][table] Implement type inference for math functions No new revisions were added by this update. Summary of changes: .../flink/table/api/internal/BaseExpressions.java | 6 +- .../operations/utils/ValuesOperationFactory.java | 6 +- .../table/functions/BuiltInFunctionDefinition.java | 24 +- .../functions/BuiltInFunctionDefinitions.java | 295 --- .../table/types/inference/InputTypeStrategies.java | 21 +- .../table/types/inference/TypeStrategies.java | 202 +++ .../strategies/CommonInputTypeStrategy.java| 6 +- ...ngTypeStrategy.java => CommonTypeStrategy.java} | 19 +- .../strategies/FamilyArgumentTypeStrategy.java | 29 +- ...citTypeStrategy.java => FirstTypeStrategy.java} | 27 +- .../inference/strategies/MapInputTypeStrategy.java | 6 +- ...eStrategy.java => MatchFamilyTypeStrategy.java} | 30 +- .../flink/table/types/logical/LogicalType.java | 4 +- .../table/types/logical/LogicalTypeFamily.java | 2 + .../flink/table/types/logical/LogicalTypeRoot.java | 4 + ...Generalization.java => LogicalTypeMerging.java} | 91 - ...izationTest.java => LogicalCommonTypeTest.java} | 17 +- .../table/types/inference/TypeStrategiesTest.java | 57 ++- .../catalog/FunctionCatalogOperatorTable.java | 7 + .../converter/CustomizedConvertRule.java | 33 +- .../expressions/converter/DirectConvertRule.java | 4 +- .../functions/sql/FlinkSqlOperatorTable.java | 2 +- .../table/planner/plan/type/FlinkReturnTypes.java | 36 +- .../expressions/PlannerExpressionParserImpl.scala | 8 +- .../table/planner/calcite/FlinkTypeSystem.scala| 33 +- 
.../planner/codegen/calls/ScalarOperatorGens.scala | 13 +- .../expressions/PlannerExpressionConverter.scala | 185 -- .../planner/expressions/ReturnTypeInference.scala | 217 --- .../table/planner/expressions/arithmetic.scala | 149 .../planner/expressions/mathExpressions.scala | 401 - .../flink/table/planner/expressions/time.scala | 67 +--- .../table/planner/typeutils/TypeCoercion.scala | 22 -- .../planner/typeutils/TypeInfoCheckUtils.scala | 139 +-- .../expressions/BuiltInFunctionTestBase.java | 6 + .../planner/expressions/MathFunctionsITCase.java | 110 +- .../planner/plan/stream/table/OverWindowTest.xml | 4 +- .../table/planner/plan/stream/table/ValuesTest.xml | 52 +-- .../planner/calcite/FlinkTypeFactoryTest.scala | 5 +- .../planner/expressions/ScalarFunctionsTest.scala | 139 --- .../expressions/PlannerExpressionConverter.scala | 8 +- .../expressions/PlannerExpressionParserImpl.scala | 8 +- .../table/api/stream/table/OverWindowTest.scala| 2 +- .../table/expressions/ScalarOperatorsTest.scala| 1 - 43 files changed, 904 insertions(+), 1593 deletions(-) copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/{MissingTypeStrategy.java => CommonTypeStrategy.java} (63%) copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/{ExplicitTypeStrategy.java => FirstTypeStrategy.java} (64%) copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/{ExplicitTypeStrategy.java => MatchFamilyTypeStrategy.java} (56%) rename flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/{LogicalTypeGeneralization.java => LogicalTypeMerging.java} (88%) rename flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/{LogicalTypeGeneralizationTest.java => LogicalCommonTypeTest.java} (96%) delete mode 100644 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/ReturnTypeInference.scala delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/arithmetic.scala delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/mathExpressions.scala
[flink] 08/08: [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 1d75ec8efe3c31f2a5e1a4572b5cfb8000ddbf67 Author: Roman Khachatryan AuthorDate: Thu Jun 4 10:06:15 2020 +0200 [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification --- .../flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index 5d7a2c9..d904c87 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -306,6 +306,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { } } + channelStateWriter.abort(checkpointId, new CancellationException("checkpoint aborted via notification"), false); + for (StreamOperatorWrapper operatorWrapper : operatorChain.getAllOperators(true)) { try { operatorWrapper.getStreamOperator().notifyCheckpointAborted(checkpointId);
[flink] 07/08: [FLINK-17869][tests] Unignore UnalignedCheckpointITCase
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 0634a2efd2f0ca42d4cb6762015be650c04a3136 Author: Roman Khachatryan AuthorDate: Wed Jun 3 22:12:15 2020 +0200 [FLINK-17869][tests] Unignore UnalignedCheckpointITCase --- .../org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java index 47e7007..6211077 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java @@ -44,7 +44,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.util.TestLogger; import org.apache.commons.lang3.ArrayUtils; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ErrorCollector; @@ -102,7 +101,6 @@ import static org.hamcrest.Matchers.greaterThan; * The number of successful checkpoints is indeed {@code >=n}. * */ -@Ignore public class UnalignedCheckpointITCase extends TestLogger { public static final String NUM_INPUTS = "inputs"; public static final String NUM_OUTPUTS = "outputs";
[flink] 06/08: [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 7bb3ffa91a9916348d2f0a6a2e6cba4b109be56e Author: Roman Khachatryan AuthorDate: Wed Jun 3 21:43:56 2020 +0200 [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator Check (by task thread) whether the current checkpoint was already aborted in the following scenario: 1. on checkpoint barrier ThreadSafeUnaligner sends a mail to start checkpointing (netty thread) 2. on cancellation marker CheckpointBarrierUnaligner aborts it (task thread) 3. task thread processes a mail to start checkpointing --- .../tasks/SubtaskCheckpointCoordinatorImpl.java| 22 +++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index cf8a21e..5d7a2c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -44,7 +44,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.BiFunctionWithException; import org.slf4j.Logger; @@ -57,10 +56,12 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import 
java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -183,6 +184,16 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { @Override public void abortCheckpointOnBarrier(long checkpointId, Throwable cause, OperatorChain operatorChain) throws IOException { LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, taskName); + lastCheckpointId = Math.max(lastCheckpointId, checkpointId); + Iterator iterator = abortedCheckpointIds.iterator(); + while (iterator.hasNext()) { + long next = iterator.next(); + if (next < lastCheckpointId) { + iterator.remove(); + } else { + break; + } + } checkpointStorage.clearCacheFor(checkpointId); @@ -221,9 +232,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream // checkpoint alignments + if (lastCheckpointId >= metadata.getCheckpointId()) { + LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId()); + channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true); + checkAndClearAbortedStatus(metadata.getCheckpointId()); + return; + } + // Step (0): Record the last triggered checkpointId. - Preconditions.checkArgument(lastCheckpointId < metadata.getCheckpointId(), String.format( - "Unexpected current checkpoint-id: %s vs last checkpoint-id: %s", metadata.getCheckpointId(), lastCheckpointId)); lastCheckpointId = metadata.getCheckpointId(); if (checkAndClearAbortedStatus(metadata.getCheckpointId())) { LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
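The guard described in the commit message above can be sketched in isolation. The class and method names below are illustrative, not Flink's actual API; the point is the mechanism: an abort observed for checkpoint N (task thread) makes a later, stale attempt to start checkpoint <= N a logged no-op instead of tripping the old precondition.

```java
import java.util.TreeSet;

// Minimal sketch (hypothetical names) of the out-of-order checkpoint guard:
// aborting checkpoint N advances a high-water mark so that a start request
// still sitting in the mailbox for an id <= N is skipped rather than failing.
class CheckpointGuard {
    private long lastCheckpointId = -1;
    private final TreeSet<Long> abortedIds = new TreeSet<>();

    // Called on a cancellation barrier (task thread).
    void abortOnBarrier(long checkpointId) {
        lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
        // Prune aborted ids that can no longer race with a start request.
        abortedIds.headSet(lastCheckpointId).clear();
        abortedIds.add(checkpointId);
    }

    // Called when the mailbox finally processes the start request.
    // Returns false if the checkpoint must be skipped as out of order.
    boolean tryStart(long checkpointId) {
        if (checkpointId <= lastCheckpointId) {
            return false; // aborted or subsumed before the mail was processed
        }
        lastCheckpointId = checkpointId;
        return true;
    }
}
```

This mirrors the three-step scenario from the commit message: the netty thread enqueues the start mail, the task thread aborts on the cancellation marker, and the later mail delivery is detected by the high-water-mark comparison.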
[flink] 05/08: [FLINK-17869][task][checkpointing] Increase ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit e14958dcd09f7c32ba2ba25fc6a48696fe7eeca2 Author: Roman Khachatryan AuthorDate: Wed Jun 3 15:56:47 2020 +0200 [FLINK-17869][task][checkpointing] Increase ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS ChannelStateWriter map is cleaned up by the task thread, so the check in netty thread should take possible delay into account. --- .../apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index 3e18050..89a5247 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -56,7 +56,7 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteReque public class ChannelStateWriterImpl implements ChannelStateWriter { private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class); - private static final int DEFAULT_MAX_CHECKPOINTS = 5; // currently, only single in-flight checkpoint is supported + private static final int DEFAULT_MAX_CHECKPOINTS = 1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via mailbox) private final String taskName; private final ChannelStateWriteRequestExecutor executor;
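The sizing rationale in this commit (the bound check runs on a netty thread while cleanup happens later on the task thread, so the limit must leave headroom) can be sketched as follows. Only the constant mirrors the commit; the class and method names are simplified illustrations.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a bounded result map whose size check must tolerate delayed
// cleanup: the bound covers max-concurrent checkpoints plus aborted ones
// whose removal is still queued in the task mailbox, so it is deliberately
// generous and only trips if cleanup has fallen pathologically behind.
class BoundedResults {
    static final int DEFAULT_MAX_CHECKPOINTS = 1000;

    private final Map<Long, Object> results = new ConcurrentHashMap<>();

    void start(long checkpointId) {
        if (results.size() >= DEFAULT_MAX_CHECKPOINTS) {
            throw new IllegalStateException("too many pending checkpoints");
        }
        results.putIfAbsent(checkpointId, new Object());
    }

    // Runs on a different thread than start(), possibly with delay.
    void cleanup(long checkpointId) {
        results.remove(checkpointId);
    }

    int pending() {
        return results.size();
    }
}
```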
[flink] 03/08: [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started."
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 6cb8f28cf219f79a36b90a567692fafc5f85a2a2 Author: Roman Khachatryan AuthorDate: Wed Jun 3 14:12:04 2020 +0200 [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started." This reverts commit 24ff415f1b76392f75dea7c3538558d24fcb7058 which introduced a race condition when task thread and netty thread compete for ChannelStateWriteResult. Instead, the next commits fix it by: 1. Map size validation error will be prevented simply by increasing the limit 2. When a checkpoint is subsumed, its write result will be removed on future completion --- .../checkpoint/channel/ChannelStateWriter.java| 4 +++- .../checkpoint/channel/ChannelStateWriterImpl.java| 1 - .../channel/ChannelStateWriterImplTest.java | 19 --- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java index af2a708..02a3a69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java @@ -179,7 +179,9 @@ public interface ChannelStateWriter extends Closeable { @Override public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) { - return ChannelStateWriteResult.EMPTY; + return new ChannelStateWriteResult( + CompletableFuture.completedFuture(Collections.emptyList()), + CompletableFuture.completedFuture(Collections.emptyList())); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index 6158358..8996b3b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -98,7 +98,6 @@ public class ChannelStateWriterImpl implements ChannelStateWriter { @Override public void start(long checkpointId, CheckpointOptions checkpointOptions) { - results.keySet().forEach(oldCheckpointId -> abort(oldCheckpointId, new Exception("Starting new checkpoint " + checkpointId))); LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions); ChannelStateWriteResult result = new ChannelStateWriteResult(); ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java index 9d7a7ea..0dae88e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java @@ -186,19 +186,16 @@ public class ChannelStateWriterImplTest { unwrappingError(TestException.class, () -> callStart(writer)); } - @Test - public void testStartAbortsOldCheckpoints() throws Exception { - int maxCheckpoints = 10; - runWithSyncWorker((writer, worker) -> { - writer.start(0, CheckpointOptions.forCheckpointWithDefaultLocation()); - ChannelStateWriteResult writeResult = writer.getWriteResult(0); - for (int i = 1; i <= maxCheckpoints; i++) { + @Test(expected = IllegalStateException.class) + public void testLimit() throws IOException { + int maxCheckpoints = 3; + try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, 
getStreamFactoryFactory(), maxCheckpoints)) { + writer.open(); + for (int i = 0; i < maxCheckpoints; i++) { writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation()); - worker.processAllRequests(); - assertTrue(writeResult.isDone()); - writeResult = writer.getWriteResult(i);
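Fix #2 from the revert's commit message — removing a checkpoint's write result when its future completes, instead of aborting older checkpoints when a new one starts — can be sketched like this. Names are illustrative; only the self-cleanup-on-completion pattern is the point.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the race-free alternative to the reverted behavior: each entry
// removes itself once its future completes (successfully or exceptionally),
// regardless of which thread completes it, so task and netty threads never
// compete over who aborts whom.
class SelfCleaningResults {
    private final Map<Long, CompletableFuture<Void>> results = new ConcurrentHashMap<>();

    CompletableFuture<Void> start(long checkpointId) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        results.put(checkpointId, f);
        // On completion (success or failure), drop the map entry.
        f.whenComplete((v, err) -> results.remove(checkpointId));
        return f;
    }

    int pending() {
        return results.size();
    }
}
```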
[flink] branch release-1.11 updated (31a17cb -> 1d75ec8)
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from 31a17cb [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap new 4030b1b [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log messages new 36609bb [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable new 6cb8f28 [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started." new d806924 [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed new e14958d [FLINK-17869][task][checkpointing] Increase ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS new 7bb3ffa [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator new 0634a2e [FLINK-17869][tests] Unignore UnalignedCheckpointITCase new 1d75ec8 [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification The 8 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: .../checkpoint/channel/ChannelStateWriter.java | 29 +--- .../checkpoint/channel/ChannelStateWriterImpl.java | 25 -- .../channel/ChannelStateWriterImplTest.java| 49 +--- .../checkpoint/channel/MockChannelStateWriter.java | 8 +--- .../channel/RecordingChannelStateWriter.java | 5 -- .../runtime/state/ChannelPersistenceITCase.java| 2 +- .../runtime/io/CheckpointBarrierUnaligner.java | 31 +++-- .../runtime/tasks/AsyncCheckpointRunnable.java | 6 --- .../tasks/SubtaskCheckpointCoordinatorImpl.java| 54 +- .../runtime/tasks/LocalStateForwardingTest.java| 2 - .../tasks/TestSubtaskCheckpointCoordinator.java| 2 +- .../checkpointing/UnalignedCheckpointITCase.java | 2 - 12 files changed, 95 insertions(+), 120 deletions(-)
[flink] 02/08: [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 36609bb5dc2aee4061a47f7a767630f1f5912d96 Author: Roman Khachatryan AuthorDate: Wed Jun 3 23:03:52 2020 +0200 [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable OperatorSnapshotFinalizer already waits and holds this future. ChannelStateWriter.getWriteResult() can then be non-idempotent. ChannelStateWriter.stop() can then be removed. --- .../checkpoint/channel/ChannelStateWriter.java | 21 ++-- .../checkpoint/channel/ChannelStateWriterImpl.java | 10 ++-- .../channel/ChannelStateWriterImplTest.java| 28 ++ .../checkpoint/channel/MockChannelStateWriter.java | 6 + .../channel/RecordingChannelStateWriter.java | 5 .../runtime/state/ChannelPersistenceITCase.java| 2 +- .../runtime/tasks/AsyncCheckpointRunnable.java | 6 - .../tasks/SubtaskCheckpointCoordinatorImpl.java| 15 +--- .../runtime/tasks/LocalStateForwardingTest.java| 2 -- 9 files changed, 24 insertions(+), 71 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java index 5dad559..af2a708 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java @@ -123,7 +123,7 @@ public interface ChannelStateWriter extends Closeable { * Finalize write of channel state data for the given checkpoint id. * Must be called after {@link #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added. 
* When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained -* using {@link #getWriteResult} +* using {@link #getAndRemoveWriteResult} */ void finishInput(long checkpointId); @@ -131,24 +131,21 @@ public interface ChannelStateWriter extends Closeable { * Finalize write of channel state data for the given checkpoint id. * Must be called after {@link #start(long, CheckpointOptions)} and all of the output data of the given checkpoint added. * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained -* using {@link #getWriteResult} +* using {@link #getAndRemoveWriteResult} */ void finishOutput(long checkpointId); /** * Aborts the checkpoint and fails pending result for this checkpoint. +* @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not supposed to be called afterwards. */ void abort(long checkpointId, Throwable cause); /** -* Must be called after {@link #start(long, CheckpointOptions)}. +* Must be called after {@link #start(long, CheckpointOptions)} once. +* @throws IllegalArgumentException if the passed checkpointId is not known. */ - ChannelStateWriteResult getWriteResult(long checkpointId); - - /** -* Cleans up the internal state for the given checkpoint. 
-*/ - void stop(long checkpointId); + ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) throws IllegalArgumentException; ChannelStateWriter NO_OP = new NoOpChannelStateWriter(); @@ -181,16 +178,12 @@ public interface ChannelStateWriter extends Closeable { } @Override - public ChannelStateWriteResult getWriteResult(long checkpointId) { + public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) { return ChannelStateWriteResult.EMPTY; } @Override public void close() { } - - @Override - public void stop(long checkpointId) { - } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index fc8655c..6158358 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -153,19 +153,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter { } @Override - public ChannelStateWriteResult getWriteResult(long checkpointId) { + public ChannelStateWriteResult
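The contract change in this commit — replacing idempotent `getWriteResult` plus a separate `stop()` cleanup with a single non-idempotent `getAndRemoveWriteResult` — can be sketched with a simple map-backed registry. The class below is an illustration under that assumption, not Flink's actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the get-and-remove contract: once the single consumer (the
// snapshot finalizer, which already holds the future) takes the result,
// no separate stop()/cleanup call is needed anymore.
class ResultRegistry<R> {
    private final Map<Long, R> results = new ConcurrentHashMap<>();

    void put(long checkpointId, R result) {
        results.put(checkpointId, result);
    }

    // Non-idempotent by design: a second call for the same id fails fast,
    // which also surfaces accidental double-consumption bugs.
    R getAndRemove(long checkpointId) {
        R r = results.remove(checkpointId);
        if (r == null) {
            throw new IllegalArgumentException("unknown checkpoint: " + checkpointId);
        }
        return r;
    }
}
```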
[flink] 04/08: [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed
[flink] 01/08: [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log messages
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4030b1b30a2208e494d639af8767c3a944d87d58
Author: Roman Khachatryan
AuthorDate: Wed Jun 3 14:34:30 2020 +0200

    [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log messages
---
 .../flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index e6aa9dc..fc8655c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -102,11 +102,11 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 		LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
 		ChannelStateWriteResult result = new ChannelStateWriteResult();
 		ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> {
-			Preconditions.checkState(results.size() < maxCheckpoints, "results.size() > maxCheckpoints", results.size(), maxCheckpoints);
+			Preconditions.checkState(results.size() < maxCheckpoints, String.format("%s can't start %d, results.size() > maxCheckpoints: %d > %d", taskName, checkpointId, results.size(), maxCheckpoints));
 			enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
 			return result;
 		});
-		Preconditions.checkArgument(put == result, "result future already present for checkpoint " + checkpointId);
+		Preconditions.checkArgument(put == result, taskName + " result future already present for checkpoint " + checkpointId);
 	}

 	@Override
@@ -156,7 +156,7 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 	public ChannelStateWriteResult getWriteResult(long checkpointId) {
 		LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
 		ChannelStateWriteResult result = results.get(checkpointId);
-		Preconditions.checkArgument(result != null, "channel state write result not found for checkpoint " + checkpointId);
+		Preconditions.checkArgument(result != null, taskName + " channel state write result not found for checkpoint " + checkpointId);
 		return result;
 	}
[flink] 03/08: [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started."
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cb8f28cf219f79a36b90a567692fafc5f85a2a2
Author: Roman Khachatryan
AuthorDate: Wed Jun 3 14:12:04 2020 +0200

    [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started."

    This reverts commit 24ff415f1b76392f75dea7c3538558d24fcb7058, which introduced a race condition when the task thread and the netty thread compete for ChannelStateWriteResult.

    Instead, the next commits fix it by:
    1. preventing the map size validation error simply by increasing the limit
    2. removing a subsumed checkpoint's write result on future completion
---
 .../checkpoint/channel/ChannelStateWriter.java     |  4 +++-
 .../checkpoint/channel/ChannelStateWriterImpl.java |  1 -
 .../channel/ChannelStateWriterImplTest.java        | 19 ---
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index af2a708..02a3a69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -179,7 +179,9 @@ public interface ChannelStateWriter extends Closeable {

 	@Override
 	public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
-		return ChannelStateWriteResult.EMPTY;
+		return new ChannelStateWriteResult(
+			CompletableFuture.completedFuture(Collections.emptyList()),
+			CompletableFuture.completedFuture(Collections.emptyList()));
 	}

 	@Override

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 6158358..8996b3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -98,7 +98,6 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {

 	@Override
 	public void start(long checkpointId, CheckpointOptions checkpointOptions) {
-		results.keySet().forEach(oldCheckpointId -> abort(oldCheckpointId, new Exception("Starting new checkpoint " + checkpointId)));
 		LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
 		ChannelStateWriteResult result = new ChannelStateWriteResult();
 		ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> {

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 9d7a7ea..0dae88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -186,19 +186,16 @@ public class ChannelStateWriterImplTest {
 		unwrappingError(TestException.class, () -> callStart(writer));
 	}

-	@Test
-	public void testStartAbortsOldCheckpoints() throws Exception {
-		int maxCheckpoints = 10;
-		runWithSyncWorker((writer, worker) -> {
-			writer.start(0, CheckpointOptions.forCheckpointWithDefaultLocation());
-			ChannelStateWriteResult writeResult = writer.getWriteResult(0);
-			for (int i = 1; i <= maxCheckpoints; i++) {
+	@Test(expected = IllegalStateException.class)
+	public void testLimit() throws IOException {
+		int maxCheckpoints = 3;
+		try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, getStreamFactoryFactory(), maxCheckpoints)) {
+			writer.open();
+			for (int i = 0; i < maxCheckpoints; i++) {
 				writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation());
-				worker.processAllRequests();
-				assertTrue(writeResult.isDone());
-				writeResult = writer.getWriteResult(i);
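The `testLimit` test above exercises a fail-fast bound on in-flight checkpoints. A minimal stand-alone sketch of that check, assuming illustrative names (this is a model, not Flink's `ChannelStateWriterImpl`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a bounded in-flight checkpoint registry: starting more checkpoints
// than maxCheckpoints fails fast with IllegalStateException instead of
// silently aborting older ones.
public class CheckpointLimitModel {
    private final int maxCheckpoints;
    private final Map<Long, Object> results = new HashMap<>();

    public CheckpointLimitModel(int maxCheckpoints) {
        this.maxCheckpoints = maxCheckpoints;
    }

    public void start(long checkpointId) {
        if (results.size() >= maxCheckpoints) {
            throw new IllegalStateException(
                "can't start " + checkpointId + ", results.size() >= maxCheckpoints: "
                    + results.size() + " >= " + maxCheckpoints);
        }
        results.put(checkpointId, new Object());
    }

    public static void main(String[] args) {
        CheckpointLimitModel w = new CheckpointLimitModel(3);
        for (long i = 0; i < 3; i++) {
            w.start(i); // up to maxCheckpoints in flight is fine
        }
        try {
            w.start(3L);
            throw new AssertionError("expected IllegalStateException");
        } catch (IllegalStateException expected) {
        }
        System.out.println("ok");
    }
}
```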
[flink] 06/08: [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7bb3ffa91a9916348d2f0a6a2e6cba4b109be56e
Author: Roman Khachatryan
AuthorDate: Wed Jun 3 21:43:56 2020 +0200

    [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator

    Check (by task thread) whether the current checkpoint was already aborted in the following scenario:
    1. on checkpoint barrier, ThreadSafeUnaligner sends a mail to start checkpointing (netty thread)
    2. on cancellation marker, CheckpointBarrierUnaligner aborts it (task thread)
    3. task thread processes a mail to start checkpointing
---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java | 22 +++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index cf8a21e..5d7a2c9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -44,7 +44,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiFunctionWithException;

 import org.slf4j.Logger;
@@ -57,10 +56,12 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -183,6 +184,16 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 	@Override
 	public void abortCheckpointOnBarrier(long checkpointId, Throwable cause, OperatorChain operatorChain) throws IOException {
 		LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, taskName);
+		lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
+		Iterator iterator = abortedCheckpointIds.iterator();
+		while (iterator.hasNext()) {
+			long next = iterator.next();
+			if (next < lastCheckpointId) {
+				iterator.remove();
+			} else {
+				break;
+			}
+		}
 		checkpointStorage.clearCacheFor(checkpointId);

@@ -221,9 +232,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 		// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
 		// checkpoint alignments

+		if (lastCheckpointId >= metadata.getCheckpointId()) {
+			LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
+			channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
+			checkAndClearAbortedStatus(metadata.getCheckpointId());
+			return;
+		}
+
 		// Step (0): Record the last triggered checkpointId.
-		Preconditions.checkArgument(lastCheckpointId < metadata.getCheckpointId(), String.format(
-			"Unexpected current checkpoint-id: %s vs last checkpoint-id: %s", metadata.getCheckpointId(), lastCheckpointId));
 		lastCheckpointId = metadata.getCheckpointId();

 		if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
 			LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
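The guard in this commit replaces a hard precondition with a tolerant check: stale barriers are ignored, and remembered aborted ids below the highest seen id are pruned. A simplified stand-alone model of that bookkeeping (the real coordinator also aborts the channel state write; names here are illustrative):

```java
import java.util.Iterator;
import java.util.TreeSet;

// Sketch of the out-of-order guard: track the highest checkpoint id seen, and
// drop remembered aborted ids that can no longer match a future barrier.
public class OutOfOrderGuard {
    private long lastCheckpointId = -1L;
    private final TreeSet<Long> abortedCheckpointIds = new TreeSet<>();

    // Called when a cancel-barrier aborts a checkpoint.
    public void abortOnBarrier(long checkpointId) {
        lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
        abortedCheckpointIds.add(checkpointId);
        pruneStaleAborts();
    }

    // Remove aborted ids strictly below lastCheckpointId; they are stale.
    private void pruneStaleAborts() {
        Iterator<Long> it = abortedCheckpointIds.iterator();
        while (it.hasNext()) {
            if (it.next() < lastCheckpointId) {
                it.remove();
            } else {
                break; // TreeSet iterates in ascending order
            }
        }
    }

    /** Returns true if the barrier should trigger a checkpoint. */
    public boolean onBarrier(long checkpointId) {
        if (lastCheckpointId >= checkpointId) {
            return false; // out of order: aborted or subsumed previously
        }
        lastCheckpointId = checkpointId;
        return !abortedCheckpointIds.remove(checkpointId);
    }

    public static void main(String[] args) {
        OutOfOrderGuard g = new OutOfOrderGuard();
        g.abortOnBarrier(5L);
        if (g.onBarrier(5L)) throw new AssertionError("5 was aborted");
        if (g.onBarrier(4L)) throw new AssertionError("4 is out of order");
        if (!g.onBarrier(6L)) throw new AssertionError("6 should trigger");
        System.out.println("ok");
    }
}
```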
[flink] 07/08: [FLINK-17869][tests] Unignore UnalignedCheckpointITCase
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0634a2efd2f0ca42d4cb6762015be650c04a3136
Author: Roman Khachatryan
AuthorDate: Wed Jun 3 22:12:15 2020 +0200

    [FLINK-17869][tests] Unignore UnalignedCheckpointITCase
---
 .../org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 47e7007..6211077 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -44,7 +44,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.util.TestLogger;

 import org.apache.commons.lang3.ArrayUtils;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ErrorCollector;
@@ -102,7 +101,6 @@ import static org.hamcrest.Matchers.greaterThan;
  * The number of successful checkpoints is indeed {@code >=n}.
  */
-@Ignore
 public class UnalignedCheckpointITCase extends TestLogger {

 	public static final String NUM_INPUTS = "inputs";
 	public static final String NUM_OUTPUTS = "outputs";
[flink] branch release-1.11 updated (31a17cb -> 1d75ec8)
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git.

 from 31a17cb  [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap
  new 4030b1b  [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log messages
  new 36609bb  [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable
  new 6cb8f28  [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started."
  new d806924  [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed
  new e14958d  [FLINK-17869][task][checkpointing] Increase ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS
  new 7bb3ffa  [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator
  new 0634a2e  [FLINK-17869][tests] Unignore UnalignedCheckpointITCase
  new 1d75ec8  [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification

The 8 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../checkpoint/channel/ChannelStateWriter.java      | 29 +---
 .../checkpoint/channel/ChannelStateWriterImpl.java  | 25 --
 .../channel/ChannelStateWriterImplTest.java         | 49 +---
 .../checkpoint/channel/MockChannelStateWriter.java  |  8 +---
 .../channel/RecordingChannelStateWriter.java        |  5 --
 .../runtime/state/ChannelPersistenceITCase.java     |  2 +-
 .../runtime/io/CheckpointBarrierUnaligner.java      | 31 +++--
 .../runtime/tasks/AsyncCheckpointRunnable.java      |  6 ---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java     | 54 +-
 .../runtime/tasks/LocalStateForwardingTest.java     |  2 -
 .../tasks/TestSubtaskCheckpointCoordinator.java     |  2 +-
 .../checkpointing/UnalignedCheckpointITCase.java    |  2 -
 12 files changed, 95 insertions(+), 120 deletions(-)
[flink] 05/08: [FLINK-17869][task][checkpointing] Increase ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git

commit e14958dcd09f7c32ba2ba25fc6a48696fe7eeca2
Author: Roman Khachatryan
AuthorDate: Wed Jun 3 15:56:47 2020 +0200

    [FLINK-17869][task][checkpointing] Increase ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS

    ChannelStateWriter map is cleaned up by the task thread, so the check in netty thread should take possible delay into account.
---
 .../apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 3e18050..89a5247 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -56,7 +56,7 @@ import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteReque
 public class ChannelStateWriterImpl implements ChannelStateWriter {

 	private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);

-	private static final int DEFAULT_MAX_CHECKPOINTS = 5; // currently, only single in-flight checkpoint is supported
+	private static final int DEFAULT_MAX_CHECKPOINTS = 1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via mailbox)

 	private final String taskName;
 	private final ChannelStateWriteRequestExecutor executor;
[flink] 08/08: [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d75ec8efe3c31f2a5e1a4572b5cfb8000ddbf67
Author: Roman Khachatryan
AuthorDate: Thu Jun 4 10:06:15 2020 +0200

    [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification
---
 .../flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 5d7a2c9..d904c87 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -306,6 +306,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 			}
 		}

+		channelStateWriter.abort(checkpointId, new CancellationException("checkpoint aborted via notification"), false);
+
 		for (StreamOperatorWrapper operatorWrapper : operatorChain.getAllOperators(true)) {
 			try {
 				operatorWrapper.getStreamOperator().notifyCheckpointAborted(checkpointId);
[flink] 04/08: [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8069249703bbe7858e0c6a044deb54ce75e3989
Author: Roman Khachatryan
AuthorDate: Wed Jun 3 14:25:01 2020 +0200

    [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed

    Motivation: stop writing channel state ASAP if the checkpoint is subsumed

    Changes:
    1. complete CheckpointBarrierUnaligner.ThreadSafeUnaligner#allBarriersReceivedFuture
    2. abort channel state write on its erroneous completion
    3. add cleanup parameter to ChannelStateWriter.abort to use cleanup=false in the call above
---
 .../checkpoint/channel/ChannelStateWriter.java     |  4 +--
 .../checkpoint/channel/ChannelStateWriterImpl.java |  6 +++--
 .../channel/ChannelStateWriterImplTest.java        |  2 +-
 .../checkpoint/channel/MockChannelStateWriter.java |  2 +-
 .../runtime/io/CheckpointBarrierUnaligner.java     | 31 +++---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 15 +++
 .../tasks/TestSubtaskCheckpointCoordinator.java    |  2 +-
 7 files changed, 35 insertions(+), 27 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 02a3a69..2112444 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -139,7 +139,7 @@ public interface ChannelStateWriter extends Closeable {
 	 * Aborts the checkpoint and fails pending result for this checkpoint.
 	 * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not supposed to be called afterwards.
 	 */
-	void abort(long checkpointId, Throwable cause);
+	void abort(long checkpointId, Throwable cause, boolean cleanup);

 	/**
 	 * Must be called after {@link #start(long, CheckpointOptions)} once.
@@ -174,7 +174,7 @@ public interface ChannelStateWriter extends Closeable {
 	}

 	@Override
-	public void abort(long checkpointId, Throwable cause) {
+	public void abort(long checkpointId, Throwable cause, boolean cleanup) {
 	}

 	@Override

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 8996b3b..3e18050 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -144,11 +144,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 	}

 	@Override
-	public void abort(long checkpointId, Throwable cause) {
+	public void abort(long checkpointId, Throwable cause, boolean cleanup) {
 		LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started
 		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
-		results.remove(checkpointId);
+		if (cleanup) {
+			results.remove(checkpointId);
+		}
 	}

 	@Override

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 0dae88e..d0193dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -290,7 +290,7 @@ public class ChannelStateWriterImplTest {
 	}

 	private void callAbort(ChannelStateWriter writer) {
-		writer.abort(CHECKPOINT_ID, new TestException());
+		writer.abort(CHECKPOINT_ID, new TestException(), false);
 	}

 	private void callFinish(ChannelStateWriter writer) {

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index 88bd334..7641d36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++
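The `cleanup` flag introduced here can be modeled with a small stand-alone sketch (class and method names are illustrative, not Flink's actual API): aborting fails the pending result, but the entry is only removed from the map when `cleanup == true`, so a later lookup can still observe the failure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Minimal model of abort(checkpointId, cause, cleanup): the pending result is
// failed either way, but it stays retrievable unless cleanup is requested.
public class AbortCleanupModel {
    private final Map<Long, CompletableFuture<Void>> results = new HashMap<>();

    public void start(long checkpointId) {
        results.put(checkpointId, new CompletableFuture<>());
    }

    public void abort(long checkpointId, Throwable cause, boolean cleanup) {
        CompletableFuture<Void> result = results.get(checkpointId);
        if (result != null) {
            result.completeExceptionally(cause); // fail the pending result
        }
        if (cleanup) {
            results.remove(checkpointId); // caller will not ask for the result again
        }
    }

    public boolean hasResult(long checkpointId) {
        return results.containsKey(checkpointId);
    }

    public static void main(String[] args) {
        AbortCleanupModel w = new AbortCleanupModel();
        w.start(1L);
        w.abort(1L, new RuntimeException("subsumed"), false);
        // cleanup=false: the failed result is still retrievable afterwards
        if (!w.hasResult(1L)) throw new AssertionError("result should be kept");
        w.start(2L);
        w.abort(2L, new RuntimeException("aborted"), true);
        if (w.hasResult(2L)) throw new AssertionError("result should be removed");
        System.out.println("ok");
    }
}
```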
[flink] 02/08: [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36609bb5dc2aee4061a47f7a767630f1f5912d96
Author: Roman Khachatryan
AuthorDate: Wed Jun 3 23:03:52 2020 +0200

    [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable

    OperatorSnapshotFinalizer already waits and holds this future.
    ChannelStateWriter.getWriteResult() can then be non-idempotent.
    ChannelStateWriter.stop() can then be removed.
---
 .../checkpoint/channel/ChannelStateWriter.java     | 21 ++--
 .../checkpoint/channel/ChannelStateWriterImpl.java | 10 ++--
 .../channel/ChannelStateWriterImplTest.java        | 28 ++
 .../checkpoint/channel/MockChannelStateWriter.java |  6 +
 .../channel/RecordingChannelStateWriter.java       |  5 
 .../runtime/state/ChannelPersistenceITCase.java    |  2 +-
 .../runtime/tasks/AsyncCheckpointRunnable.java     |  6 -
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 15 +---
 .../runtime/tasks/LocalStateForwardingTest.java    |  2 --
 9 files changed, 24 insertions(+), 71 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 5dad559..af2a708 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -123,7 +123,7 @@ public interface ChannelStateWriter extends Closeable {
 	 * Finalize write of channel state data for the given checkpoint id.
 	 * Must be called after {@link #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added.
 	 * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
-	 * using {@link #getWriteResult}
+	 * using {@link #getAndRemoveWriteResult}
 	 */
 	void finishInput(long checkpointId);

@@ -131,24 +131,21 @@ public interface ChannelStateWriter extends Closeable {
 	 * Finalize write of channel state data for the given checkpoint id.
 	 * Must be called after {@link #start(long, CheckpointOptions)} and all of the output data of the given checkpoint added.
 	 * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
-	 * using {@link #getWriteResult}
+	 * using {@link #getAndRemoveWriteResult}
 	 */
 	void finishOutput(long checkpointId);

 	/**
 	 * Aborts the checkpoint and fails pending result for this checkpoint.
+	 * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not supposed to be called afterwards.
 	 */
 	void abort(long checkpointId, Throwable cause);

 	/**
-	 * Must be called after {@link #start(long, CheckpointOptions)}.
+	 * Must be called after {@link #start(long, CheckpointOptions)} once.
+	 * @throws IllegalArgumentException if the passed checkpointId is not known.
 	 */
-	ChannelStateWriteResult getWriteResult(long checkpointId);
-
-	/**
-	 * Cleans up the internal state for the given checkpoint.
-	 */
-	void stop(long checkpointId);
+	ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) throws IllegalArgumentException;

 	ChannelStateWriter NO_OP = new NoOpChannelStateWriter();

@@ -181,16 +178,12 @@ public interface ChannelStateWriter extends Closeable {
 	}

 	@Override
-	public ChannelStateWriteResult getWriteResult(long checkpointId) {
+	public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
 		return ChannelStateWriteResult.EMPTY;
 	}

 	@Override
 	public void close() {
 	}
-
-	@Override
-	public void stop(long checkpointId) {
-	}
 }
}

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index fc8655c..6158358 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -153,19 +153,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 	}

 	@Override
-	public ChannelStateWriteResult getWriteResult(long checkpointId) {
+	public ChannelStateWriteResult
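The rename from `getWriteResult` to `getAndRemoveWriteResult` makes retrieval deliberately non-idempotent, which is what lets the separate `stop(checkpointId)` cleanup call go away. A minimal stand-alone model of that contract (illustrative names, not Flink's class):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of get-and-remove semantics: the result is handed out exactly once;
// a second call for the same checkpoint id fails with IllegalArgumentException.
public class GetAndRemoveModel {
    private final Map<Long, String> results = new HashMap<>();

    public void start(long checkpointId) {
        results.put(checkpointId, "result-" + checkpointId);
    }

    public String getAndRemoveWriteResult(long checkpointId) {
        String result = results.remove(checkpointId); // non-idempotent by design
        if (result == null) {
            throw new IllegalArgumentException("no write result for checkpoint " + checkpointId);
        }
        return result;
    }

    public static void main(String[] args) {
        GetAndRemoveModel m = new GetAndRemoveModel();
        m.start(3L);
        m.getAndRemoveWriteResult(3L); // first call succeeds and cleans up
        try {
            m.getAndRemoveWriteResult(3L); // second call must fail
            throw new AssertionError("expected IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
        }
        System.out.println("ok");
    }
}
```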
[flink] branch release-1.11 updated: [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 31a17cb [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap 31a17cb is described below commit 31a17cb523603a0b0d2daaf8ab7c1a18f9fc7999 Author: Andrey Zagrebin AuthorDate: Tue Jun 9 16:47:04 2020 +0300 [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap This closes #12557. --- docs/ops/memory/mem_setup_master.md| 5 ++ docs/ops/memory/mem_setup_master.zh.md | 4 ++ .../jobmanager/JobManagerFlinkMemoryUtils.java | 56 ++ .../jobmanager/JobManagerProcessUtilsTest.java | 38 ++- 4 files changed, 93 insertions(+), 10 deletions(-) diff --git a/docs/ops/memory/mem_setup_master.md b/docs/ops/memory/mem_setup_master.md index dbea142..d086427 100644 --- a/docs/ops/memory/mem_setup_master.md +++ b/docs/ops/memory/mem_setup_master.md @@ -89,6 +89,11 @@ There can be the following possible sources of *Off-heap* memory consumption: * Flink framework dependencies (e.g. Akka network communication) * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks +Note If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory) +and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory +will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap). +The default value of the *Off-heap* memory option will be ignored. + ## Local Execution If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored. 
diff --git a/docs/ops/memory/mem_setup_master.zh.md b/docs/ops/memory/mem_setup_master.zh.md index dbea142..54b5165 100644 --- a/docs/ops/memory/mem_setup_master.zh.md +++ b/docs/ops/memory/mem_setup_master.zh.md @@ -89,6 +89,10 @@ There can be the following possible sources of *Off-heap* memory consumption: * Flink framework dependencies (e.g. Akka network communication) * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks +Note If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory) +and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory +will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap). +The default value of the *Off-heap* memory option will be ignored. ## Local Execution If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java index b22b6e6..9f4f13e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java @@ -40,25 +40,63 @@ public class JobManagerFlinkMemoryUtils implements FlinkMemoryUtils
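The note added by this commit describes a simple arithmetic rule: when Total Flink Memory and JVM Heap are configured explicitly but Off-heap is not, Off-heap is derived as Total minus Heap, and the option's default is ignored. A minimal, self-contained Java sketch of that rule follows — this is a hypothetical helper for illustration, not the actual `JobManagerFlinkMemoryUtils` logic:

```java
public class OffHeapDerivationSketch {

    /**
     * Mirrors the documented rule: an explicitly configured off-heap size
     * always wins; otherwise, when total Flink memory and JVM heap are both
     * configured, off-heap is derived as total minus heap and the option's
     * default value (defaultOffHeapBytes) is deliberately ignored.
     */
    public static long deriveOffHeapBytes(
            long totalFlinkBytes, long jvmHeapBytes,
            Long explicitOffHeapBytes, long defaultOffHeapBytes) {
        if (explicitOffHeapBytes != null) {
            return explicitOffHeapBytes;
        }
        // defaultOffHeapBytes is intentionally unused on this path
        return totalFlinkBytes - jvmHeapBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // total = 1024 MB, heap = 768 MB, off-heap unset -> derived 256 MB
        System.out.println(deriveOffHeapBytes(1024 * mb, 768 * mb, null, 128 * mb) / mb); // prints 256
    }
}
```

With an explicit off-heap value the derivation is skipped entirely, which matches the "explicit configuration wins" behavior described in the note.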
[flink] branch master updated: [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4555ad9 [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap 4555ad9 is described below commit 4555ad91b4bd0df8887c4b4eb3119cbae272e805 Author: Andrey Zagrebin AuthorDate: Tue Jun 9 16:47:04 2020 +0300 [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap --- docs/ops/memory/mem_setup_master.md| 5 ++ docs/ops/memory/mem_setup_master.zh.md | 4 ++ .../jobmanager/JobManagerFlinkMemoryUtils.java | 56 ++ .../jobmanager/JobManagerProcessUtilsTest.java | 38 ++- 4 files changed, 93 insertions(+), 10 deletions(-) diff --git a/docs/ops/memory/mem_setup_master.md b/docs/ops/memory/mem_setup_master.md index dbea142..d086427 100644 --- a/docs/ops/memory/mem_setup_master.md +++ b/docs/ops/memory/mem_setup_master.md @@ -89,6 +89,11 @@ There can be the following possible sources of *Off-heap* memory consumption: * Flink framework dependencies (e.g. Akka network communication) * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks +Note If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory) +and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory +will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap). +The default value of the *Off-heap* memory option will be ignored. + ## Local Execution If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored. 
diff --git a/docs/ops/memory/mem_setup_master.zh.md b/docs/ops/memory/mem_setup_master.zh.md index dbea142..54b5165 100644 --- a/docs/ops/memory/mem_setup_master.zh.md +++ b/docs/ops/memory/mem_setup_master.zh.md @@ -89,6 +89,10 @@ There can be the following possible sources of *Off-heap* memory consumption: * Flink framework dependencies (e.g. Akka network communication) * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks +Note If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory) +and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory +will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap). +The default value of the *Off-heap* memory option will be ignored. ## Local Execution If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java index b22b6e6..9f4f13e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java @@ -40,25 +40,63 @@ public class JobManagerFlinkMemoryUtils implements FlinkMemoryUtils
[flink] 01/08: [FLINK-17980][docs] Move project setup into DataStream section
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 0861d2e586e5202c992b8a15ec23ca71d56dea62 Author: Seth Wiesman AuthorDate: Mon Jun 8 11:22:15 2020 -0500 [FLINK-17980][docs] Move project setup into DataStream section --- docs/dev/project-configuration.md | 559 + docs/dev/project-configuration.zh.md | 559 + docs/getting-started/project-setup/dependencies.md | 237 - .../project-setup/dependencies.zh.md | 200 .../project-setup/java_api_quickstart.md | 375 -- .../project-setup/java_api_quickstart.zh.md| 360 - .../project-setup/scala_api_quickstart.md | 249 - .../project-setup/scala_api_quickstart.zh.md | 241 - docs/redirects/dependencies.md | 2 +- ...ndencies.md => getting-started-dependencies.md} | 6 +- .../index.md => redirects/java-quickstart.md} | 9 +- .../index.zh.md => redirects/scala-quickstart.md} | 9 +- docs/redirects/scala_quickstart.md | 2 +- 13 files changed, 1131 insertions(+), 1677 deletions(-) diff --git a/docs/dev/project-configuration.md b/docs/dev/project-configuration.md new file mode 100644 index 000..a23b134 --- /dev/null +++ b/docs/dev/project-configuration.md @@ -0,0 +1,559 @@ +--- +title: "Project Configuration" +nav-parent_id: streaming +nav-pos: 301 +--- + + +Every Flink application depends on a set of Flink libraries. At the bare minimum, the application depends +on the Flink APIs. Many applications depend in addition on certain connector libraries (like Kafka, Cassandra, etc.). +When running Flink applications (either in a distributed deployment, or in the IDE for testing), the Flink +runtime library must be available as well. 
+
+* This will be replaced by the TOC
+{:toc}
+
+## Flink Core and Application Dependencies
+
+As with most systems that run user-defined applications, there are two broad categories of dependencies and libraries in Flink:
+
+  - **Flink Core Dependencies**: Flink itself consists of a set of classes and dependencies that are needed to run the system, for example
+coordination, networking, checkpoints, failover, APIs, operations (such as windowing), resource management, etc.
+The set of all these classes and dependencies forms the core of Flink's runtime and must be present when a Flink
+application is started.
+
+These core classes and dependencies are packaged in the `flink-dist` jar. They are part of Flink's `lib` folder and
+part of the basic Flink container images. Think of these dependencies as similar to Java's core library (`rt.jar`, `charsets.jar`, etc.),
+which contains classes like `String` and `List`.
+
+The Flink Core Dependencies do not contain any connectors or libraries (CEP, SQL, ML, etc.) in order to avoid having an excessive
+number of dependencies and classes in the classpath by default. In fact, we try to keep the core dependencies as slim as possible
+to keep the default classpath small and avoid dependency clashes.
+
+  - The **User Application Dependencies** are all connectors, formats, or libraries that a specific user application needs.
+
+The user application is typically packaged into an *application jar*, which contains the application code and the required
+connector and library dependencies.
+
+The user application dependencies explicitly do not include the Flink DataStream APIs and runtime dependencies,
+because those are already part of Flink's Core Dependencies.
+
+
+## Setting up a Project: Basic Dependencies
+
+At a bare minimum, every Flink application needs the API dependencies to develop against.
+
+When setting up a project manually, you need to add the following dependencies for the Java/Scala API
+(presented here in Maven syntax; the same dependencies apply to other build tools such as Gradle and SBT as well).
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+  <scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-scala{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+  <scope>provided</scope>
+</dependency>
+{% endhighlight %}
+
+**Important:** Please note that all these dependencies have their scope set to *provided*.
+That means that they are needed to compile against, but that they should not be packaged into the
+project's resulting application jar file - these dependencies are Flink Core Dependencies,
+which are already available in any setup.
+
+It is highly recommended to keep the dependencies in scope *provided*. If they are not set to *provided*,
+the best case is that the resulting JAR becomes excessively large, because it also contains all Flink core
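The doc text above notes that the same dependencies apply to other build tools such as Gradle and SBT. As a hedged illustration (the coordinates `flink-streaming-java_2.11` and version `1.11.0` are assumed placeholders matching this release branch), Maven's *provided* scope corresponds to Gradle's `compileOnly` configuration:

```groovy
dependencies {
    // compileOnly mirrors Maven's 'provided' scope: the dependency is on the
    // compile classpath but is not packaged into the application jar, since
    // flink-dist already ships these classes on the cluster classpath.
    compileOnly 'org.apache.flink:flink-streaming-java_2.11:1.11.0'
}
```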
[flink] branch release-1.11 updated (7dfd517 -> f4ada58)
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from 7dfd517 [FLINK-18084][docs] Document the Application Mode new 0861d2e [FLINK-17980][docs] Move project setup into DataStream section new b8e8269 [FLINK-17980][docs] Move getting started walkthroughs to Try Flink new 1208384 [FLINK-17980][docs] Rename Hands-on Training to Learn Flink new cdb4f48 [FLINK-17980][docs] Update headings of datastream and table walkthroughs new 69456c3 [FLINK-17980][docs] Update broken links new f14cb74 [FLINK-17980][docs] Add training redirects new 38e5144 [FLINK-17980][docs] Update broken links new f4ada58 [FLINK-17980][docs] Add training redirects The 8 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/concepts/index.md | 13 +- docs/concepts/index.zh.md | 9 +- docs/dev/connectors/elasticsearch.md | 2 +- docs/dev/connectors/elasticsearch.zh.md| 2 +- docs/dev/project-configuration.md | 559 + docs/dev/project-configuration.zh.md | 559 + docs/dev/stream/state/checkpointing.md | 4 +- docs/dev/stream/state/checkpointing.zh.md | 4 +- docs/getting-started/index.md | 86 docs/getting-started/index.zh.md | 86 docs/getting-started/project-setup/dependencies.md | 237 - .../project-setup/dependencies.zh.md | 200 .../project-setup/java_api_quickstart.md | 375 -- .../project-setup/java_api_quickstart.zh.md| 360 - .../project-setup/scala_api_quickstart.md | 249 - .../project-setup/scala_api_quickstart.zh.md | 241 - docs/getting-started/walkthroughs/index.md | 25 - docs/getting-started/walkthroughs/index.zh.md | 25 - docs/index.md | 6 +- docs/index.zh.md | 6 +- docs/internals/task_lifecycle.md | 2 +- docs/internals/task_lifecycle.zh.md| 2 +- docs/{training => 
learn-flink}/datastream_api.md | 2 +- .../{training => learn-flink}/datastream_api.zh.md | 2 +- docs/{training => learn-flink}/etl.md | 2 +- docs/{training => learn-flink}/etl.zh.md | 2 +- docs/{training => learn-flink}/event_driven.md | 8 +- docs/{training => learn-flink}/event_driven.zh.md | 8 +- docs/{training => learn-flink}/fault_tolerance.md | 6 +- .../fault_tolerance.zh.md | 6 +- docs/{training => learn-flink}/index.md| 6 +- docs/{training => learn-flink}/index.zh.md | 2 +- .../streaming_analytics.md | 8 +- .../streaming_analytics.zh.md | 8 +- docs/ops/state/savepoints.md | 2 +- docs/ops/state/savepoints.zh.md| 2 +- .../{dependencies.md => datastream-walkthrough.md} | 6 +- docs/redirects/dependencies.md | 2 +- ...endencies.md => flink-operations-playground.md} | 6 +- ...ndencies.md => getting-started-dependencies.md} | 6 +- .../index.md => redirects/getting-started.md} | 9 +- .../{dependencies.md => java-quickstart.md}| 6 +- ...dependencies.md => python_table_walkthrough.md} | 6 +- .../{dependencies.md => scala-quickstart.md} | 6 +- docs/redirects/scala_quickstart.md | 2 +- .../index.md => redirects/table-walkthrough.md}| 9 +- .../index.zh.md => redirects/training.md} | 9 +- .../walkthroughs => try-flink}/datastream_api.md | 7 +- .../datastream_api.zh.md | 7 +- .../flink-operations-playground.md | 8 +- .../flink-operations-playground.zh.md | 8 +- .../project-setup => try-flink}/index.md | 12 +- .../docker-playgrounds => try-flink}/index.zh.md | 11 +- .../walkthroughs => try-flink}/python_table_api.md | 4 +- .../python_table_api.zh.md | 4 +- .../walkthroughs => try-flink}/table_api.md| 7 +- .../walkthroughs => try-flink}/table_api.zh.md | 7 +- 57 files changed, 1243 insertions(+), 2015 deletions(-) create mode 100644 docs/dev/project-configuration.md create mode 100644 docs/dev/project-configuration.zh.md delete mode 100644 docs/getting-started/index.md delete mode 100644 docs/getting-started/index.zh.md delete mode 100644 
docs/getting-started/project-setup/dependencies.md delete mode
[flink] 02/08: [FLINK-17980][docs] Move getting started walkthroughs to Try Flink
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit b8e8269871950cf4437b44f2186c814675332d35 Author: Seth Wiesman AuthorDate: Mon Jun 8 11:33:16 2020 -0500 [FLINK-17980][docs] Move getting started walkthroughs to Try Flink --- docs/getting-started/index.md | 86 -- docs/getting-started/index.zh.md | 86 -- .../datastream-walkthrough.md} | 9 +-- .../flink-operations-playground.md}| 9 +-- .../index.md => redirects/getting-started.md} | 11 ++- .../python_table_walkthrough.md} | 11 ++- .../index.md => redirects/table-walkthrough.md}| 9 +-- .../walkthroughs => try-flink}/datastream_api.md | 2 +- .../datastream_api.zh.md | 2 +- .../flink-operations-playground.md | 4 +- .../flink-operations-playground.zh.md | 4 +- .../docker-playgrounds => try-flink}/index.md | 12 +-- .../docker-playgrounds => try-flink}/index.zh.md | 11 +-- .../walkthroughs => try-flink}/python_table_api.md | 4 +- .../python_table_api.zh.md | 4 +- .../walkthroughs => try-flink}/table_api.md| 2 +- .../walkthroughs => try-flink}/table_api.zh.md | 2 +- 17 files changed, 47 insertions(+), 221 deletions(-) diff --git a/docs/getting-started/index.md b/docs/getting-started/index.md deleted file mode 100644 index a6b5cab..000 --- a/docs/getting-started/index.md +++ /dev/null @@ -1,86 +0,0 @@ -title: "Getting Started" -nav-id: getting-started -nav-title: ' Getting Started' -nav-parent_id: root -section-break: true -nav-show_overview: true -nav-pos: 1 - - -There are many ways to get started with Apache Flink. 
Which one is the best for -you depends on your goals and prior experience: - -* take a look at the **Docker Playgrounds** if you want to see what Flink can do, via a hands-on, - docker-based introduction to specific Flink concepts -* explore one of the **Code Walkthroughs** if you want a quick, end-to-end - introduction to one of Flink's APIs -* work your way through the **Hands-on Training** for a comprehensive, - step-by-step introduction to Flink -* use **Project Setup** if you already know the basics of Flink and want a - project template for Java or Scala, or need help setting up the dependencies - -### Taking a first look at Flink - -The **Docker Playgrounds** provide sandboxed Flink environments that are set up in just a few minutes and which allow you to explore and play with Flink. - -* The [**Operations Playground**]({% link getting-started/docker-playgrounds/flink-operations-playground.md %}) shows you how to operate streaming applications with Flink. You can experience how Flink recovers application from failures, upgrade and scale streaming applications up and down, and query application metrics. - - - -### First steps with one of Flink's APIs - -The **Code Walkthroughs** are a great way to get started quickly with a step-by-step introduction to -one of Flink's APIs. Each walkthrough provides instructions for bootstrapping a small skeleton -project, and then shows how to extend it to a simple application. - -* The [**DataStream API** code walkthrough]({% link getting-started/walkthroughs/datastream_api.md %}) shows how - to implement a simple DataStream application and how to extend it to be stateful and use timers. - The DataStream API is Flink's main abstraction for implementing stateful streaming applications - with sophisticated time semantics in Java or Scala. 
- -* Flink's **Table API** is a relational API used for writing SQL-like queries in Java, Scala, or - Python, which are then automatically optimized, and can be executed on batch or streaming data - with identical syntax and semantics. The [Table API code walkthrough for Java and Scala]({% link - getting-started/walkthroughs/table_api.md %}) shows how to implement a simple Table API query on a - batch source and how to evolve it into a continuous query on a streaming source. There's also a - similar [code walkthrough for the Python Table API]({% link - getting-started/walkthroughs/python_table_api.md %}). - -### Taking a Deep Dive with the Hands-on Training - -The [**Hands-on Training**]({% link training/index.md %}) is a self-paced training course with -a set of lessons and hands-on exercises. This step-by-step introduction to Flink focuses -on learning how to use the DataStream API to meet the needs of common, real-world use cases, -and provides a complete introduction to the fundamental concepts: parallel dataflows, -stateful stream processing, event time and watermarking, and fault tolerance via state snapshots. - - diff --git a/docs/getting-started/index.zh.md b/docs/getting-started/index.zh.md deleted file mode 100644
[flink] 07/08: [FLINK-17980][docs] Update broken links
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 38e5144268dc433ac682df94fc1276f19fef6b84 Author: Seth Wiesman AuthorDate: Mon Jun 8 14:58:25 2020 -0500 [FLINK-17980][docs] Update broken links --- docs/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.md b/docs/index.md index 69eb302..4f9ad2b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -34,7 +34,7 @@ Apache Flink is an open source platform for distributed stream and batch data pr * [Write a Table API query]({% link try-flink/table_api.md %}) * **Docker Playgrounds**: Set up a sandboxed Flink environment in just a few minutes to explore and play with Flink. - * [Run and manage Flink streaming applications]({% try-flink/flink-operations-playground.md %}) + * [Run and manage Flink streaming applications]({% link try-flink/flink-operations-playground.md %}) * **Concepts**: Learn about Flink's concepts to better understand the documentation. * [Stateful Stream Processing](concepts/stateful-stream-processing.html)
[flink] 03/08: [FLINK-17980][docs] Rename Hands-on Training to Learn Flink
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 12083841bcde77e4f04a169e6cf51d5ca5583d9d Author: Seth Wiesman AuthorDate: Mon Jun 8 11:35:19 2020 -0500 [FLINK-17980][docs] Rename Hands-on Training to Learn Flink --- docs/{training => learn-flink}/datastream_api.md | 2 +- docs/{training => learn-flink}/datastream_api.zh.md | 2 +- docs/{training => learn-flink}/etl.md| 2 +- docs/{training => learn-flink}/etl.zh.md | 2 +- docs/{training => learn-flink}/event_driven.md | 8 docs/{training => learn-flink}/event_driven.zh.md| 8 docs/{training => learn-flink}/fault_tolerance.md| 6 +++--- docs/{training => learn-flink}/fault_tolerance.zh.md | 6 +++--- docs/{training => learn-flink}/index.md | 6 +++--- docs/{training => learn-flink}/index.zh.md | 0 docs/{training => learn-flink}/streaming_analytics.md| 8 docs/{training => learn-flink}/streaming_analytics.zh.md | 8 12 files changed, 29 insertions(+), 29 deletions(-) diff --git a/docs/training/datastream_api.md b/docs/learn-flink/datastream_api.md similarity index 99% rename from docs/training/datastream_api.md rename to docs/learn-flink/datastream_api.md index 7392041..fe6f548 100644 --- a/docs/training/datastream_api.md +++ b/docs/learn-flink/datastream_api.md @@ -3,7 +3,7 @@ title: Intro to the DataStream API nav-id: datastream-api nav-pos: 2 nav-title: Intro to the DataStream API -nav-parent_id: training +nav-parent_id: learn-flink ---
[flink] 08/08: [FLINK-17980][docs] Add training redirects
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit f4ada5888d31412c462fc98124742e7a4d099ce1 Author: Seth Wiesman AuthorDate: Mon Jun 8 15:13:17 2020 -0500 [FLINK-17980][docs] Add training redirects This closes #12534 --- docs/redirects/training.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/redirects/training.md b/docs/redirects/training.md index 7061c6f..09cc129 100644 --- a/docs/redirects/training.md +++ b/docs/redirects/training.md @@ -1,7 +1,7 @@ --- title: Hands-on Training layout: redirect -redirect: /learn-flink/html +redirect: /learn-flink/index.html permalink: /training/index.html ---
[flink] 04/08: [FLINK-17980][docs] Update headings of datastream and table walkthroughs
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit cdb4f486564d01b3b8372ccb0f85082d542aa20c Author: Seth Wiesman AuthorDate: Mon Jun 8 11:54:06 2020 -0500 [FLINK-17980][docs] Update headings of datastream and table walkthroughs --- docs/try-flink/datastream_api.md| 5 ++--- docs/try-flink/datastream_api.zh.md | 5 ++--- docs/try-flink/table_api.md | 5 ++--- docs/try-flink/table_api.zh.md | 5 ++--- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/docs/try-flink/datastream_api.md b/docs/try-flink/datastream_api.md index ae529ad..33ed8ab 100644 --- a/docs/try-flink/datastream_api.md +++ b/docs/try-flink/datastream_api.md @@ -1,7 +1,6 @@ --- -title: "DataStream API" -nav-id: datastreamwalkthrough -nav-title: 'DataStream API' +title: "Fraud Detection with the DataStream API" +nav-title: 'Fraud Detection with the DataStream API' nav-parent_id: try-flink nav-pos: 1 --- diff --git a/docs/try-flink/datastream_api.zh.md b/docs/try-flink/datastream_api.zh.md index d5e13f4..e28cc5d 100644 --- a/docs/try-flink/datastream_api.zh.md +++ b/docs/try-flink/datastream_api.zh.md @@ -1,7 +1,6 @@ --- -title: "DataStream API" -nav-id: datastreamwalkthrough -nav-title: 'DataStream API' +title: "Fraud Detection with the DataStream API" +nav-title: 'Fraud Detection with the DataStream API' nav-parent_id: try-flink nav-pos: 1 --- diff --git a/docs/try-flink/table_api.md b/docs/try-flink/table_api.md index 7c277ca..aa78f67 100644 --- a/docs/try-flink/table_api.md +++ b/docs/try-flink/table_api.md @@ -1,7 +1,6 @@ --- -title: "Table API" -nav-id: tableapiwalkthrough -nav-title: 'Table API' +title: "Real Time Reporting with the Table API" +nav-title: 'Real Time Reporting with the Table API' nav-parent_id: try-flink nav-pos: 2 --- diff --git a/docs/try-flink/table_api.zh.md b/docs/try-flink/table_api.zh.md index 13f4bee..530ec1a 100644 --- 
a/docs/try-flink/table_api.zh.md +++ b/docs/try-flink/table_api.zh.md @@ -1,7 +1,6 @@ --- -title: "Table API" -nav-id: tableapiwalkthrough -nav-title: 'Table API' +title: "Real Time Reporting with the Table API" +nav-title: 'Real Time Reporting with the Table API' nav-parent_id: try-flink nav-pos: 2 ---
[flink] 05/08: [FLINK-17980][docs] Update broken links
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 69456c3e05d665dd2c2e1e1949c17908b3c5e74e Author: Seth Wiesman AuthorDate: Mon Jun 8 14:03:11 2020 -0500 [FLINK-17980][docs] Update broken links --- docs/concepts/index.md | 13 ++--- docs/concepts/index.zh.md| 9 - docs/dev/connectors/elasticsearch.md | 2 +- docs/dev/connectors/elasticsearch.zh.md | 2 +- docs/dev/stream/state/checkpointing.md | 4 ++-- docs/dev/stream/state/checkpointing.zh.md| 4 ++-- docs/index.md| 6 +++--- docs/index.zh.md | 6 +++--- docs/internals/task_lifecycle.md | 2 +- docs/internals/task_lifecycle.zh.md | 2 +- docs/learn-flink/index.md| 2 +- docs/learn-flink/index.zh.md | 2 +- docs/ops/state/savepoints.md | 2 +- docs/ops/state/savepoints.zh.md | 2 +- docs/try-flink/flink-operations-playground.md| 4 ++-- docs/try-flink/flink-operations-playground.zh.md | 4 ++-- 16 files changed, 32 insertions(+), 34 deletions(-) diff --git a/docs/concepts/index.md b/docs/concepts/index.md index 2c44e1f..11beb8e 100644 --- a/docs/concepts/index.md +++ b/docs/concepts/index.md @@ -1,8 +1,8 @@ --- -title: Concepts in Depth +title: Concepts nav-id: concepts nav-pos: 3 -nav-title: ' Concepts in Depth' +nav-title: ' Concepts' nav-parent_id: root nav-show_overview: true permalink: /concepts/index.html @@ -26,13 +26,12 @@ specific language governing permissions and limitations under the License. --> -The [Hands-on Training]({% link training/index.md %}) explains the basic concepts +The [Hands-on Training]({% link learn-flink/index.md %}) explains the basic concepts of stateful and timely stream processing that underlie Flink's APIs, and provides examples of how these mechanisms are used in applications. 
Stateful stream processing is introduced in the context -of [Data Pipelines & ETL]({% link training/etl.md %}#stateful-transformations) -and is further developed in the section on [Fault Tolerance]({% link -training/fault_tolerance.md %}). Timely stream processing is introduced in the section on -[Streaming Analytics]({% link training/streaming_analytics.md %}). +of [Data Pipelines & ETL]({% link learn-flink/etl.md %}#stateful-transformations) +and is further developed in the section on [Fault Tolerance]({% link learn-flink/fault_tolerance.md %}). Timely stream processing is introduced in the section on +[Streaming Analytics]({% link learn-flink/streaming_analytics.md %}). This _Concepts in Depth_ section provides a deeper understanding of how Flink's architecture and runtime implement these concepts. diff --git a/docs/concepts/index.zh.md b/docs/concepts/index.zh.md index fab83f7..54f7dfb 100644 --- a/docs/concepts/index.zh.md +++ b/docs/concepts/index.zh.md @@ -26,13 +26,12 @@ specific language governing permissions and limitations under the License. --> -The [Hands-on Training]({% link training/index.zh.md %}) explains the basic concepts +The [Hands-on Training]({% link learn-flink/index.zh.md %}) explains the basic concepts of stateful and timely stream processing that underlie Flink's APIs, and provides examples of how these mechanisms are used in applications. Stateful stream processing is introduced in the context -of [Data Pipelines & ETL]({% link training/etl.zh.md %}#stateful-transformations) -and is further developed in the section on [Fault Tolerance]({% link -training/fault_tolerance.zh.md %}). Timely stream processing is introduced in the section on -[Streaming Analytics]({% link training/streaming_analytics.zh.md %}). +of [Data Pipelines & ETL]({% link learn-flink/etl.zh.md %}#stateful-transformations) +and is further developed in the section on [Fault Tolerance]({% link learn-flink/fault_tolerance.zh.md %}). 
Timely stream processing is introduced in the section on +[Streaming Analytics]({% link learn-flink/streaming_analytics.zh.md %}). This _Concepts in Depth_ section provides a deeper understanding of how Flink's architecture and runtime implement these concepts. diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md index 4b8b2da..9fd808e 100644 --- a/docs/dev/connectors/elasticsearch.md +++ b/docs/dev/connectors/elasticsearch.md @@ -317,7 +317,7 @@ time of checkpoints. This effectively assures that all requests before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink. -More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/training/fault_tolerance.html). +More
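The Elasticsearch connector text above describes a flush-on-checkpoint protocol: the sink waits at checkpoint time until every previously sent request has been acknowledged. A dependency-free Java sketch of that pattern follows — hypothetical and simplified for illustration, not the actual `ElasticsearchSink` implementation:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Counts in-flight requests; a checkpoint blocks in awaitFlush() until the
// counter drains to zero, i.e. Elasticsearch has acknowledged all requests
// sent before the checkpoint was triggered.
class PendingRequestTracker {
    private final AtomicInteger pending = new AtomicInteger();

    void onRequestSent() {
        pending.incrementAndGet();
    }

    synchronized void onRequestAcked() {
        if (pending.decrementAndGet() == 0) {
            notifyAll(); // wake a checkpoint waiting for the flush
        }
    }

    // Would be called from the sink's snapshotState(): block until all
    // pending requests are acknowledged before the snapshot completes.
    synchronized void awaitFlush() {
        while (pending.get() > 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    int pendingCount() {
        return pending.get();
    }
}
```

Only once `awaitFlush()` returns would the sink proceed to process more records, which is what gives the at-least-once guarantee described above.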
[flink] 06/08: [FLINK-17980][docs] Add training redirects
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit f14cb74dd11055b9f6f90a5841502e43093c2dd9 Author: Seth Wiesman AuthorDate: Mon Jun 8 14:04:20 2020 -0500 [FLINK-17980][docs] Add training redirects --- docs/redirects/training.md | 24 1 file changed, 24 insertions(+) diff --git a/docs/redirects/training.md b/docs/redirects/training.md new file mode 100644 index 000..7061c6f --- /dev/null +++ b/docs/redirects/training.md @@ -0,0 +1,24 @@ +--- +title: Hands-on Training +layout: redirect +redirect: /learn-flink/html +permalink: /training/index.html +--- +
[flink] branch master updated (55712a5 -> 6e4e7ff)
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 55712a5 [FLINK-17980][docs] Add training redirects add 6e4e7ff [FLINK-17980][docs] Move project setup into DataStream section No new revisions were added by this update. Summary of changes: docs/dev/project-configuration.md | 559 + docs/dev/project-configuration.zh.md | 559 + docs/getting-started/project-setup/dependencies.md | 237 - .../project-setup/dependencies.zh.md | 200 docs/getting-started/project-setup/index.md| 25 - docs/getting-started/project-setup/index.zh.md | 25 - .../project-setup/java_api_quickstart.md | 375 -- .../project-setup/java_api_quickstart.zh.md| 360 - .../project-setup/scala_api_quickstart.md | 249 - .../project-setup/scala_api_quickstart.zh.md | 241 - docs/redirects/dependencies.md | 2 +- .../{als.md => getting-started-dependencies.md}| 6 +- docs/redirects/{als.md => java-quickstart.md} | 6 +- docs/redirects/{als.md => scala-quickstart.md} | 6 +- docs/redirects/scala_quickstart.md | 2 +- 15 files changed, 1129 insertions(+), 1723 deletions(-) create mode 100644 docs/dev/project-configuration.md create mode 100644 docs/dev/project-configuration.zh.md delete mode 100644 docs/getting-started/project-setup/dependencies.md delete mode 100644 docs/getting-started/project-setup/dependencies.zh.md delete mode 100644 docs/getting-started/project-setup/index.md delete mode 100644 docs/getting-started/project-setup/index.zh.md delete mode 100644 docs/getting-started/project-setup/java_api_quickstart.md delete mode 100644 docs/getting-started/project-setup/java_api_quickstart.zh.md delete mode 100644 docs/getting-started/project-setup/scala_api_quickstart.md delete mode 100644 docs/getting-started/project-setup/scala_api_quickstart.zh.md copy docs/redirects/{als.md => getting-started-dependencies.md} (83%) copy docs/redirects/{als.md => java-quickstart.md} (85%) copy 
docs/redirects/{als.md => scala-quickstart.md} (85%)
[flink] 03/07: [FLINK-17980][docs] Update headings of datastream and table walkthroughs
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 49b2e4fb210b4ed4611cb27abf48d2f5625992c2 Author: Seth Wiesman AuthorDate: Mon Jun 8 11:54:06 2020 -0500 [FLINK-17980][docs] Update headings of datastream and table walkthroughs --- docs/try-flink/datastream_api.md| 5 ++--- docs/try-flink/datastream_api.zh.md | 5 ++--- docs/try-flink/table_api.md | 5 ++--- docs/try-flink/table_api.zh.md | 5 ++--- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/docs/try-flink/datastream_api.md b/docs/try-flink/datastream_api.md index ae529ad..33ed8ab 100644 --- a/docs/try-flink/datastream_api.md +++ b/docs/try-flink/datastream_api.md @@ -1,7 +1,6 @@ --- -title: "DataStream API" -nav-id: datastreamwalkthrough -nav-title: 'DataStream API' +title: "Fraud Detection with the DataStream API" +nav-title: 'Fraud Detection with the DataStream API' nav-parent_id: try-flink nav-pos: 1 --- diff --git a/docs/try-flink/datastream_api.zh.md b/docs/try-flink/datastream_api.zh.md index d5e13f4..e28cc5d 100644 --- a/docs/try-flink/datastream_api.zh.md +++ b/docs/try-flink/datastream_api.zh.md @@ -1,7 +1,6 @@ --- -title: "DataStream API" -nav-id: datastreamwalkthrough -nav-title: 'DataStream API' +title: "Fraud Detection with the DataStream API" +nav-title: 'Fraud Detection with the DataStream API' nav-parent_id: try-flink nav-pos: 1 --- diff --git a/docs/try-flink/table_api.md b/docs/try-flink/table_api.md index 7c277ca..aa78f67 100644 --- a/docs/try-flink/table_api.md +++ b/docs/try-flink/table_api.md @@ -1,7 +1,6 @@ --- -title: "Table API" -nav-id: tableapiwalkthrough -nav-title: 'Table API' +title: "Real Time Reporting with the Table API" +nav-title: 'Real Time Reporting with the Table API' nav-parent_id: try-flink nav-pos: 2 --- diff --git a/docs/try-flink/table_api.zh.md b/docs/try-flink/table_api.zh.md index 13f4bee..530ec1a 100644 --- 
a/docs/try-flink/table_api.zh.md +++ b/docs/try-flink/table_api.zh.md @@ -1,7 +1,6 @@ --- -title: "Table API" -nav-id: tableapiwalkthrough -nav-title: 'Table API' +title: "Real Time Reporting with the Table API" +nav-title: 'Real Time Reporting with the Table API' nav-parent_id: try-flink nav-pos: 2 ---
[flink] 05/07: [FLINK-17980][docs] Add training redirects
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f2db6acf64cf4c319f1629e8b4c0cb68b083ca4c Author: Seth Wiesman AuthorDate: Mon Jun 8 14:04:20 2020 -0500 [FLINK-17980][docs] Add training redirects --- docs/redirects/training.md | 24 1 file changed, 24 insertions(+) diff --git a/docs/redirects/training.md b/docs/redirects/training.md new file mode 100644 index 000..7061c6f --- /dev/null +++ b/docs/redirects/training.md @@ -0,0 +1,24 @@ +--- +title: Hands-on Training +layout: redirect +redirect: /learn-flink/html +permalink: /training/index.html +--- +
[flink] 06/07: [FLINK-17980][docs] Update broken links
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 420c77bd05d3dbd87b42f72e3c0f22923d77c768 Author: Seth Wiesman AuthorDate: Mon Jun 8 14:58:25 2020 -0500 [FLINK-17980][docs] Update broken links --- docs/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.md b/docs/index.md index 69eb302..4f9ad2b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -34,7 +34,7 @@ Apache Flink is an open source platform for distributed stream and batch data pr * [Write a Table API query]({% link try-flink/table_api.md %}) * **Docker Playgrounds**: Set up a sandboxed Flink environment in just a few minutes to explore and play with Flink. - * [Run and manage Flink streaming applications]({% try-flink/flink-operations-playground.md %}) + * [Run and manage Flink streaming applications]({% link try-flink/flink-operations-playground.md %}) * **Concepts**: Learn about Flink's concepts to better understand the documentation. * [Stateful Stream Processing](concepts/stateful-stream-processing.html)
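For context on the one-character fix above: in Jekyll's Liquid templating, `{% ... %}` must begin with a tag name, so the original `{% try-flink/flink-operations-playground.md %}` is invalid markup, while the `link` tag resolves a source file's generated URL at build time. A minimal sketch (paths taken from the diff above):

```liquid
<!-- Broken: Liquid has no tag named "try-flink/flink-operations-playground.md" -->
{% try-flink/flink-operations-playground.md %}

<!-- Fixed: the "link" tag resolves the page's permalink at build time -->
{% link try-flink/flink-operations-playground.md %}
```

A side benefit of `{% link %}` over hard-coded URLs is that the build fails if the referenced file does not exist, which catches this class of broken link early.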
[flink] 02/07: [FLINK-17980][docs] Rename Hands-on Training to Learn Flink
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5df38371cf05efb87bc9645325036c80b26e3983 Author: Seth Wiesman AuthorDate: Mon Jun 8 11:35:19 2020 -0500 [FLINK-17980][docs] Rename Hands-on Training to Learn Flink --- docs/{training => learn-flink}/datastream_api.md | 2 +- docs/{training => learn-flink}/datastream_api.zh.md | 2 +- docs/{training => learn-flink}/etl.md| 2 +- docs/{training => learn-flink}/etl.zh.md | 2 +- docs/{training => learn-flink}/event_driven.md | 8 docs/{training => learn-flink}/event_driven.zh.md| 8 docs/{training => learn-flink}/fault_tolerance.md| 6 +++--- docs/{training => learn-flink}/fault_tolerance.zh.md | 6 +++--- docs/{training => learn-flink}/index.md | 6 +++--- docs/{training => learn-flink}/index.zh.md | 0 docs/{training => learn-flink}/streaming_analytics.md| 8 docs/{training => learn-flink}/streaming_analytics.zh.md | 8 12 files changed, 29 insertions(+), 29 deletions(-) diff --git a/docs/training/datastream_api.md b/docs/learn-flink/datastream_api.md similarity index 99% rename from docs/training/datastream_api.md rename to docs/learn-flink/datastream_api.md index 7392041..fe6f548 100644 --- a/docs/training/datastream_api.md +++ b/docs/learn-flink/datastream_api.md @@ -3,7 +3,7 @@ title: Intro to the DataStream API nav-id: datastream-api nav-pos: 2 nav-title: Intro to the DataStream API -nav-parent_id: training +nav-parent_id: learn-flink ---
[flink] 04/07: [FLINK-17980][docs] Update broken links
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit aec7e660f294b10fd90bc9318450a53f40c3e96b Author: Seth Wiesman AuthorDate: Mon Jun 8 14:03:11 2020 -0500 [FLINK-17980][docs] Update broken links --- docs/concepts/index.md | 13 ++--- docs/concepts/index.zh.md| 9 - docs/dev/connectors/elasticsearch.md | 2 +- docs/dev/connectors/elasticsearch.zh.md | 2 +- docs/dev/stream/state/checkpointing.md | 4 ++-- docs/dev/stream/state/checkpointing.zh.md| 4 ++-- docs/index.md| 6 +++--- docs/index.zh.md | 6 +++--- docs/internals/task_lifecycle.md | 2 +- docs/internals/task_lifecycle.zh.md | 2 +- docs/learn-flink/index.md| 2 +- docs/learn-flink/index.zh.md | 2 +- docs/ops/state/savepoints.md | 2 +- docs/ops/state/savepoints.zh.md | 2 +- docs/try-flink/flink-operations-playground.md| 4 ++-- docs/try-flink/flink-operations-playground.zh.md | 4 ++-- 16 files changed, 32 insertions(+), 34 deletions(-) diff --git a/docs/concepts/index.md b/docs/concepts/index.md index 2c44e1f..11beb8e 100644 --- a/docs/concepts/index.md +++ b/docs/concepts/index.md @@ -1,8 +1,8 @@ --- -title: Concepts in Depth +title: Concepts nav-id: concepts nav-pos: 3 -nav-title: ' Concepts in Depth' +nav-title: ' Concepts' nav-parent_id: root nav-show_overview: true permalink: /concepts/index.html @@ -26,13 +26,12 @@ specific language governing permissions and limitations under the License. --> -The [Hands-on Training]({% link training/index.md %}) explains the basic concepts +The [Hands-on Training]({% link learn-flink/index.md %}) explains the basic concepts of stateful and timely stream processing that underlie Flink's APIs, and provides examples of how these mechanisms are used in applications. 
Stateful stream processing is introduced in the context -of [Data Pipelines & ETL]({% link training/etl.md %}#stateful-transformations) -and is further developed in the section on [Fault Tolerance]({% link -training/fault_tolerance.md %}). Timely stream processing is introduced in the section on -[Streaming Analytics]({% link training/streaming_analytics.md %}). +of [Data Pipelines & ETL]({% link learn-flink/etl.md %}#stateful-transformations) +and is further developed in the section on [Fault Tolerance]({% link learn-flink/fault_tolerance.md %}). Timely stream processing is introduced in the section on +[Streaming Analytics]({% link learn-flink/streaming_analytics.md %}). This _Concepts in Depth_ section provides a deeper understanding of how Flink's architecture and runtime implement these concepts. diff --git a/docs/concepts/index.zh.md b/docs/concepts/index.zh.md index fab83f7..54f7dfb 100644 --- a/docs/concepts/index.zh.md +++ b/docs/concepts/index.zh.md @@ -26,13 +26,12 @@ specific language governing permissions and limitations under the License. --> -The [Hands-on Training]({% link training/index.zh.md %}) explains the basic concepts +The [Hands-on Training]({% link learn-flink/index.zh.md %}) explains the basic concepts of stateful and timely stream processing that underlie Flink's APIs, and provides examples of how these mechanisms are used in applications. Stateful stream processing is introduced in the context -of [Data Pipelines & ETL]({% link training/etl.zh.md %}#stateful-transformations) -and is further developed in the section on [Fault Tolerance]({% link -training/fault_tolerance.zh.md %}). Timely stream processing is introduced in the section on -[Streaming Analytics]({% link training/streaming_analytics.zh.md %}). +of [Data Pipelines & ETL]({% link learn-flink/etl.zh.md %}#stateful-transformations) +and is further developed in the section on [Fault Tolerance]({% link learn-flink/fault_tolerance.zh.md %}). 
Timely stream processing is introduced in the section on +[Streaming Analytics]({% link learn-flink/streaming_analytics.zh.md %}). This _Concepts in Depth_ section provides a deeper understanding of how Flink's architecture and runtime implement these concepts. diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md index 4b8b2da..9fd808e 100644 --- a/docs/dev/connectors/elasticsearch.md +++ b/docs/dev/connectors/elasticsearch.md @@ -317,7 +317,7 @@ time of checkpoints. This effectively assures that all requests before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink. -More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/training/fault_tolerance.html). +More details on
[flink] branch master updated (7aa5f33 -> 55712a5)
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 7aa5f33 [FLINK-18084][docs] Document the Application Mode new 5473710 [FLINK-17980][docs] Move getting started walkthroughs to Try Flink new 5df3837 [FLINK-17980][docs] Rename Hands-on Training to Learn Flink new 49b2e4f [FLINK-17980][docs] Update headings of datastream and table walkthroughs new aec7e66 [FLINK-17980][docs] Update broken links new f2db6ac [FLINK-17980][docs] Add training redirects new 420c77b [FLINK-17980][docs] Update broken links new 55712a5 [FLINK-17980][docs] Add training redirects The 7 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/concepts/index.md | 13 ++-- docs/concepts/index.zh.md | 9 +-- docs/dev/connectors/elasticsearch.md | 2 +- docs/dev/connectors/elasticsearch.zh.md| 2 +- docs/dev/stream/state/checkpointing.md | 4 +- docs/dev/stream/state/checkpointing.zh.md | 4 +- docs/getting-started/index.md | 86 -- docs/getting-started/index.zh.md | 52 - docs/index.md | 6 +- docs/index.zh.md | 6 +- docs/internals/task_lifecycle.md | 2 +- docs/internals/task_lifecycle.zh.md| 2 +- docs/{training => learn-flink}/datastream_api.md | 2 +- .../{training => learn-flink}/datastream_api.zh.md | 2 +- docs/{training => learn-flink}/etl.md | 2 +- docs/{training => learn-flink}/etl.zh.md | 2 +- docs/{training => learn-flink}/event_driven.md | 8 +- docs/{training => learn-flink}/event_driven.zh.md | 8 +- docs/{training => learn-flink}/fault_tolerance.md | 6 +- .../fault_tolerance.zh.md | 6 +- docs/{training => learn-flink}/index.md| 6 +- docs/{training => learn-flink}/index.zh.md | 2 +- .../streaming_analytics.md | 8 +- .../streaming_analytics.zh.md | 8 +- docs/ops/state/savepoints.md | 2 
+- docs/ops/state/savepoints.zh.md| 2 +- .../datastream-walkthrough.md} | 9 +-- .../flink-operations-playground.md}| 9 +-- .../index.md => redirects/getting-started.md} | 11 ++- .../python_table_walkthrough.md} | 9 +-- .../index.md => redirects/table-walkthrough.md}| 9 +-- .../index.zh.md => redirects/training.md} | 11 ++- .../walkthroughs => try-flink}/datastream_api.md | 7 +- .../datastream_api.zh.md | 7 +- .../flink-operations-playground.md | 8 +- .../flink-operations-playground.zh.md | 8 +- .../docker-playgrounds => try-flink}/index.md | 12 +-- .../docker-playgrounds => try-flink}/index.zh.md | 11 +-- .../walkthroughs => try-flink}/python_table_api.md | 4 +- .../python_table_api.zh.md | 4 +- .../walkthroughs => try-flink}/table_api.md| 7 +- .../walkthroughs => try-flink}/table_api.zh.md | 7 +- 42 files changed, 119 insertions(+), 266 deletions(-) delete mode 100644 docs/getting-started/index.md delete mode 100644 docs/getting-started/index.zh.md rename docs/{training => learn-flink}/datastream_api.md (99%) rename docs/{training => learn-flink}/datastream_api.zh.md (99%) rename docs/{training => learn-flink}/etl.md (99%) rename docs/{training => learn-flink}/etl.zh.md (99%) rename docs/{training => learn-flink}/event_driven.md (97%) rename docs/{training => learn-flink}/event_driven.zh.md (98%) rename docs/{training => learn-flink}/fault_tolerance.md (97%) rename docs/{training => learn-flink}/fault_tolerance.zh.md (97%) rename docs/{training => learn-flink}/index.md (99%) rename docs/{training => learn-flink}/index.zh.md (99%) rename docs/{training => learn-flink}/streaming_analytics.md (98%) rename docs/{training => learn-flink}/streaming_analytics.zh.md (98%) copy docs/{getting-started/docker-playgrounds/index.zh.md => redirects/datastream-walkthrough.md} (83%) copy docs/{getting-started/docker-playgrounds/index.md => redirects/flink-operations-playground.md} (81%) rename docs/{getting-started/walkthroughs/index.md => redirects/getting-started.md} (86%) copy 
docs/{getting-started/docker-playgrounds/index.md => redirects/python_table_walkthrough.md} (83%) copy docs/{getting-started/docker-playgrounds/index.md => redirects/table-walkthrough.md} (85%) rename
[flink] 07/07: [FLINK-17980][docs] Add training redirects
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 55712a563126bad74c42b42b233bc99a263d8239 Author: Seth Wiesman AuthorDate: Mon Jun 8 15:13:17 2020 -0500 [FLINK-17980][docs] Add training redirects This closes #12534 --- docs/redirects/training.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/redirects/training.md b/docs/redirects/training.md index 7061c6f..09cc129 100644 --- a/docs/redirects/training.md +++ b/docs/redirects/training.md @@ -1,7 +1,7 @@ --- title: Hands-on Training layout: redirect -redirect: /learn-flink/html +redirect: /learn-flink/index.html permalink: /training/index.html ---
[flink] 01/07: [FLINK-17980][docs] Move getting started walkthroughs to Try Flink
This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 54737109e1b5caa0b7fc4e7f83b304b5b249d487 Author: Seth Wiesman AuthorDate: Mon Jun 8 11:33:16 2020 -0500 [FLINK-17980][docs] Move getting started walkthroughs to Try Flink --- docs/getting-started/index.md | 86 -- docs/getting-started/index.zh.md | 52 - .../datastream-walkthrough.md} | 9 +-- .../flink-operations-playground.md}| 9 +-- .../index.md => redirects/getting-started.md} | 11 ++- .../python_table_walkthrough.md} | 11 ++- .../index.md => redirects/table-walkthrough.md}| 9 +-- .../walkthroughs => try-flink}/datastream_api.md | 2 +- .../datastream_api.zh.md | 2 +- .../flink-operations-playground.md | 4 +- .../flink-operations-playground.zh.md | 4 +- .../docker-playgrounds => try-flink}/index.md | 12 +-- .../docker-playgrounds => try-flink}/index.zh.md | 11 +-- .../walkthroughs => try-flink}/python_table_api.md | 4 +- .../python_table_api.zh.md | 4 +- .../walkthroughs => try-flink}/table_api.md| 2 +- .../walkthroughs => try-flink}/table_api.zh.md | 2 +- 17 files changed, 47 insertions(+), 187 deletions(-) diff --git a/docs/getting-started/index.md b/docs/getting-started/index.md deleted file mode 100644 index a6b5cab..000 --- a/docs/getting-started/index.md +++ /dev/null @@ -1,86 +0,0 @@ -title: "Getting Started" -nav-id: getting-started -nav-title: ' Getting Started' -nav-parent_id: root -section-break: true -nav-show_overview: true -nav-pos: 1 - - -There are many ways to get started with Apache Flink. 
Which one is the best for -you depends on your goals and prior experience: - -* take a look at the **Docker Playgrounds** if you want to see what Flink can do, via a hands-on, - docker-based introduction to specific Flink concepts -* explore one of the **Code Walkthroughs** if you want a quick, end-to-end - introduction to one of Flink's APIs -* work your way through the **Hands-on Training** for a comprehensive, - step-by-step introduction to Flink -* use **Project Setup** if you already know the basics of Flink and want a - project template for Java or Scala, or need help setting up the dependencies - -### Taking a first look at Flink - -The **Docker Playgrounds** provide sandboxed Flink environments that are set up in just a few minutes and which allow you to explore and play with Flink. - -* The [**Operations Playground**]({% link getting-started/docker-playgrounds/flink-operations-playground.md %}) shows you how to operate streaming applications with Flink. You can experience how Flink recovers applications from failures, upgrade and scale streaming applications up and down, and query application metrics. - - - -### First steps with one of Flink's APIs - -The **Code Walkthroughs** are a great way to get started quickly with a step-by-step introduction to -one of Flink's APIs. Each walkthrough provides instructions for bootstrapping a small skeleton -project, and then shows how to extend it to a simple application. - -* The [**DataStream API** code walkthrough]({% link getting-started/walkthroughs/datastream_api.md %}) shows how - to implement a simple DataStream application and how to extend it to be stateful and use timers. - The DataStream API is Flink's main abstraction for implementing stateful streaming applications - with sophisticated time semantics in Java or Scala. 
- -* Flink's **Table API** is a relational API used for writing SQL-like queries in Java, Scala, or - Python, which are then automatically optimized, and can be executed on batch or streaming data - with identical syntax and semantics. The [Table API code walkthrough for Java and Scala]({% link - getting-started/walkthroughs/table_api.md %}) shows how to implement a simple Table API query on a - batch source and how to evolve it into a continuous query on a streaming source. There's also a - similar [code walkthrough for the Python Table API]({% link - getting-started/walkthroughs/python_table_api.md %}). - -### Taking a Deep Dive with the Hands-on Training - -The [**Hands-on Training**]({% link training/index.md %}) is a self-paced training course with -a set of lessons and hands-on exercises. This step-by-step introduction to Flink focuses -on learning how to use the DataStream API to meet the needs of common, real-world use cases, -and provides a complete introduction to the fundamental concepts: parallel dataflows, -stateful stream processing, event time and watermarking, and fault tolerance via state snapshots. - - diff --git a/docs/getting-started/index.zh.md b/docs/getting-started/index.zh.md deleted file mode 100644 index
[flink] branch release-1.11 updated (e5b1ca0 -> 7dfd517)
This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from e5b1ca0 [FLINK-17795][example] Add MatrixVectorMul example new 08973b5 [hotfix][cli] Update the help message of the Generic CLI new 924265a [hotfix] Fix typo in ops/deployment/index new b4adb22 [FLINK-18084] Rename the ExecutorCLI to GenericCLI according to docs new 7dfd517 [FLINK-18084][docs] Document the Application Mode The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/ops/cli.md| 132 ++--- docs/ops/cli.zh.md | 132 ++--- docs/ops/deployment/index.md | 80 - docs/ops/deployment/index.zh.md| 78 docs/ops/deployment/native_kubernetes.md | 2 +- docs/ops/deployment/native_kubernetes.zh.md| 2 +- docs/ops/deployment/yarn_setup.md | 43 ++- docs/ops/deployment/yarn_setup.zh.md | 39 +- .../org/apache/flink/client/cli/CliFrontend.java | 2 +- .../cli/{ExecutorCLI.java => GenericCLI.java} | 14 ++- .../{ExecutorCLITest.java => GenericCLITest.java} | 12 +- .../flink/kubernetes/cli/KubernetesSessionCli.java | 6 +- .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 4 +- 13 files changed, 392 insertions(+), 154 deletions(-) rename flink-clients/src/main/java/org/apache/flink/client/cli/{ExecutorCLI.java => GenericCLI.java} (87%) rename flink-clients/src/test/java/org/apache/flink/client/cli/{ExecutorCLITest.java => GenericCLITest.java} (94%)
[flink] 01/04: [hotfix][cli] Update the help message of the Generic CLI
This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 08973b5e4b4f6607c8d0d39a0f4f690c3f95862d Author: Kostas Kloudas AuthorDate: Mon Jun 8 14:36:53 2020 +0200 [hotfix][cli] Update the help message of the Generic CLI (cherry picked from commit c67e7962a4a3990e1daf37f3863a2d04e81233dc) --- .../src/main/java/org/apache/flink/client/cli/ExecutorCLI.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java index e88de9e..152ca33 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java @@ -51,12 +51,16 @@ public class ExecutorCLI implements CustomCommandLine { private static final String ID = "Generic CLI"; private final Option executorOption = new Option("e", "executor", true, - "The name of the executor to be used for executing the given job, which is equivalent " + + "DEPRECATED: Please use the -t option instead which is also available with the \"Application Mode\".\n" + + "The name of the executor to be used for executing the given job, which is equivalent " + "to the \"" + DeploymentOptions.TARGET.key() + "\" config option. The " + "currently available executors are: " + getExecutorFactoryNames() + "."); private final Option targetOption = new Option("t", "target", true, - "The type of the deployment target: e.g. yarn-application."); + "The name of the executor to be used for executing the given job, which is equivalent " + + "to the \"" + DeploymentOptions.TARGET.key() + "\" config option. 
The " + + "currently available executors are: " + getExecutorFactoryNames() + + ", \"yarn-application\" and \"kubernetes-application\"."); /** * Dynamic properties allow the user to specify additional configuration values with -D, such as
[flink] 03/04: [FLINK-18084] Rename the ExecutorCLI to GenericCLI according to docs
This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit b4adb22b4d94964145305e8bb2daee8f40a0633a Author: Kostas Kloudas AuthorDate: Mon Jun 8 15:58:50 2020 +0200 [FLINK-18084] Rename the ExecutorCLI to GenericCLI according to docs (cherry picked from commit bdcaea60c61a87bc110316555d4dc7fe38b06047) --- docs/ops/cli.md| 132 ++--- docs/ops/cli.zh.md | 132 ++--- .../org/apache/flink/client/cli/CliFrontend.java | 2 +- .../cli/{ExecutorCLI.java => GenericCLI.java} | 10 +- .../{ExecutorCLITest.java => GenericCLITest.java} | 12 +- .../flink/kubernetes/cli/KubernetesSessionCli.java | 6 +- .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 4 +- 7 files changed, 149 insertions(+), 149 deletions(-) diff --git a/docs/ops/cli.md b/docs/ops/cli.md index eed5e7d..571cd83 100644 --- a/docs/ops/cli.md +++ b/docs/ops/cli.md @@ -49,17 +49,17 @@ Flink has the concept of executors for defining available deployment targets. Yo available executors in the output of `bin/flink --help`, for example: ``` -Options for executor mode: - -DGeneric configuration options for - execution/deployment and for the configured executor. - The available options can be found at - https://ci.apache.org/projects/flink/flink-docs-stabl - e/ops/config.html - -e,--executorThe name of the executor to be used for executing the - given job, which is equivalent to the - "execution.target" config option. The currently - available executors are: "remote", "local", - "kubernetes-session", "yarn-per-job", "yarn-session". +Options for Generic CLI mode: + -DGeneric configuration options for + execution/deployment and for the configured executor. + The available options can be found at + https://ci.apache.org/projects/flink/flink-docs-stabl + e/ops/config.html + -t,--target The deployment target for the given application, + which is equivalent to the "execution.target" config + option. 
The currently available targets are: + "remote", "local", "kubernetes-session", "yarn-per-job", + "yarn-session", "yarn-application" and "kubernetes-application". ``` When running one of the `bin/flink` actions, the executor is specified using the `--executor` @@ -453,17 +453,17 @@ Action "run" compiles and runs a program. -z,--zookeeperNamespace Namespace to create the Zookeeper sub-paths for high availability mode - Options for executor mode: - -DGeneric configuration options for - execution/deployment and for the configured executor. - The available options can be found at - https://ci.apache.org/projects/flink/flink-docs-stabl - e/ops/config.html - -e,--executorThe name of the executor to be used for executing the - given job, which is equivalent to the - "execution.target" config option. The currently - available executors are: "remote", "local", - "kubernetes-session", "yarn-per-job", "yarn-session". + Options for Generic CLI mode: + -DGeneric configuration options for + execution/deployment and for the configured executor. + The available options can be found at + https://ci.apache.org/projects/flink/flink-docs-stabl + e/ops/config.html + -t,--target The deployment target for the given application, + which is equivalent to the "execution.target" config + option. The currently available targets are: + "remote", "local", "kubernetes-session", "yarn-per-job", + "yarn-session", "yarn-application" and "kubernetes-application". Options for default mode: -m,--jobmanagerAddress of the JobManager (master) to which @@ -505,17 +505,17 @@ Action "list" lists running and scheduled programs. -z,--zookeeperNamespace Namespace to create the Zookeeper sub-paths for high availability mode - Options for executor mode: - -DGeneric configuration options for -
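Putting the renamed options together: with the Generic CLI mode, the deployment target and configuration are passed via `-t` and `-D` instead of the deprecated `-e`/`--executor` flag. An illustrative sketch (the jar path and memory value are placeholders, not taken from the commit):

```sh
# Old, deprecated form: select the executor with -e
./bin/flink run -e yarn-per-job ./path/to/job.jar

# Generic CLI mode: select the target with -t and set config options with -D
./bin/flink run \
  -t yarn-per-job \
  -Dtaskmanager.memory.process.size=2048m \
  ./path/to/job.jar
```

Both forms set the same `execution.target` config option under the hood, which is why the help text describes them as equivalent.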
[flink] 04/04: [FLINK-18084][docs] Document the Application Mode
This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 7dfd5178ecd6e3a22cb4a4052635fdc94009af4c Author: Kostas Kloudas AuthorDate: Mon Jun 8 15:54:52 2020 +0200 [FLINK-18084][docs] Document the Application Mode This closes #12549. (cherry picked from commit 7aa5f3310ed781477e84de7e3a20da089ee3b4b5) --- docs/ops/deployment/index.md| 78 + docs/ops/deployment/index.zh.md | 78 + docs/ops/deployment/native_kubernetes.md| 2 +- docs/ops/deployment/native_kubernetes.zh.md | 2 +- docs/ops/deployment/yarn_setup.md | 43 +++- docs/ops/deployment/yarn_setup.zh.md| 39 ++- 6 files changed, 238 insertions(+), 4 deletions(-) diff --git a/docs/ops/deployment/index.md b/docs/ops/deployment/index.md index 94ef1ce..9824541 100644 --- a/docs/ops/deployment/index.md +++ b/docs/ops/deployment/index.md @@ -29,6 +29,84 @@ When deciding how and where to run Flink, there's a wide range of options availa * This will be replaced by the TOC {:toc} +## Deployment Modes + +Flink can execute applications in one of three ways: + - in Session Mode, + - in a Per-Job Mode, or + - in Application Mode. + + The above modes differ in: + - the cluster lifecycle and resource isolation guarantees + - whether the application's `main()` method is executed on the client or on the cluster. + + Session Mode + +*Session mode* assumes an already running cluster and uses the resources of that cluster to execute any +submitted application. Applications executed in the same (session) cluster use, and consequently compete +for, the same resources. This has the advantage that you do not pay the resource overhead of spinning up +a full cluster for every submitted job. But, if one of the jobs misbehaves or brings down a Task Manager, +then all jobs running on that Task Manager will be affected by the failure. 
This, apart from a negative +impact on the job that caused the failure, implies a potential massive recovery process with all the +restarting jobs accessing the filesystem concurrently and making it unavailable to other services. +Additionally, having a single cluster running multiple jobs implies more load for the Flink Master, who +is responsible for the book-keeping of all the jobs in the cluster. + + Per-Job Mode + +Aiming at providing better resource isolation guarantees, the *Per-Job* mode uses the available cluster manager +framework (e.g. YARN, Kubernetes) to spin up a cluster for each submitted job. This cluster is available to +that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc) are +cleared up. This provides better resource isolation, as a misbehaving job can only bring down its own +Task Managers. In addition, it spreads the load of book-keeping across multiple Flink Masters, as there is +one per job. For these reasons, the *Per-Job* resource allocation model is the preferred mode by many +production reasons. + + Application Mode + +In all the above modes, the application's `main()` method is executed on the client side. This process +includes downloading the application's dependencies locally, executing the `main()` to extract a representation +of the application that Flink's runtime can understand (i.e. the `JobGraph`) and ship the dependencies and +the `JobGraph(s)` to the cluster. This makes the Client a heavy resource consumer as it may need substantial +network bandwidth to download dependencies and ship binaries to the cluster, and CPU cycles to execute the +`main()`. This problem can be more pronounced when the Client is shared across users. + +Building on this observation, the *Application Mode* creates a cluster per submitted application, but this time, +the `main()` method of the application is executed on the Flink Master. 
Creating a cluster per application can be +seen as creating a session cluster shared only among the jobs of a particular application, and torn down when +the application finishes. With this architecture, the *Application Mode* provides the same resource isolation +and load balancing guarantees as the *Per-Job* mode, but at the granularity of a whole application. Executing +the `main()` on the Flink Master saves the CPU cycles required to run it, and also saves the bandwidth required +for downloading the dependencies locally. Furthermore, it allows for a more even spread of the network load of +downloading the dependencies of the applications in the cluster, as there is one Flink Master per application. + + + Note: In the Application Mode, the `main()` is executed on the cluster and not on the client, + as in the other modes. This may have implications for your code as, for example, any paths you register in
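The three modes documented in the commit above correspond to different submission commands. As a rough sketch only (the `-t` execution targets exist in Flink 1.11, but the example jar path and the choice of YARN as the cluster framework are illustrative assumptions, not part of this commit):

```sh
# Session Mode: jobs are submitted to an already running (session) cluster,
# which all jobs share and compete for.
./bin/yarn-session.sh -d                      # start a long-running session on YARN
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# Per-Job Mode: a dedicated cluster is spun up for this one job;
# the application's main() still runs on the client.
./bin/flink run -t yarn-per-job ./examples/streaming/TopSpeedWindowing.jar

# Application Mode: a cluster is created per application and the
# main() method executes on the Flink Master instead of the client.
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
```

Which command fits depends on the resource isolation and client-load trade-offs described above; consult the CLI docs added by this same patch series for the authoritative flags.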
[flink] 02/04: [hotfix] Fix typo in ops/deployment/index
This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 924265a33a5109d91aa749a67fcf7e2ba1cc4f9c Author: Kostas Kloudas AuthorDate: Mon Jun 8 14:46:02 2020 +0200 [hotfix] Fix typo in ops/deployment/index (cherry picked from commit d98b935b200774f14c130b6ad6dbeff43f692bdf) --- docs/ops/deployment/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ops/deployment/index.md b/docs/ops/deployment/index.md index 85fde97..94ef1ce 100644 --- a/docs/ops/deployment/index.md +++ b/docs/ops/deployment/index.md @@ -62,7 +62,7 @@ Apache Flink ships with first class support for a number of common deployment ta Yarn -Deploy Flink on-top Apache Hadoop's resource manager +Deploy Flink on-top of Apache Hadoop's resource manager Learn more
[flink] branch master updated (0c9e7b2 -> 7aa5f33)
This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 0c9e7b2 [FLINK-17795][example] Add MatrixVectorMul example add c67e796 [hotfix][cli] Update the help message of the Generic CLI add d98b935 [hotfix] Fix typo in ops/deployment/index add bdcaea6 [FLINK-18084] Rename the ExecutorCLI to GenericCLI according to docs add 7aa5f33 [FLINK-18084][docs] Document the Application Mode No new revisions were added by this update. Summary of changes: docs/ops/cli.md| 132 ++--- docs/ops/cli.zh.md | 132 ++--- docs/ops/deployment/index.md | 80 - docs/ops/deployment/index.zh.md| 78 docs/ops/deployment/native_kubernetes.md | 2 +- docs/ops/deployment/native_kubernetes.zh.md| 2 +- docs/ops/deployment/yarn_setup.md | 43 ++- docs/ops/deployment/yarn_setup.zh.md | 39 +- .../org/apache/flink/client/cli/CliFrontend.java | 2 +- .../cli/{ExecutorCLI.java => GenericCLI.java} | 14 ++- .../{ExecutorCLITest.java => GenericCLITest.java} | 12 +- .../flink/kubernetes/cli/KubernetesSessionCli.java | 6 +- .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 4 +- 13 files changed, 392 insertions(+), 154 deletions(-) rename flink-clients/src/main/java/org/apache/flink/client/cli/{ExecutorCLI.java => GenericCLI.java} (87%) rename flink-clients/src/test/java/org/apache/flink/client/cli/{ExecutorCLITest.java => GenericCLITest.java} (94%)
[flink] branch release-1.11 updated (76d1921 -> e5b1ca0)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from 76d1921 [hotfix] Correct the default path of nvidia-gpu-discovery add e5b1ca0 [FLINK-17795][example] Add MatrixVectorMul example No new revisions were added by this update. Summary of changes: flink-dist/src/main/assemblies/bin.xml | 1 + flink-examples/flink-examples-streaming/pom.xml| 68 ++ .../streaming/examples/gpu/MatrixVectorMul.java| 244 + 3 files changed, 313 insertions(+) create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
[flink] branch master updated (70bfb61 -> 0c9e7b2)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 70bfb61 [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests add 0c9e7b2 [FLINK-17795][example] Add MatrixVectorMul example No new revisions were added by this update. Summary of changes: flink-dist/src/main/assemblies/bin.xml | 1 + flink-examples/flink-examples-streaming/pom.xml| 68 ++ .../streaming/examples/gpu/MatrixVectorMul.java| 244 + 3 files changed, 313 insertions(+) create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
[flink] branch master updated: [FLINK-17795][example] Add MatrixVectorMul example
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 0c9e7b2 [FLINK-17795][example] Add MatrixVectorMul example 0c9e7b2 is described below commit 0c9e7b21897cc3b4258ff21b295c9e0e8d7cd13f Author: Yangze Guo AuthorDate: Mon May 25 18:31:41 2020 +0800 [FLINK-17795][example] Add MatrixVectorMul example This closes #12398. --- flink-dist/src/main/assemblies/bin.xml | 1 + flink-examples/flink-examples-streaming/pom.xml| 68 ++ .../streaming/examples/gpu/MatrixVectorMul.java| 244 + 3 files changed, 313 insertions(+) diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index bc8bb4f..c68f9d6 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -235,6 +235,7 @@ under the License. flink-examples-streaming*.jar original-*.jar + MatrixVectorMul.jar diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index a055d5e..0bd841e 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -34,6 +34,11 @@ under the License. jar + + + 10.0.0 + + @@ -88,6 +93,33 @@ under the License. ${project.version} + + + org.jcuda + jcuda + ${jcuda.version} + + + org.jcuda + jcuda-natives + + + + + + org.jcuda + jcublas + ${jcuda.version} + + + org.jcuda + jcublas-natives + + + + @@ -365,6 +397,42 @@ under the License. + + org.apache.maven.plugins + maven-shade-plugin + + + MatrixVectorMul + package + + shade + + + false + MatrixVectorMul + + + org.jcuda:* + + + + + org.apache.flink:* + + org/apache/flink/streaming/examples/gpu/MatrixVectorMul.class + org/apache/flink/streaming/examples/gpu/MatrixVectorMul$*.class + + + + + + org.apache.flink.streaming.examples.gpu.MatrixVectorMul + + + + + + diff --git
[flink] branch master updated: [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 70bfb61 [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests 70bfb61 is described below commit 70bfb61a48b1d1f5afce0ebc0f58a786e8013e29 Author: Wei Zhong AuthorDate: Tue Jun 9 21:02:46 2020 +0800 [FLINK-18218][python][e2e] Add PyFlink YARN per-job e2e tests This closes #12554. --- .../flink-python-test/python/python_job.py | 13 +++-- .../org/apache/flink/python/tests/util/AddOne.java | 29 +++ .../test-scripts/test_pyflink.sh | 58 +- flink-python/dev/lint-python.sh| 4 +- 4 files changed, 97 insertions(+), 7 deletions(-) diff --git a/flink-end-to-end-tests/flink-python-test/python/python_job.py b/flink-end-to-end-tests/flink-python-test/python/python_job.py index 8df2d12..a85e633 100644 --- a/flink-end-to-end-tests/flink-python-test/python/python_job.py +++ b/flink-end-to-end-tests/flink-python-test/python/python_job.py @@ -38,6 +38,11 @@ def word_count(): env = ExecutionEnvironment.get_execution_environment() t_env = BatchTableEnvironment.create(env, t_config) +# used to test pipeline.jars and pipleline.classpaths +config_key = sys.argv[1] +config_value = sys.argv[2] +t_env.get_config().get_configuration().set_string(config_key, config_value) + # register Results table in table environment tmp_dir = tempfile.gettempdir() result_path = tmp_dir + '/result' @@ -55,7 +60,8 @@ def word_count(): sink_ddl = """ create table Results( word VARCHAR, -`count` BIGINT +`count` BIGINT, +`count_java` BIGINT ) with ( 'connector.type' = 'filesystem', 'format.type' = 'csv', @@ -65,12 +71,13 @@ def word_count(): t_env.sql_update(sink_ddl) t_env.sql_update("create temporary system function add_one as 'add_one.add_one' language python") +t_env.register_java_function("add_one_java", "org.apache.flink.python.tests.util.AddOne") elements = 
[(word, 0) for word in content.split(" ")] t_env.from_elements(elements, ["word", "count"]) \ -.select("word, add_one(count) as count") \ +.select("word, add_one(count) as count, add_one_java(count) as count_java") \ .group_by("word") \ -.select("word, count(count) as count") \ +.select("word, count(count) as count, count(count_java) as count_java") \ .insert_into("Results") t_env.execute("word_count") diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java new file mode 100644 index 000..f0b0cc0 --- /dev/null +++ b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/util/AddOne.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.python.tests.util; + +import org.apache.flink.table.functions.ScalarFunction; + +/** + * Scala UDF for testing. 
+ */ +public class AddOne extends ScalarFunction { + public long eval(long input) { + return input + 1; + } +} diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh index ac521f1..1e58921 100755 --- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh +++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh @@ -20,6 +20,7 @@ set -Eeuo pipefail CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P` source "${CURRENT_DIR}"/common.sh +source "${CURRENT_DIR}"/common_yarn_docker.sh cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf" @@ -65,13 +66,23 @@ REQUIREMENTS_PATH="${TEST_DATA_DIR}/requirements.txt" echo "scipy==1.4.1" > "${REQUIREMENTS_PATH}" -echo "Test submitting python job:\n" +echo "Test submitting python job with 'pipeline.jars':\n" PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC}
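The modified `python_job.py` above reads a config key/value pair from its first two program arguments, which is how the e2e test exercises `pipeline.jars` and `pipeline.classpaths`. A minimal, self-contained sketch of that argument convention (the helper name is hypothetical, not code from this commit):

```python
import sys


def parse_config_arg(argv):
    """Return the (config_key, config_value) pair that python_job.py
    expects as its first two program arguments, e.g. to set
    pipeline.jars or pipeline.classpaths on the TableConfig."""
    if len(argv) < 3:
        raise ValueError("usage: python_job.py <config_key> <config_value>")
    return argv[1], argv[2]


# Example invocation as the test script might pass it:
key, value = parse_config_arg(
    ["python_job.py", "pipeline.jars", "file:///tmp/flink-python-test.jar"])
print(key, value)  # pipeline.jars file:///tmp/flink-python-test.jar
```

In the real job these two values are then applied via `t_env.get_config().get_configuration().set_string(key, value)`, as shown in the diff.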
[flink] branch release-1.11 updated: [hotfix] Correct the default path of nvidia-gpu-discovery
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 76d1921 [hotfix] Correct the default path of nvidia-gpu-discovery 76d1921 is described below commit 76d19219ae810d1082cf6468d57f270bcc636d5e Author: Yangze Guo AuthorDate: Wed Jun 10 19:00:09 2020 +0800 [hotfix] Correct the default path of nvidia-gpu-discovery --- .../src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java b/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java index 9acd8eb..f325b13 100644 --- a/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java +++ b/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java @@ -61,7 +61,7 @@ class GPUDriver implements ExternalResourceDriver { static final ConfigOption DISCOVERY_SCRIPT_PATH = key("discovery-script.path") .stringType() - .defaultValue(String.format("%s/external-resource-gpu/nvidia-gpu-discovery.sh", ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS)); + .defaultValue(String.format("%s/external_resource_gpu/nvidia-gpu-discovery.sh", ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS)); @VisibleForTesting static final ConfigOption DISCOVERY_SCRIPT_ARG =
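The hotfix above corrects the default discovery-script path to the underscored `external_resource_gpu` directory, matching where the plugin is actually bundled. If the script lives elsewhere, the default can be overridden in `flink-conf.yaml`. A hedged sketch (the resource name `gpu` and the path are illustrative; key names follow the 1.11 external-resource configuration scheme):

```yaml
# Enable the GPU external resource and point GPUDriver at a custom
# discovery script, overriding the default corrected in this hotfix.
external-resources: gpu
external-resource.gpu.amount: 1
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
external-resource.gpu.param.discovery-script.path: plugins/external_resource_gpu/nvidia-gpu-discovery.sh
```

Parameters under `external-resource.<name>.param.` are forwarded to the driver, which is why the `discovery-script.path` option defined in `GPUDriver` is reachable from this prefix.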
[flink] branch master updated (218e509 -> 828dccd)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 218e509 [minor] Propagate recent changes to chinese documentation add 828dccd [hotfix] Correct the default path of nvidia-gpu-discovery No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink] branch release-1.11 updated: [minor] Propagate recent changes to chinese documentation
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 3165418 [minor] Propagate recent changes to chinese documentation 3165418 is described below commit 3165418fb0ab2f7ffd17d380a7c462fb10001a58 Author: Aljoscha Krettek AuthorDate: Wed Jun 10 12:42:31 2020 +0200 [minor] Propagate recent changes to chinese documentation --- docs/concepts/stateful-stream-processing.zh.md | 26 +- docs/concepts/timely-stream-processing.zh.md | 14 +- 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/docs/concepts/stateful-stream-processing.zh.md b/docs/concepts/stateful-stream-processing.zh.md index e7d19ad..d991e83 100644 --- a/docs/concepts/stateful-stream-processing.zh.md +++ b/docs/concepts/stateful-stream-processing.zh.md @@ -24,6 +24,11 @@ specific language governing permissions and limitations under the License. --> +* This will be replaced by the TOC +{:toc} + +## What is State? + While many operations in a dataflow simply look at one individual *event at a time* (for example an event parser), some operations remember information across multiple events (for example window operators). These operations are @@ -54,19 +59,6 @@ When working with state, it might also be useful to read about [Flink's state backends]({% link ops/state/state_backends.zh.md %}). Flink provides different state backends that specify how and where state is stored. -* This will be replaced by the TOC -{:toc} - -## What is State? - -`TODO: expand this section` - -{% top %} - -## State in Stream & Batch Processing - -`TODO: What is this section about? 
Do we even need it?` - {% top %} ## Keyed State @@ -252,8 +244,6 @@ See [Restart Strategies]({% link dev/task_failure_recovery.zh.md ### State Backends -`TODO: expand this section` - The exact data structures in which the key/values indexes are stored depends on the chosen [state backend]({% link ops/state/state_backends.zh.md %}). One state backend stores data in an in-memory @@ -270,8 +260,6 @@ logic. ### Savepoints -`TODO: expand this section` - All programs that use checkpointing can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state. @@ -312,10 +300,6 @@ give *exactly once* guarantees even in *at least once* mode. {% top %} -## End-to-end Exactly-Once Programs - -`TODO: add` - ## State and Fault Tolerance in Batch Programs Flink executes [batch programs](../dev/batch/index.html) as a special case of diff --git a/docs/concepts/timely-stream-processing.zh.md b/docs/concepts/timely-stream-processing.zh.md index 9f80f72..38bab79 100644 --- a/docs/concepts/timely-stream-processing.zh.md +++ b/docs/concepts/timely-stream-processing.zh.md @@ -24,16 +24,20 @@ specific language governing permissions and limitations under the License. --> -`TODO: add introduction` - * This will be replaced by the TOC {:toc} -## Latency & Completeness +## Introduction -`TODO: add these two sections` +Timely stream processing is an extension of [stateful stream processing]({% link +concepts/stateful-stream-processing.zh.md %}) in which time plays some role in the +computation. Among other things, this is the case when you do time series +analysis, when doing aggregations based on certain time periods (typically +called windows), or when you do event processing where the time when an event +occurred is important. -### Latency vs. Completeness in Batch & Stream Processing +In the following sections we will highlight some of the topics that you should +consider when working with timely Flink Applications. {% top %}
[flink] branch master updated (1a2fdbf -> 218e509)
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 1a2fdbf [FLINK-17982] Remove TODOs from timely-stream-processing.md add 218e509 [minor] Propagate recent changes to chinese documentation No new revisions were added by this update. Summary of changes: docs/concepts/stateful-stream-processing.zh.md | 26 +- docs/concepts/timely-stream-processing.zh.md | 14 +- 2 files changed, 14 insertions(+), 26 deletions(-)
[flink] 01/04: [FLINK-16213] Move stateful-stream-processing.md introduction to form "What is State" section
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 3cc3310ae683be0b829fb67ba84d89398cdb9cf1 Author: Aljoscha Krettek AuthorDate: Tue Jun 9 12:52:41 2020 +0200 [FLINK-16213] Move stateful-stream-processing.md introduction to form "What is State" section --- docs/concepts/stateful-stream-processing.md | 12 +--- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/docs/concepts/stateful-stream-processing.md b/docs/concepts/stateful-stream-processing.md index b8cdd63..7648791 100644 --- a/docs/concepts/stateful-stream-processing.md +++ b/docs/concepts/stateful-stream-processing.md @@ -24,6 +24,11 @@ specific language governing permissions and limitations under the License. --> +* This will be replaced by the TOC +{:toc} + +## What is State? + While many operations in a dataflow simply look at one individual *event at a time* (for example an event parser), some operations remember information across multiple events (for example window operators). These operations are @@ -54,13 +59,6 @@ When working with state, it might also be useful to read about [Flink's state backends]({% link ops/state/state_backends.md %}). Flink provides different state backends that specify how and where state is stored. -* This will be replaced by the TOC -{:toc} - -## What is State? - -`TODO: expand this section` - {% top %} ## State in Stream & Batch Processing
[flink] 02/04: [FLINK-17982] Remove TODOs from stateful-stream-processing
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 0e080acea4048beb6db72f167b1ed919fe551ace Author: Aljoscha Krettek AuthorDate: Tue Jun 9 12:53:55 2020 +0200 [FLINK-17982] Remove TODOs from stateful-stream-processing For some sections, we will expand them in the future when we have actual functionality. Some aimed at expanding sections but they are also ok as they are now. --- docs/concepts/stateful-stream-processing.md | 14 -- 1 file changed, 14 deletions(-) diff --git a/docs/concepts/stateful-stream-processing.md b/docs/concepts/stateful-stream-processing.md index 7648791..112171a 100644 --- a/docs/concepts/stateful-stream-processing.md +++ b/docs/concepts/stateful-stream-processing.md @@ -61,12 +61,6 @@ provides different state backends that specify how and where state is stored. {% top %} -## State in Stream & Batch Processing - -`TODO: What is this section about? Do we even need it?` - -{% top %} - ## Keyed State Keyed state is maintained in what can be thought of as an embedded key/value @@ -250,8 +244,6 @@ See [Restart Strategies]({% link dev/task_failure_recovery.md ### State Backends -`TODO: expand this section` - The exact data structures in which the key/values indexes are stored depends on the chosen [state backend]({% link ops/state/state_backends.md %}). One state backend stores data in an in-memory @@ -268,8 +260,6 @@ logic. ### Savepoints -`TODO: expand this section` - All programs that use checkpointing can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state. @@ -310,10 +300,6 @@ give *exactly once* guarantees even in *at least once* mode. {% top %} -## End-to-end Exactly-Once Programs - -`TODO: add` - ## State and Fault Tolerance in Batch Programs Flink executes [batch programs](../dev/batch/index.html) as a special case of
[flink] 04/04: [FLINK-17982] Remove TODOs from timely-stream-processing.md
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit d939f3a85adc4b0a8e817c0d2a4dd4719fdf8349 Author: Aljoscha Krettek AuthorDate: Wed Jun 10 12:32:34 2020 +0200 [FLINK-17982] Remove TODOs from timely-stream-processing.md For some sections, we will expand them in the future when we have actual functionality. Some aimed at expanding sections but they are also ok as they are now. --- docs/concepts/timely-stream-processing.md | 6 -- 1 file changed, 6 deletions(-) diff --git a/docs/concepts/timely-stream-processing.md b/docs/concepts/timely-stream-processing.md index be8465f..9b96c9f 100644 --- a/docs/concepts/timely-stream-processing.md +++ b/docs/concepts/timely-stream-processing.md @@ -39,12 +39,6 @@ occured is important. In the following sections we will highlight some of the topics that you should consider when working with timely Flink Applications. -## Latency & Completeness - -`TODO: add these two sections` - -### Latency vs. Completeness in Batch & Stream Processing - {% top %} ## Notions of Time: Event Time and Processing Time
[flink] 03/04: [FLINK-16208] Add introduction to timely stream processing concepts documentation
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 973f24343911dab85456647804d3a7c15125dbf5 Author: Aljoscha Krettek AuthorDate: Tue Jun 9 17:14:19 2020 +0200 [FLINK-16208] Add introduction to timely stream processing concepts documentation --- docs/concepts/timely-stream-processing.md | 14 -- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/docs/concepts/timely-stream-processing.md b/docs/concepts/timely-stream-processing.md index cb2c157..be8465f 100644 --- a/docs/concepts/timely-stream-processing.md +++ b/docs/concepts/timely-stream-processing.md @@ -24,11 +24,21 @@ specific language governing permissions and limitations under the License. --> -`TODO: add introduction` - * This will be replaced by the TOC {:toc} +## Introduction + +Timely stream processing is an extension of [stateful stream processing]({% link +concepts/stateful-stream-processing.md %}) in which time plays some role in the +computation. Among other things, this is the case when you do time series +analysis, when doing aggregations based on certain time periods (typically +called windows), or when you do event processing where the time when an event +occurred is important. + +In the following sections we will highlight some of the topics that you should +consider when working with timely Flink Applications. + ## Latency & Completeness `TODO: add these two sections`
[flink] branch release-1.11 updated (e444d1f -> d939f3a)
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from e444d1f [FLINK-18215][conf] Add log level to JavaBashUtils log4j config new 3cc3310 [FLINK-16213] Move stateful-stream-processing.md introduction to form "What is State" section new 0e080ac [FLINK-17982] Remove TODOs from stateful-stream-processing new 973f243 [FLINK-16208] Add introduction to timely stream processing concepts documentation new d939f3a [FLINK-17982] Remove TODOs from timely-stream-processing.md The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/concepts/stateful-stream-processing.md | 26 +- docs/concepts/timely-stream-processing.md | 14 +- 2 files changed, 14 insertions(+), 26 deletions(-)
[flink] branch master updated (2606323 -> 1a2fdbf)
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2606323 [FLINK-18215][conf] Add log level to JavaBashUtils log4j config add 40080e0 [FLINK-16213] Move stateful-stream-processing.md introduction to form "What is State" section add 52145af [FLINK-17982] Remove TODOs from stateful-stream-processing add 3a36c76 [FLINK-16208] Add introduction to timely stream processing concepts documentation add 1a2fdbf [FLINK-17982] Remove TODOs from timely-stream-processing.md No new revisions were added by this update. Summary of changes: docs/concepts/stateful-stream-processing.md | 26 +- docs/concepts/timely-stream-processing.md | 14 +- 2 files changed, 14 insertions(+), 26 deletions(-)
[flink] 02/02: [FLINK-18215][conf] Add log level to JavaBashUtils log4j config
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit e444d1ff87148cc75231c79b6de39543844f6af8 Author: Chesnay Schepler AuthorDate: Tue Jun 9 15:35:12 2020 +0200 [FLINK-18215][conf] Add log level to JavaBashUtils log4j config --- flink-dist/src/main/resources/log4j-bash-utils.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-dist/src/main/resources/log4j-bash-utils.properties b/flink-dist/src/main/resources/log4j-bash-utils.properties index 3aff31b..15a6e12 100644 --- a/flink-dist/src/main/resources/log4j-bash-utils.properties +++ b/flink-dist/src/main/resources/log4j-bash-utils.properties @@ -23,4 +23,4 @@ rootLogger.appenderRef.console.ref = ConsoleAppender appender.console.name = ConsoleAppender appender.console.type = CONSOLE appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %x - %m%n +appender.console.layout.pattern = %-5p %x - %m%n
[flink] branch master updated (b2d1c2d -> 2606323)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b2d1c2d [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification add 2606323 [FLINK-18215][conf] Add log level to JavaBashUtils log4j config No new revisions were added by this update. Summary of changes: flink-dist/src/main/resources/log4j-bash-utils.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink] 01/02: [FLINK-18018][dist] Bundle GPU plugin in plugins/ directory
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit cafe5e6edca1391ad126c82598a26a7a777e4200 Author: Yangze Guo AuthorDate: Tue Jun 9 21:50:46 2020 +0800 [FLINK-18018][dist] Bundle GPU plugin in plugins/ directory --- flink-dist/src/main/assemblies/opt.xml | 22 -- flink-dist/src/main/assemblies/plugins.xml | 22 ++ 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml index aa2327c..ed263a36 100644 --- a/flink-dist/src/main/assemblies/opt.xml +++ b/flink-dist/src/main/assemblies/opt.xml @@ -75,28 +75,6 @@ 0644 - - - ../flink-external-resources/flink-external-resource-gpu/target/flink-external-resource-gpu-${project.version}.jar - opt/external-resource-gpu/ - flink-external-resource-gpu-${project.version}.jar - 0644 - - - - ../flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh - opt/external-resource-gpu/ - gpu-discovery-common.sh - 0755 - - - - ../flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh - opt/external-resource-gpu/ - nvidia-gpu-discovery.sh - 0755 - - ../flink-filesystems/flink-s3-fs-hadoop/target/flink-s3-fs-hadoop-${project.version}.jar opt/ diff --git a/flink-dist/src/main/assemblies/plugins.xml b/flink-dist/src/main/assemblies/plugins.xml index e2824c4..b6008fe 100644 --- a/flink-dist/src/main/assemblies/plugins.xml +++ b/flink-dist/src/main/assemblies/plugins.xml @@ -79,6 +79,28 @@ flink-metrics-slf4j-${project.version}.jar 0644 + + + + ../flink-external-resources/flink-external-resource-gpu/target/flink-external-resource-gpu-${project.version}.jar + plugins/external_resource_gpu/ + flink-external-resource-gpu-${project.version}.jar + 0644 + + + + ../flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh + 
plugins/external_resource_gpu/ + gpu-discovery-common.sh + 0755 + + + + ../flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh + plugins/external_resource_gpu/ + nvidia-gpu-discovery.sh + 0755 +
[flink] branch release-1.11 updated (d36a2b3 -> e444d1f)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from d36a2b3 [FLINK-17944][sql-client] Wrong output in SQL Client's table mode new cafe5e6e [FLINK-18018][dist] Bundle GPU plugin in plugins/ directory new e444d1f [FLINK-18215][conf] Add log level to JavaBashUtils log4j config The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: flink-dist/src/main/assemblies/opt.xml | 22 -- flink-dist/src/main/assemblies/plugins.xml | 22 ++ .../src/main/resources/log4j-bash-utils.properties | 2 +- 3 files changed, 23 insertions(+), 23 deletions(-)
[flink] branch master updated (b2d1c2d -> 2606323)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b2d1c2d [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification add 2606323 [FLINK-18215][conf] Add log level to JavaBashUtils log4j config No new revisions were added by this update. Summary of changes: flink-dist/src/main/resources/log4j-bash-utils.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink] branch master updated (7143e6a -> b2d1c2d)
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 7143e6a [FLINK-17944][sql-client] Wrong output in SQL Client's table mode add 1cb9d54 [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log messages add 82a13db [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable add 73afe2b [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started." add 91df1a5 [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed add 2b51cda [FLINK-17869][task][checkpointing] Increase ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS add 64ff676 [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator add 26762bd1 [FLINK-17869][tests] Unignore UnalignedCheckpointITCase add b2d1c2d [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification No new revisions were added by this update. Summary of changes: .../checkpoint/channel/ChannelStateWriter.java | 29 +--- .../checkpoint/channel/ChannelStateWriterImpl.java | 25 -- .../channel/ChannelStateWriterImplTest.java| 49 +--- .../checkpoint/channel/MockChannelStateWriter.java | 8 +--- .../channel/RecordingChannelStateWriter.java | 5 -- .../runtime/state/ChannelPersistenceITCase.java| 2 +- .../runtime/io/CheckpointBarrierUnaligner.java | 31 +++-- .../runtime/tasks/AsyncCheckpointRunnable.java | 6 --- .../tasks/SubtaskCheckpointCoordinatorImpl.java| 54 +- .../runtime/tasks/LocalStateForwardingTest.java| 2 - .../tasks/TestSubtaskCheckpointCoordinator.java| 2 +- .../checkpointing/UnalignedCheckpointITCase.java | 2 - 12 files changed, 95 insertions(+), 120 deletions(-)
[flink] branch release-1.11 updated: [FLINK-17944][sql-client] Wrong output in SQL Client's table mode
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new d36a2b3 [FLINK-17944][sql-client] Wrong output in SQL Client's table mode d36a2b3 is described below commit d36a2b3f02e9e31ab61d296ae76c3cffd48b4569 Author: Jeff Zhang AuthorDate: Tue May 26 22:01:39 2020 +0800 [FLINK-17944][sql-client] Wrong output in SQL Client's table mode This is a temporary workaround until we don't use Tuple2 to represent changelogs anymore. This closes #12346. --- .../local/result/MaterializedCollectStreamResult.java | 5 + .../local/result/MaterializedCollectStreamResultTest.java | 15 --- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java index 40dd676..2aec400 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import java.net.InetAddress; import java.util.ArrayList; @@ -186,6 +187,10 @@ public class MaterializedCollectStreamResult extends CollectStreamResult i @Override protected void processRecord(Tuple2 change) { synchronized (resultLock) { + // Always set the RowKind to INSERT, so that we can compare rows correctly (RowKind will 
be ignored), + // just use the Boolean of Tuple2 to figure out whether it is insert or delete. + change.f1.setKind(RowKind.INSERT); + // insert if (change.f0) { processInsert(change.f1); diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java index 0abd0d5..e31372d 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.junit.Test; @@ -57,10 +58,10 @@ public class MaterializedCollectStreamResultTest { result.isRetrieving = true; - result.processRecord(Tuple2.of(true, Row.of("A", 1))); - result.processRecord(Tuple2.of(true, Row.of("B", 1))); - result.processRecord(Tuple2.of(true, Row.of("A", 1))); - result.processRecord(Tuple2.of(true, Row.of("C", 2))); + result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "A", 1))); + result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "B", 1))); + result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "A", 1))); + result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "C", 2))); assertEquals(TypedResult.payload(4), result.snapshot(1)); @@ -69,7 +70,7 @@ public class MaterializedCollectStreamResultTest { assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(3)); assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(4)); - 
result.processRecord(Tuple2.of(false, Row.of("A", 1))); + result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "A", 1))); assertEquals(TypedResult.payload(3), result.snapshot(1)); @@ -77,8 +78,8 @@ public class MaterializedCollectStreamResultTest { assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2));
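The FLINK-17944 fix above works because Row equality includes the RowKind, so a retraction arriving with a different kind than the original insert would never match. A minimal, self-contained sketch of that idea (not Flink code; class and field names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Model of the workaround: normalize every record's kind to INSERT before
// bookkeeping, so a retraction matches the earlier insert by equals().
class ChangelogDemo {

    enum Kind { INSERT, UPDATE_BEFORE }

    static final class Row {
        Kind kind;
        final Object[] fields;
        Row(Kind kind, Object... fields) { this.kind = kind; this.fields = fields; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Row)) return false;
            Row r = (Row) o;
            // like Flink's Row, equality takes the kind into account
            return kind == r.kind && Arrays.equals(fields, r.fields);
        }
        @Override public int hashCode() {
            return 31 * kind.hashCode() + Arrays.hashCode(fields);
        }
    }

    static final List<Row> materialized = new ArrayList<>();

    static void processRecord(boolean isInsert, Row row) {
        // the fix: always set the kind to INSERT; only the boolean flag
        // decides whether this is an insert or a delete
        row.kind = Kind.INSERT;
        if (isInsert) {
            materialized.add(row);
        } else {
            materialized.remove(row); // removes by equals()
        }
    }

    public static void main(String[] args) {
        processRecord(true, new Row(Kind.INSERT, "A", 1));
        // without the normalization this retraction would not match the insert
        processRecord(false, new Row(Kind.UPDATE_BEFORE, "A", 1));
        System.out.println(materialized.size()); // prints 0
    }
}
```

Without the `row.kind = Kind.INSERT` line, the retraction carries UPDATE_BEFORE, `equals()` fails, and the stale row stays in the materialized view — which is the wrong output the ticket describes.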
[flink] branch master updated (1839fa5 -> 7143e6a)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 1839fa5 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout add 7143e6a [FLINK-17944][sql-client] Wrong output in SQL Client's table mode No new revisions were added by this update. Summary of changes: .../local/result/MaterializedCollectStreamResult.java | 5 + .../local/result/MaterializedCollectStreamResultTest.java | 15 --- 2 files changed, 13 insertions(+), 7 deletions(-)
[flink] branch release-1.11 updated: [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new b1f32c6 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout b1f32c6 is described below commit b1f32c6efd732787e27f37fbe18fe88f9e38f7c2 Author: Till Rohrmann AuthorDate: Mon Jun 8 16:05:51 2020 +0200 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout This commit hardens all CancelingTestBase tests by using the configured Akka ask timeout of 200s as the rpc timeout. This closes #12531. --- .../apache/flink/test/cancelling/CancelingTestBase.java | 15 +-- 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index b3905c0..03bc6eb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -33,6 +33,7 @@ import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -56,14 +57,14 @@ public abstract class CancelingTestBase extends TestLogger { protected static final int PARALLELISM = 4; - protected static final long GET_FUTURE_TIMEOUT = 1000; // 1000 milliseconds + private static final Configuration configuration = getConfiguration(); // @ClassRule public static 
final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() - .setConfiguration(getConfiguration()) + .setConfiguration(configuration) .setNumberTaskManagers(2) .setNumberSlotsPerTaskManager(4) .build()); @@ -93,15 +94,17 @@ public abstract class CancelingTestBase extends TestLogger { // submit job final JobGraph jobGraph = getJobGraph(plan); + final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds(); + ClusterClient client = CLUSTER.getClusterClient(); JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph); Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); - JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) { Thread.sleep(50); - jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); } if (jobStatus != JobStatus.RUNNING) { Assert.fail("Job not in state RUNNING."); @@ -113,10 +116,10 @@ public abstract class CancelingTestBase extends TestLogger { Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow(); - JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) { Thread.sleep(50); - jobStatusAfterCancel = 
client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); } if (jobStatusAfterCancel != JobStatus.CANCELED) {
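The FLINK-17498 change above replaces a fixed 1 s future timeout with the cluster's configured Akka ask timeout, while an outer deadline still bounds the whole wait. An illustrative sketch of that polling pattern (not Flink code; the supplier stands in for the `client.getJobStatus(...).get(rpcTimeout, ...)` call):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Poll a status source until it reports the desired state or an overall
// deadline expires; each individual query is expected to honor the
// configured rpc timeout rather than a short hard-coded one.
class StatusPoller {

    static <T> T waitForState(Supplier<T> query, T desired, Duration deadline) {
        Instant end = Instant.now().plus(deadline);
        T status = query.get();
        while (!status.equals(desired) && Instant.now().isBefore(end)) {
            try {
                Thread.sleep(50); // same 50 ms back-off as CancelingTestBase
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            status = query.get();
        }
        return status;
    }

    public static void main(String[] args) {
        // a fake "job" that reaches RUNNING after three polls
        final int[] polls = {0};
        Supplier<String> jobStatus = () -> ++polls[0] >= 3 ? "RUNNING" : "CREATED";
        String result = waitForState(jobStatus, "RUNNING", Duration.ofSeconds(5));
        System.out.println(result); // prints RUNNING
    }
}
```

The point of the commit is that the per-query timeout (here hidden inside the supplier) must be at least as generous as the cluster's ask timeout; otherwise a slow but healthy cluster fails the test spuriously.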
[flink] branch master updated (20e82af -> 1839fa5)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 20e82af [FLINK-16198] Fix FileUtilsTest on macOS add 1839fa5 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout No new revisions were added by this update. Summary of changes: .../apache/flink/test/cancelling/CancelingTestBase.java | 15 +-- 1 file changed, 9 insertions(+), 6 deletions(-)
[flink] branch master updated: [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1839fa5 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout 1839fa5 is described below commit 1839fa57a91723f8ef10bcbd2c271366b5509b0b Author: Till Rohrmann AuthorDate: Mon Jun 8 16:05:51 2020 +0200 [FLINK-17498][tests] Increase CancelingTestBase rpc timeout to configured Akka ask timeout This commit hardens all CancelingTestBase tests by using the configured Akka ask timeout of 200s as the rpc timeout. This closes #12531. --- .../apache/flink/test/cancelling/CancelingTestBase.java | 15 +-- 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index b3905c0..03bc6eb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -33,6 +33,7 @@ import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -56,14 +57,14 @@ public abstract class CancelingTestBase extends TestLogger { protected static final int PARALLELISM = 4; - protected static final long GET_FUTURE_TIMEOUT = 1000; // 1000 milliseconds + private static final Configuration configuration = getConfiguration(); // @ClassRule public static final 
MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() - .setConfiguration(getConfiguration()) + .setConfiguration(configuration) .setNumberTaskManagers(2) .setNumberSlotsPerTaskManager(4) .build()); @@ -93,15 +94,17 @@ public abstract class CancelingTestBase extends TestLogger { // submit job final JobGraph jobGraph = getJobGraph(plan); + final long rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration).toMilliseconds(); + ClusterClient client = CLUSTER.getClusterClient(); JobSubmissionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph); Deadline submissionDeadLine = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); - JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) { Thread.sleep(50); - jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); } if (jobStatus != JobStatus.RUNNING) { Assert.fail("Job not in state RUNNING."); @@ -113,10 +116,10 @@ public abstract class CancelingTestBase extends TestLogger { Deadline cancelDeadline = new FiniteDuration(maxTimeTillCanceled, TimeUnit.MILLISECONDS).fromNow(); - JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS); + JobStatus jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); while (jobStatusAfterCancel != JobStatus.CANCELED && cancelDeadline.hasTimeLeft()) { Thread.sleep(50); - jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(GET_FUTURE_TIMEOUT, 
TimeUnit.MILLISECONDS); + jobStatusAfterCancel = client.getJobStatus(jobSubmissionResult.getJobID()).get(rpcTimeout, TimeUnit.MILLISECONDS); } if (jobStatusAfterCancel != JobStatus.CANCELED) {
[flink] branch master updated (f819e08 -> 20e82af)
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f819e08 [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes add 20e82af [FLINK-16198] Fix FileUtilsTest on macOS No new revisions were added by this update. Summary of changes: .../main/java/org/apache/flink/util/FileUtils.java | 91 ++ .../java/org/apache/flink/util/FileUtilsTest.java | 28 +++ 2 files changed, 74 insertions(+), 45 deletions(-)
[flink] branch release-1.10 updated: [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 6de64ac [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes 6de64ac is described below commit 6de64ac202d1f0ac97df567208764e1547540c6e Author: godfreyhe AuthorDate: Fri Mar 13 13:38:03 2020 +0800 [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes This closes #11397. --- .../org/apache/flink/table/client/cli/CliClient.java | 14 -- .../table/client/gateway/local/LocalExecutor.java | 9 ++--- .../apache/flink/table/client/cli/CliClientTest.java | 18 +++--- .../flink/table/client/cli/utils/TerminalUtils.java| 5 - 4 files changed, 25 insertions(+), 21 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 797bca6..a0ec166 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -340,7 +340,12 @@ public class CliClient { } private void callReset() { - executor.resetSessionProperties(sessionId); + try { + executor.resetSessionProperties(sessionId); + } catch (SqlExecutionException e) { + printExecutionException(e); + return; + } printInfo(CliStrings.MESSAGE_RESET); } @@ -367,7 +372,12 @@ public class CliClient { } // set a property else { - executor.setSessionProperty(sessionId, cmdCall.operands[0], cmdCall.operands[1]); + try { + executor.setSessionProperty(sessionId, cmdCall.operands[0], cmdCall.operands[1].trim()); + } catch (SqlExecutionException e) { + printExecutionException(e); + return; + } 
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi()); } terminal.flush(); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 3665735..b59c6bf 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -63,8 +63,6 @@ import org.apache.flink.types.Row; import org.apache.flink.util.JarUtils; import org.apache.flink.util.StringUtils; -import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; - import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -282,7 +280,12 @@ public class LocalExecutor implements Executor { public void setSessionProperty(String sessionId, String key, String value) throws SqlExecutionException { ExecutionContext context = getExecutionContext(sessionId); Environment env = context.getEnvironment(); - Environment newEnv = Environment.enrich(env, ImmutableMap.of(key, value), ImmutableMap.of()); + Environment newEnv; + try { + newEnv = Environment.enrich(env, Collections.singletonMap(key, value), Collections.emptyMap()); + } catch (Throwable t) { + throw new SqlExecutionException("Could not set session property.", t); + } // Renew the ExecutionContext by new environment. 
// Book keep all the session states of current ExecutionContext then diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java index 3016724..aa2c3a1 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java @@ -21,6 +21,7 @@ package org.apache.flink.table.client.cli; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.cli.utils.TerminalUtils; +import
[flink] branch release-1.11 updated: [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new cffbbf4 [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes cffbbf4 is described below commit cffbbf4cb2c0f9dbd78642870c71391bc9afdb2f Author: godfreyhe AuthorDate: Fri Mar 13 13:38:03 2020 +0800 [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes This closes #11397. --- .../apache/flink/table/client/cli/CliClient.java | 14 +- .../table/client/gateway/local/LocalExecutor.java | 9 ++-- .../flink/table/client/cli/CliClientTest.java | 51 -- .../flink/table/client/cli/TestingExecutor.java| 34 --- .../table/client/cli/TestingExecutorBuilder.java | 18 +++- .../table/client/cli/utils/TerminalUtils.java | 5 ++- 6 files changed, 96 insertions(+), 35 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 93b02c5..b93098b 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -375,7 +375,12 @@ public class CliClient { } private void callReset() { - executor.resetSessionProperties(sessionId); + try { + executor.resetSessionProperties(sessionId); + } catch (SqlExecutionException e) { + printExecutionException(e); + return; + } printInfo(CliStrings.MESSAGE_RESET); } @@ -402,7 +407,12 @@ public class CliClient { } // set a property else { - executor.setSessionProperty(sessionId, cmdCall.operands[0], cmdCall.operands[1].trim()); + try { + executor.setSessionProperty(sessionId, cmdCall.operands[0], cmdCall.operands[1].trim()); + } catch (SqlExecutionException e) { + 
printExecutionException(e); + return; + } terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi()); } terminal.flush(); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 5caa2a9..665e85d 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -66,8 +66,6 @@ import org.apache.flink.types.Row; import org.apache.flink.util.JarUtils; import org.apache.flink.util.StringUtils; -import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; - import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -288,7 +286,12 @@ public class LocalExecutor implements Executor { public void setSessionProperty(String sessionId, String key, String value) throws SqlExecutionException { ExecutionContext context = getExecutionContext(sessionId); Environment env = context.getEnvironment(); - Environment newEnv = Environment.enrich(env, ImmutableMap.of(key, value), ImmutableMap.of()); + Environment newEnv; + try { + newEnv = Environment.enrich(env, Collections.singletonMap(key, value), Collections.emptyMap()); + } catch (Throwable t) { + throw new SqlExecutionException("Could not set session property.", t); + } // Renew the ExecutionContext by new environment. 
// Book keep all the session states of current ExecutionContext then diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java index 351852b..38f554f 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java @@ -24,6 +24,7 @@ import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import
[flink] branch master updated (162994f -> f819e08)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 162994f [hotfix][doc] Some minor languange clean-ups for the Source doc. add f819e08 [FLINK-16217][sql-client] Fix exception catching to avoid SQL client crashes No new revisions were added by this update. Summary of changes: .../apache/flink/table/client/cli/CliClient.java | 14 +- .../table/client/gateway/local/LocalExecutor.java | 9 ++-- .../flink/table/client/cli/CliClientTest.java | 51 -- .../flink/table/client/cli/TestingExecutor.java| 34 --- .../table/client/cli/TestingExecutorBuilder.java | 18 +++- .../table/client/cli/utils/TerminalUtils.java | 5 ++- 6 files changed, 96 insertions(+), 35 deletions(-)
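The FLINK-16217 commits above wrap executor calls that may throw SqlExecutionException so the CLI reports the error and keeps running instead of crashing. A self-contained sketch of that pattern (hypothetical names; the real CLI calls printExecutionException and CliStrings.MESSAGE_SET):

```java
// Defensive wrapper around an executor call: a failure becomes a printed
// message and an early return, never an uncaught exception.
class CliSketch {

    static class SqlExecutionException extends RuntimeException {
        SqlExecutionException(String msg, Throwable cause) { super(msg, cause); }
    }

    interface Executor {
        void setSessionProperty(String sessionId, String key, String value);
    }

    static String callSet(Executor executor, String sessionId, String key, String value) {
        try {
            // trim the value, as the fix does for cmdCall.operands[1]
            executor.setSessionProperty(sessionId, key, value.trim());
        } catch (SqlExecutionException e) {
            return "ERROR: " + e.getMessage(); // the CLI prints this and continues
        }
        return "OK";
    }

    public static void main(String[] args) {
        Executor failing = (s, k, v) -> {
            throw new SqlExecutionException("Could not set session property.", null);
        };
        // the client survives the failure and reports it
        System.out.println(callSet(failing, "session-1", "execution.type", " batch "));
    }
}
```

The same try/catch-and-return shape is applied to callReset in the commit: any executor operation that validates user input is a crash risk unless its exceptions are caught at the command boundary.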