morsapaes commented on a change in pull request #397:
URL: https://github.com/apache/flink-web/pull/397#discussion_r535296753



##########
File path: _posts/2020-12-04-release-1.12.0.md
##########
@@ -0,0 +1,313 @@
+---
+layout: post 
+title:  "Apache Flink 1.12.0 Release Announcement"
+date: 2020-12-04T08:00:00.000Z
+categories: news
+authors:
+- morsapaes:
+  name: "Marta Paes"
+  twitter: "morsapaes"
+- aljoscha:
+  name: "Aljoscha Krettek"
+  twitter: "aljoscha"
+
+excerpt: The Apache Flink community is excited to announce the release of 
Flink 1.12.0! Close to 300 contributors worked on over 1k tickets to bring 
significant improvements to usability as well as new features to Flink users 
across the whole API stack. We're particularly excited about adding efficient 
batch execution to the DataStream API, Kubernetes HA as an alternative to 
ZooKeeper, support for upsert mode in the Kafka SQL connector and the new 
Python DataStream API! Read on for all major new features and improvements, 
important changes to be aware of and what to expect moving forward!
+---
+
+The Apache Flink community is excited to announce the release of Flink 1.12.0! 
Close to 300 contributors worked on over 1k tickets to bring significant 
improvements to usability as well as new features that simplify (and unify) 
Flink handling across the API stack.
+
+**Release Highlights**
+
+* The community has added support for **efficient batch execution** in the 
DataStream API. This is the first milestone towards achieving a truly unified 
runtime for both batch and stream processing.
+
+* **Kubernetes-based High Availability (HA)** was implemented as an 
alternative to ZooKeeper for highly available production setups.
+
+* The Kafka SQL connector has been extended to work in **upsert mode**, 
supported by the ability to handle **connector metadata** in SQL DDL. 
**Temporal table joins** can now also be fully expressed in SQL, no longer 
depending on the Table API.
+
+* Support for the **DataStream API in PyFlink** expands its usage to more 
complex scenarios that require fine-grained control over state and time, and 
it’s now possible to deploy PyFlink jobs natively on **Kubernetes**.
+
+This blog post describes all major new features and improvements, important 
changes to be aware of and what to expect moving forward.
+
+{% toc %}
+
+The binary distribution and source artifacts are now available on the updated 
[Downloads page]({{ site.baseurl }}/downloads.html) of the Flink website, and 
the most recent distribution of PyFlink is available on 
[PyPI](https://pypi.org/project/apache-flink/). Please review the [release 
notes]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/release-notes/flink-1.12.html) carefully, and check 
the complete [release 
changelog](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12348263&styleName=Html&projectId=12315522)
 and [updated documentation]({{ site.DOCS_BASE_URL }}flink-docs-release-1.12/) 
for more details. 
+
+We encourage you to download the release and share your feedback with the 
community through the [Flink mailing 
lists](https://flink.apache.org/community.html#mailing-lists) or 
[JIRA](https://issues.apache.org/jira/projects/FLINK/summary).
+
+## New Features and Improvements
+
+### Batch Execution Mode in the DataStream API
+
+Flink’s core APIs have developed organically over the lifetime of the project, 
and were initially designed with specific use cases in mind. And while the 
Table API/SQL already has unified operators, using lower-level abstractions 
still requires you to choose between two semantically different APIs for batch 
(DataSet API) and streaming (DataStream API). Since _a batch is a subset of an 
unbounded stream_, there are some clear advantages to consolidating them under 
a single API:
+
+* **Reusability:** efficient batch and stream processing under the same API 
would allow you to easily switch between both execution modes without rewriting 
any code. So, a job could be easily reused to process real-time and historical 
data.
+
+* **Operational simplicity:** providing a unified API would mean using a 
single set of connectors, maintaining a single codebase and being able to 
easily implement mixed execution pipelines _e.g._ for use cases like 
backfilling.
+
+With these advantages in mind, the community has taken the first step towards 
the unification of the DataStream API: supporting efficient batch execution 
([FLIP-134](https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API)).
 This means that, in the long run, the DataSet API will be deprecated and 
subsumed by the DataStream API and the Table API/SQL 
([FLIP-131](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741)).
+
+**Batch for Bounded Streams**
+
+You could already use the DataStream API to process bounded streams (_e.g._ files), with the limitation that the runtime is not “aware” that the job is bounded. To optimize the runtime for bounded input, the new `BATCH` execution mode uses sort-based shuffles, purely in-memory aggregations and an improved scheduling strategy (_see [Pipelined Region Scheduling](#pipelined-region-scheduling-flip-119)_). As a result, `BATCH` mode execution in the DataStream API already comes very close to the performance of the DataSet API in Flink 1.12. For more details on the performance benchmark, check the original proposal ([FLIP-140](https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams)).
+
+<center>
+  <figure>
+  <img src="{{ site.baseurl }}/img/blog/2020-12-04-release-1.12.0/1.png" 
width="600px"/>
+  </figure>
+</center>
+
+<div style="line-height:60%;">
+    <br>
+</div>
+
+In Flink 1.12, the default execution mode is `STREAMING`. To configure a job 
to run in `BATCH` mode, you can set the configuration when submitting a job: 
+
+```bash
+bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
+```
+
+Alternatively, you can configure the execution mode programmatically:
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+<div style="line-height:150%;">
+    <br>
+</div>
+
+<span class="label label-info">Note</span> <font size="3">Although the DataSet 
API has not been deprecated yet, we recommend that users give preference to the 
DataStream API with <code>BATCH</code> execution mode for new batch jobs, and 
consider migrating existing DataSet jobs.</font>
+
+### New Data Sink API (Beta)
+
+Ensuring that connectors can work for both execution modes has already been 
covered for data sources in the [previous 
release](https://flink.apache.org/news/2020/07/06/release-1.11.0.html#new-data-source-api-beta),
 so in Flink 1.12 the community focused on implementing a unified Data Sink API 
([FLIP-143](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API)).
 The new abstraction introduces a write/commit protocol and a more modular 
interface where the individual components are transparently exposed to the 
framework. 
+
+A _Sink_ implementor will have to provide the **what** and **how**: a 
[_SinkWriter_]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/api/java/org/apache/flink/api/connector/sink/SinkWriter.html)
 that writes data and outputs what needs to be committed (i.e. committables); 
and a [_Committer_]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/api/java/org/apache/flink/api/connector/sink/Committer.html)
 and [_GlobalCommitter_]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/api/java/org/apache/flink/api/connector/sink/GlobalCommitter.html)
 that encapsulate how to handle the committables. The framework is responsible 
for the **when** and **where**: at what time and on which machine or process to 
commit.
+
+<center>
+  <figure>
+  <img src="{{ site.baseurl }}/img/blog/2020-12-04-release-1.12.0/2.png" 
width="700px"/>
+  </figure>
+</center>
+
+<div style="line-height:150%;">
+    <br>
+</div>
+
+This more modular abstraction made it possible to support different runtime implementations for the `BATCH` and `STREAMING` execution modes that are efficient for their intended purpose, but use just one, unified sink implementation. In Flink 1.12, the [FileSink connector]({{ site.DOCS_BASE_URL }}flink-docs-release-1.12/dev/connectors/file_sink.html) is the unified drop-in replacement for the StreamingFileSink ([FLINK-19758](https://issues.apache.org/jira/browse/FLINK-19758)). The remaining connectors will be ported to the new interfaces in future releases.
+
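+To make the division of labor concrete, here is a compact, hypothetical sketch of the two user-facing pieces against the new interfaces. The interface and method names follow FLIP-143 as released in 1.12, but treat this as an illustrative outline rather than a production-ready connector; consult the Javadocs linked above for the exact contracts.
+
+```java
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+// The "what": writes incoming data and hands over committables.
+class StagingWriter implements SinkWriter<String, String, Void> {
+
+    private final List<String> staged = new ArrayList<>();
+
+    @Override
+    public void write(String element, Context context) {
+        staged.add(element); // stage data, e.g. in a temporary file
+    }
+
+    @Override
+    public List<String> prepareCommit(boolean flush) {
+        // Called on checkpoint (STREAMING) or at end of input (BATCH):
+        // hand the committables produced so far to the framework.
+        List<String> committables = new ArrayList<>(staged);
+        staged.clear();
+        return committables;
+    }
+
+    @Override
+    public List<Void> snapshotState() {
+        return new ArrayList<>(); // this sketch keeps no writer state
+    }
+
+    @Override
+    public void close() {}
+}
+
+// The "how": makes staged data visible, e.g. by renaming files into place.
+class StagingCommitter implements Committer<String> {
+
+    @Override
+    public List<String> commit(List<String> committables) throws IOException {
+        // Commit each committable; return the ones that need to be retried.
+        return new ArrayList<>();
+    }
+
+    @Override
+    public void close() {}
+}
+```
+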
+### Kubernetes High Availability (HA) Service
+
+Kubernetes provides built-in functionalities that Flink can leverage for 
JobManager failover, instead of relying on 
[ZooKeeper](https://zookeeper.apache.org/). To enable a “ZooKeeperless” HA 
setup, the community implemented a Kubernetes HA service in Flink 1.12 
([FLIP-144](https://cwiki.apache.org/confluence/x/H0V4CQ)). The service is 
built on the same [base interface]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/api/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.html)
 as the ZooKeeper implementation and uses Kubernetes’ 
[ConfigMap](https://kubernetes.io/docs/concepts/configuration/configmap/) 
objects to handle all the metadata needed to recover from a JobManager failure. 
For more details and examples on how to configure a highly available Kubernetes 
cluster, check out the [documentation]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/deployment/ha/kubernetes_ha.html).
+
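+As a reference, a minimal `flink-conf.yaml` for a ZooKeeperless HA setup could look like the following sketch (the cluster ID and storage directory are placeholders; see the documentation linked above for all options):
+
+```yaml
+kubernetes.cluster-id: my-flink-cluster
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: s3://flink/recovery
+```
+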
+<div class="alert alert-info small" markdown="1">
+<b>Note:</b> This does not mean that the ZooKeeper dependency will be dropped, 
just that there will be an alternative for users of Flink on Kubernetes.
+</div>
+
+<hr>
+
+### Other Improvements
+
+**Migration of existing connectors to the new Data Source API**
+
+The previous release introduced a new Data Source API ([FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)), making it possible to implement connectors that work both as bounded (batch) and unbounded (streaming) sources. In Flink 1.12, the community started porting existing source connectors to the new interfaces, starting with the FileSystem connector ([FLINK-19161](https://issues.apache.org/jira/browse/FLINK-19161)).
+
+<div class="alert alert-danger small" markdown="1">
+<b>Attention:</b> The unified source implementations will be completely 
separate connectors that are not snapshot-compatible with their legacy 
counterparts.
+</div>
+
+**Pipelined Region Scheduling 
([FLIP-119](https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling#FLIP119PipelinedRegionScheduling-BulkSlotAllocation))**
+
+Flink’s scheduler has been largely designed to address batch and streaming workloads separately. This release introduces a **unified** scheduling strategy that identifies sets of tasks connected via blocking data exchanges to break down the execution graph into _pipelined regions_. Each region is scheduled only once there is data to work on and deployed only once all its required resources are available, and failed regions can be restarted independently. In particular for batch jobs, the new strategy leads to more efficient resource utilization and eliminates deadlocks.
+
+**Support for Sort-Merge Blocking Shuffles 
([FLIP-148](https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink))**
+
+To improve the stability, performance and resource utilization of large-scale batch jobs, the community introduced sort-merge blocking shuffle as an alternative to the hash-based implementation that Flink already used. This approach can reduce shuffle time [significantly](https://www.mail-archive.com/dev@flink.apache.org/msg42472.html), and uses fewer file handles and file write buffers, both of which are scarce resources in large-scale jobs. Further optimizations will be implemented in upcoming releases ([FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)).
+
+<div class="alert alert-danger small" markdown="1">
+<b>Attention:</b> This feature is experimental and not enabled by default. To 
enable sort-merge shuffles, you can configure a reasonable minimum parallelism 
threshold in the <a 
href="https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#taskmanager-network-sort-shuffle-min-parallelism";>TaskManager
 network configuration options</a>.
+</div>
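+
+For example, to route every data exchange of a job through the sort-merge implementation, you could lower the threshold all the way down (a sketch; the key is listed in the configuration documentation linked above):
+
+```yaml
+# Use sort-merge blocking shuffles for exchanges at or above this parallelism.
+taskmanager.network.sort-shuffle.min-parallelism: 1
+```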
+
+**Improvements to the Flink WebUI 
([FLIP-75](https://cwiki.apache.org/confluence/display/FLINK/FLIP-75%3A+Flink+Web+UI+Improvement+Proposal))**
+
+As a continuation of the series of improvements to the Flink WebUI kicked off 
in the last release, the community worked on exposing JobManager's 
memory-related metrics and configuration parameters on the WebUI 
([FLIP-104](https://cwiki.apache.org/confluence/display/FLINK/FLIP-104%3A+Add+More+Metrics+to+Jobmanager)).
 The TaskManager's metrics page has also been updated to reflect the [changes 
to the TaskManager memory 
model](https://flink.apache.org/news/2020/04/21/memory-management-improvements-flink-1.10.html)
 introduced in Flink 1.10 
([FLIP-102](https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager)),
 adding new metrics for Managed Memory, Network Memory and Metaspace.
+
+<hr>
+
+### Table API/SQL: Metadata Handling in SQL Connectors
+
+Some sources (and formats) expose additional fields as metadata that can be 
valuable for users to process along with record data. A common example is 
Kafka, where you might want to _e.g._ access offset, partition or topic 
information, read/write the record key or use embedded metadata timestamps for 
time-based operations.
+
+With the new release, Flink SQL supports **metadata columns** to read and write connector- and format-specific fields for every row of a table ([FLIP-107](https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors)). These columns are declared in the `CREATE TABLE` statement using the `METADATA` (reserved) keyword.
+
+```sql
+CREATE TABLE kafka_table (
+  id BIGINT,
+  name STRING,
+  event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 
'timestamp' metadata
+  headers MAP<STRING, BYTES> METADATA  -- access Kafka 'headers' metadata
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'test-topic', 
+  'format' = 'avro'
+);
+```
+
+In Flink 1.12, metadata is exposed for the **Kafka** and **Kinesis** 
connectors, with work on the FileSystem connector already planned 
([FLINK-19903](https://issues.apache.org/jira/browse/FLINK-19903)). Due to the 
more complex structure of Kafka records, [new properties]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/dev/table/connectors/kafka.html#key-format) were also 
specifically implemented for the Kafka connector to control how to handle the 
key/value pairs. For a complete overview of metadata support in Flink SQL, 
check the [documentation]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/dev/table/connectors/) for each connector, as well as 
the motivating use cases in the original proposal.
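+
+For instance, the new Kafka-specific options make it possible to also read and write the record key (a hypothetical table for illustration; see the linked connector page for the full option list):
+
+```sql
+CREATE TABLE kafka_keyed_table (
+  id BIGINT,
+  name STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'test-topic',
+  'key.format' = 'raw',
+  'key.fields' = 'id',   -- serialize the 'id' column into the record key
+  'value.format' = 'avro'
+);
+```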
+
+### Table API/SQL: Upsert Kafka Connector
+
+For some use cases, like interpreting compacted topics or writing out (updating) aggregated results, it’s necessary to handle Kafka record keys as _true_ primary keys that can determine what should be inserted, deleted or updated. To enable this, the community created a [dedicated upsert connector]({{ site.DOCS_BASE_URL }}flink-docs-release-1.12/dev/table/connectors/upsert-kafka.html) (`upsert-kafka`) that extends the base implementation to work in _upsert_ mode ([FLIP-149](https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector)).
+
+The new `upsert-kafka` connector can be used for sources and sinks, and provides the **same base functionality** and **persistence guarantees** as the existing Kafka connector, as it reuses most of its code under the hood. To use the `upsert-kafka` connector, you must define a [primary key constraint]({{ site.DOCS_BASE_URL }}flink-docs-release-1.12/dev/table/sql/create.html#primary-key) on table creation, as well as specify the (de)serialization format for the key (`key.format`) and value (`value.format`).
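+
+A minimal sketch of such a table definition, with placeholder topic and format choices, could look like this:
+
+```sql
+CREATE TABLE pageviews_per_region (
+  region STRING,
+  pv BIGINT,
+  PRIMARY KEY (region) NOT ENFORCED  -- maps to the Kafka record key
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'pageviews-per-region',
+  'key.format' = 'avro',
+  'value.format' = 'avro',
+  ...
+);
+```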
+
+### Table API/SQL: Support for Temporal Table Joins in SQL
+
+Instead of creating a temporal table function to look up against a table at a 
certain point in time, you can now simply use the standard SQL clause `FOR 
SYSTEM_TIME AS OF` (SQL:2011) to express a **temporal table join**. In 
addition, temporal joins are now supported against _any_ kind of table that has 
a time attribute and a primary key, and not just _append-only_ tables. This 
unlocks a new set of use cases, like performing temporal joins directly against 
Kafka compacted topics or database changelogs (e.g. from Debezium).
+
+```sql
+-- Table backed by a Kafka topic
+CREATE TABLE orders (
+    order_id STRING,
+    currency STRING,
+    amount INT,
+    order_time TIMESTAMP(3),
+    WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
+) WITH (
+    'connector' = 'kafka',
+    ...
+);
+
+-- Table backed by a Kafka compacted topic
+CREATE TABLE latest_rates ( 
+    currency STRING,
+    currency_rate DECIMAL(38, 10),
+    currency_time TIMESTAMP(3),
+    WATERMARK FOR currency_time AS currency_time - INTERVAL '5' SECOND,
+    PRIMARY KEY (currency) NOT ENFORCED      
+) WITH (
+  'connector' = 'upsert-kafka',
+  ...
+);
+
+-- Event-time temporal table join
+SELECT 
+    o.order_id,
+    o.order_time, 
+    o.amount * r.currency_rate AS amount,
+    r.currency
+FROM orders AS o
+JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r
+ON o.currency = r.currency;
+```
+
+The previous example also shows how you can take advantage of the new 
`upsert-kafka` connector in the context of temporal table joins.
+
+**Hive Tables in Temporal Table Joins**
+
+You can also perform temporal table joins against Hive tables, either by automatically reading the latest table partition as a temporal table ([FLINK-19644](https://issues.apache.org/jira/browse/FLINK-19644)) or by reading the whole table as a bounded stream that tracks the latest version at execution time. Refer to the [documentation]({{ site.DOCS_BASE_URL }}flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#temporal-table-join) for examples of using Hive tables in temporal table joins.
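+
+As a rough sketch (table, column and option names are placeholders; the option keys are described in the linked Hive documentation), a processing-time join against the latest partition of a Hive dimension table might look like this:
+
+```sql
+-- Assumes the Hive table is configured to act as a temporal table, e.g. with
+-- 'streaming-source.enable' = 'true' and
+-- 'streaming-source.partition.include' = 'latest' in its table properties.
+SELECT o.order_id, d.dim_value
+FROM orders AS o
+JOIN dim_hive_table FOR SYSTEM_TIME AS OF o.proc_time AS d
+ON o.dim_key = d.dim_key;
+```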
+
+<hr>
+
+### Other Improvements to the Table API/SQL
+
+**Kinesis Flink SQL Connector 
([FLINK-18858](https://issues.apache.org/jira/browse/FLINK-18858))**
+
+From Flink 1.12, Amazon Kinesis Data Streams (KDS) is natively supported as a source and sink also in the Table API/SQL. The new Kinesis SQL connector ships with support for Enhanced Fan-Out (EFO) and sink partitioning. For a complete overview of supported features, configuration options and exposed metadata, check the [updated documentation]({{ site.DOCS_BASE_URL }}flink-docs-release-1.12/dev/table/connectors/kinesis.html).
+
+**Streaming Sink Compaction in the FileSystem/Hive Connector 
([FLINK-19345](https://issues.apache.org/jira/browse/FLINK-19345))**
+
+Many bulk formats, such as Parquet, are most efficient when written as large 
files; this is a challenge when frequent checkpointing is enabled, as too many 
small files are created (and need to be rolled on checkpoint). In Flink 1.12, 
the file sink supports **file compaction**, allowing jobs to retain smaller 
checkpoint intervals without generating a large number of files. To enable file 
compaction, you can set `auto-compaction=true` in the properties of the 
FileSystem connector, as described in the [documentation]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-compaction).
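+
+A sketch of a compacting Parquet sink table (paths, columns and sizes are placeholders; `compaction.file-size` controls the target size of compacted files):
+
+```sql
+CREATE TABLE compacted_sink (
+  user_id STRING,
+  event_time TIMESTAMP(3)
+) WITH (
+  'connector' = 'filesystem',
+  'path' = 's3://bucket/output',
+  'format' = 'parquet',
+  'auto-compaction' = 'true',
+  'compaction.file-size' = '128MB'
+);
+```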
+
+**Watermark Pushdown in the Kafka Connector 
([FLINK-20041](https://issues.apache.org/jira/browse/FLINK-20041))**
+
+To ensure correctness when consuming from Kafka, it’s generally preferable to 
generate watermarks on a per-partition basis, since the out-of-orderness within 
a partition is usually lower than across all partitions. Flink will now push 
down watermark strategies to [emit **per-partition watermarks**]({{ 
site.DOCS_BASE_URL 
}}flink-docs-release-1.12/dev/table/connectors/kafka.html#source-per-partition-watermarks)
 from within the Kafka consumer. The output watermark of the source will be 
determined by the minimum watermark across the partitions it reads, leading to 
better (i.e. closer to real-time) watermarking. Watermark pushdown also lets 
you configure per-partition **idleness detection** to prevent idle partitions 
from holding back the event time progress of the entire application.
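+
+In SQL, per-partition watermarking follows from the regular `WATERMARK` declaration on a Kafka table; the idle timeout can then be controlled via the `table.exec.source.idle-timeout` option (a sketch with placeholder names):
+
+```sql
+CREATE TABLE events (
+  user_id STRING,
+  event_time TIMESTAMP(3),
+  -- Generated per Kafka partition inside the source, then merged.
+  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
+) WITH (
+  'connector' = 'kafka',
+  ...
+);
+-- To exclude idle partitions, set e.g. table.exec.source.idle-timeout = 30 s
+-- in the table configuration.
+```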
+
+**Newly Supported Formats**
+
+<table class="table table-bordered" style="font-size:95%">
+  <thead>
+    <tr>
+      <th>Format</th>
+      <th>Description</th>
+      <th>Supported Connectors</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><a 
href="https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html";>Avro
 Schema Registry</a></td>
+      <td>Read and write data serialized with the Confluent Schema Registry <a 
href="https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html";>KafkaAvroSerializer</a>.</td>
+      <td>Kafka, Upsert Kafka</td>
+    </tr>
+    <tr>
+      <td><a 
href="https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/debezium.html";>Debezium
 Avro</a></td>
+        <td>Read and write Debezium records serialized with the Confluent 
Schema Registry KafkaAvroSerializer.</td>
+        <td>Kafka</td>
+    </tr>
+    <tr>
+      <td><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/raw.html">Raw</a></td>
+      <td>Read and write raw (byte-based) values as a single column.</td>
+      <td>
+        <p>Kafka, Upsert Kafka</p>
+        <p>Kinesis</p>
+        <p>FileSystem</p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+**Multi-input Operator for Join Optimization 
([FLINK-19621](https://issues.apache.org/jira/browse/FLINK-19621))**
+
+Shuffling is one of the most expensive operations in a Flink job. To eliminate unnecessary shuffles and improve the performance of batch and streaming Table API/SQL jobs, the default planner now leverages the N-ary stream operator introduced in the last release ([FLIP-92](https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink)) to chain operators connected by forward edges, turning these network shuffles into local function calls.
+
+**Type Inference for Table API UDAFs 
([FLIP-65](https://cwiki.apache.org/confluence/display/FLINK/FLIP-65%3A+New+type+inference+for+Table+API+UDFs))**
+
+This release concluded the work started in Flink 1.9 on a [new data type system]({{ site.DOCS_BASE_URL }}flink-docs-release-1.12/dev/table/types.html#data-types) for the Table API by exposing aggregate functions (UDAFs) to it. From Flink 1.12, UDAFs behave similarly to scalar and table functions, and support all data types.
+
+<hr>
+
+### PyFlink: Python DataStream API
+
+To expand the usability of PyFlink, this release introduces a first version of 
the Python DataStream API 
([FLIP-130](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866298))
 with support for stateless operations (e.g. Map, FlatMap, Filter, KeyBy). To 
give the Python DataStream API a try, you can [install PyFlink]({{ 
site.DOCS_BASE_URL 
}}flink-docs-release-1.12/dev/python/installation.html#installation-of-pyflink) 
and check out [this tutorial]({{ site.DOCS_BASE_URL 
}}flink-docs-release-1.12/dev/python/datastream_tutorial.html) that guides you 
through building a simple streaming application.
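+
+As a flavor of the API, a minimal stateless pipeline (modeled after the tutorial; names and paths are placeholders) could look like this:
+
+```python
+from pyflink.common.serialization import SimpleStringEncoder
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import StreamingFileSink
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_parallelism(1)
+
+# Stateless transformations over a small bounded collection.
+ds = env.from_collection(
+    collection=[(1, 'aaa'), (2, 'bbb'), (3, 'ccc')],
+    type_info=Types.ROW([Types.INT(), Types.STRING()]))
+
+ds.filter(lambda row: row[0] > 1) \
+  .add_sink(StreamingFileSink
+            .for_row_format('/tmp/output', SimpleStringEncoder())
+            .build())
+
+env.execute("python_datastream_job")
+```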

Review comment:
       This makes sense, yes — I'll push it in. Thanks, @WeiZhong94!



