This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 2662ce3  Add Pravega Flink connector 101 blog
2662ce3 is described below

commit 2662ce340fe962b460485b7e1e04540255d75950
Author: Brian Zhou <b.z...@dell.com>
AuthorDate: Wed Dec 1 15:33:26 2021 +0800

    Add Pravega Flink connector 101 blog
---
 _posts/2022-01-20-pravega-connector-101.md | 221 +++++++++++++++++++++++++++++
 1 file changed, 221 insertions(+)

diff --git a/_posts/2022-01-20-pravega-connector-101.md 
b/_posts/2022-01-20-pravega-connector-101.md
new file mode 100644
index 0000000..47e9789
--- /dev/null
+++ b/_posts/2022-01-20-pravega-connector-101.md
@@ -0,0 +1,221 @@
+---
+layout: post
+title: "Pravega Flink Connector 101"
+date: 2022-01-20 00:00:00
+authors:
+- brianzhou:
+  name: "Yumin Zhou (Brian)"
+  twitter: "crazy__zhou"
+excerpt: A brief introduction to the Pravega Flink Connector
+---
+
+[Pravega](https://cncf.pravega.io/), which is now a CNCF sandbox project, is a 
cloud-native storage system based on abstractions for both batch and streaming 
data consumption. Pravega streams (a new storage abstraction) are durable, 
consistent, and elastic, while natively supporting long-term data retention. In 
comparison, [Apache Flink](https://flink.apache.org/) is a popular real-time 
computing engine that provides unified batch and stream processing. Flink 
provides high-throughput, lo [...]
+
+That is also the main reason why Pravega chose Flink as the first execution engine to integrate with, among the various distributed computing engines on the market. With the help of Flink, users can use flexible APIs for windowing, complex event processing (CEP), or table abstractions to process streaming data easily and enrich the data being stored. Since its inception in 2016, Pravega has been in close communication with Flink PMC members and has developed the connector together with them.
+
+In 2017, the Pravega Flink connector module was moved out of the Pravega main repository and has since been maintained in a separate [repository](https://github.com/pravega/flink-connectors). Over the years of development, many features have been implemented, including:
+
+- exactly-once processing guarantees for both Reader and Writer, supporting 
end-to-end exactly-once processing pipelines
+- seamless integration with Flink's checkpoints and savepoints
+- parallel Readers and Writers supporting high throughput and low latency 
processing
+- support for Batch, Streaming, and Table API to access Pravega Streams
+
+These key features make streaming pipelines easier to develop without having to worry about performance and correctness, which are common pain points for many streaming use cases.
+
+In this blog post, we will discuss how to use this connector to read and write 
Pravega streams with the Flink DataStream API.  
+
+{% toc %}
+
+# Basic usage
+
+## Dependency
+To use this connector in your application, add the dependency to your project:
+
+```xml
+<dependency>
+  <groupId>io.pravega</groupId>
+  <artifactId>pravega-connectors-flink-1.13_2.12</artifactId>
+  <version>0.10.1</version>
+</dependency>
+```
+
+
+In the above example:
+
+`1.13` is the Flink major version, which appears in the middle of the artifact name. The Pravega Flink connector maintains compatibility with the *three* most recent major versions of Flink.
+
+`0.10.1` is the connector version, which aligns with the Pravega version.
+
+You can find the latest release with a support matrix on the [GitHub Releases 
page](https://github.com/pravega/flink-connectors/releases).
+
+## API introduction
+
+### Configurations
+
+The connector provides a common top-level object `PravegaConfig` for Pravega 
connection configurations. The config object automatically configures itself 
from _environment variables_, _system properties_ and _program arguments_.
+
+The basic controller URI and the default scope can be set like this:
+
+|Setting|Environment Variable /<br/>System Property /<br/>Program Argument|Default Value|
+|-------|-------------------------------------------------------------|-------------|
+|Controller URI|`PRAVEGA_CONTROLLER_URI`<br/>`pravega.controller.uri`<br/>`--controller`|`tcp://localhost:9090`|
+|Default Scope|`PRAVEGA_SCOPE`<br/>`pravega.scope`<br/>`--scope`|-|
+
+The recommended way to create an instance of `PravegaConfig` is the following:
+
+
+```java
+// From default environment
+PravegaConfig config = PravegaConfig.fromDefaults();
+
+// From program arguments
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// From user specification
+PravegaConfig config = PravegaConfig.fromDefaults()
+    .withControllerURI("tcp://...")
+    .withDefaultScope("SCOPE-NAME")
+    .withCredentials(credentials)
+    .withHostnameValidation(false);
+```
+
+### Serialization/Deserialization
+
+Pravega has defined its own [`io.pravega.client.stream.Serializer`](http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html) interface for serialization/deserialization, while Flink defines its own standard interfaces for the same purpose:
+
+- 
[`org.apache.flink.api.common.serialization.SerializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html)
+- 
[`org.apache.flink.api.common.serialization.DeserializationSchema`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html)
+
+For interoperability with other Pravega client applications, the connector provides built-in adapters `PravegaSerializationSchema` and `PravegaDeserializationSchema` to support processing Pravega stream data produced by non-Flink applications.
+
+Here is the adapter for the Pravega Java serializer:
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
+    MyEvent.class, new JavaSerializer<MyEvent>());
+```
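+
+The serialization counterpart works the same way. As a minimal sketch (assuming the `PravegaSerializationSchema` constructor that simply wraps a Pravega `Serializer`):
+
+```java
+import io.pravega.client.stream.impl.JavaSerializer;
+...
+// Wrap the same Pravega serializer so that events written by Flink can be
+// read by any other Pravega client application.
+SerializationSchema<MyEvent> adapter = new PravegaSerializationSchema<>(
+    new JavaSerializer<MyEvent>());
+```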
+
+### `FlinkPravegaReader`
+
+`FlinkPravegaReader` is a Flink `SourceFunction` implementation which supports parallel reads from one or more Pravega streams. Internally, it initiates a Pravega reader group and creates Pravega `EventStreamReader` instances to read the data from the stream(s). It provides a builder-style API for construction and allows `StreamCut`s to mark the start and end of the read (a sketch of this follows the basic example below).
+
+You can use it like this:
+
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Enable Flink checkpoint to make state fault tolerant
+env.enableCheckpointing(60000);
+
+// Define the Pravega configuration
+ParameterTool params = ParameterTool.fromArgs(args);
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event deserializer
+DeserializationSchema<MyClass> deserializer = ...
+
+// Define the data stream
+FlinkPravegaReader<MyClass> pravegaSource = 
FlinkPravegaReader.<MyClass>builder()
+    .forStream(...)
+    .withPravegaConfig(config)
+    .withDeserializationSchema(deserializer)
+    .build();
+DataStream<MyClass> stream = env.addSource(pravegaSource)
+    .setParallelism(4)
+    .uid("pravega-source");
+```
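+
+To bound the read, the stream can be given explicit start and end positions. The following is only a rough sketch, assuming the builder's `forStream` overload that accepts a start and end `StreamCut`; the stream name is illustrative and how the `StreamCut`s are obtained (for example, persisted by an earlier job) is left out:
+
+```java
+import io.pravega.client.stream.StreamCut;
+...
+// StreamCuts captured elsewhere, e.g. saved by a previous job
+StreamCut startCut = ...
+StreamCut endCut = ...
+
+// Read only the slice of the stream between the two positions
+FlinkPravegaReader<MyClass> boundedSource = FlinkPravegaReader.<MyClass>builder()
+    .forStream("my-stream", startCut, endCut)
+    .withPravegaConfig(config)
+    .withDeserializationSchema(deserializer)
+    .build();
+```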
+
+### `FlinkPravegaWriter`
+
+`FlinkPravegaWriter` is a Flink `SinkFunction` implementation which supports 
parallel writes to Pravega streams.
+
+It supports three writer modes that relate to guarantees about the persistence 
of events emitted by the sink to a Pravega Stream:
+
+1. **Best-effort** - Any write failures will be ignored and data loss is possible.
+2. **At-least-once** (default) - All events are persisted in Pravega, but duplicate events are possible due to retries or in case of failure and subsequent recovery.
+3. **Exactly-once** - All events are persisted in Pravega using a transactional approach integrated with the Flink checkpointing feature.
+
+Internally, it initiates several Pravega `EventStreamWriter` or `TransactionalEventStreamWriter` instances (depending on the writer mode) to write data to the stream. It provides a builder-style API for construction.
+
+A basic usage looks like this:
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Define the Pravega configuration
+PravegaConfig config = PravegaConfig.fromParams(params);
+
+// Define the event serializer
+SerializationSchema<MyClass> serializer = ...
+
+// Define the event router for selecting the Routing Key
+PravegaEventRouter<MyClass> router = ...
+
+// Define the sink function
+FlinkPravegaWriter<MyClass> pravegaSink = FlinkPravegaWriter.<MyClass>builder()
+   .forStream(...)
+   .withPravegaConfig(config)
+   .withSerializationSchema(serializer)
+   .withEventRouter(router)
+   .withWriterMode(EXACTLY_ONCE)
+   .build();
+
+DataStream<MyClass> stream = ...
+stream.addSink(pravegaSink)
+    .setParallelism(4)
+    .uid("pravega-sink");
+```
+
+You can see some more examples 
[here](https://github.com/pravega/pravega-samples).
+
+# Internals of reader and writer
+
+## Checkpoint integration
+
+Flink uses periodic checkpoints based on the Chandy-Lamport algorithm to make its state fault-tolerant. By allowing state and the corresponding stream positions to be recovered, the application is given the same semantics as a failure-free execution.
+
+Pravega also has its own Checkpoint concept, which creates a consistent "point in time" persistence of the state of each Reader in the Reader Group by using a specialized event (the *Checkpoint Event*) to signal each Reader to preserve its state. Once a Checkpoint has been completed, the application can use the Checkpoint to reset all the Readers in the Reader Group to the known consistent state represented by the Checkpoint.
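+
+As a rough illustration with the plain Pravega client (the `readerGroup` here is assumed to have been obtained from a `ReaderGroupManager`, the checkpoint name is illustrative, and this is not the connector's internal code):
+
+```java
+// (imports from io.pravega.client.stream.* and java.util.concurrent.* omitted)
+// Ask every reader in the group to preserve its position at a consistent point.
+ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+CompletableFuture<Checkpoint> future =
+    readerGroup.initiateCheckpoint("my-checkpoint", executor);
+Checkpoint checkpoint = future.join();
+
+// Later, reset all readers in the group back to that consistent state.
+readerGroup.resetReaderGroup(ReaderGroupConfig.builder()
+    .startFromCheckpoint(checkpoint)
+    .build());
+```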
+
+This means that our end-to-end recovery story is different from that of other messaging systems such as Kafka, which uses a more tightly coupled approach that persists its offsets in the Flink task state and lets Flink do the coordination. Instead, Flink delegates the recovery of the Pravega source completely to the Pravega server and uses only a lightweight hook to connect. We collaborated with the Flink community and added a new interface `ExternallyInducedSource` ([FLINK-6390](https://issues.apache.org/jira/browse/FLINK-6390) [...]
+
+The checkpoint mechanism works as a two-step process:
+
+   - The [master 
hook](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html)
 handler from the JobManager initiates the 
[`triggerCheckpoint`](https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html#triggerCheckpoint-long-long-java.util.concurrent.Executor-)
 request to the `ReaderCheckpointHook` that was registered with the JobManager 
during [...]
+
+   - A `Checkpoint` event will be sent by Pravega as part of the data stream 
flow and, upon receiving the event, the `FlinkPravegaReader` will initiate a 
[`triggerCheckpoint`](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java#L73)
 request to effectively let Flink continue and complete the checkpoint process.
+
+
+## End-to-end exactly-once semantics
+
+In the early years of big data processing, results from real-time stream processing were generally considered inaccurate, approximate, or speculative. However, correctness is extremely important for some use cases and in some industries such as finance.
+
+This stems mainly from two issues:
+
+- unordered data sources in event time
+- the guarantee of end-to-end exactly-once semantics
+
+In recent years, watermarking has been introduced as a tradeoff between correctness and latency, and it is now considered a good solution for unordered data sources in event time.
+
+The guarantee of end-to-end exactly-once semantics is trickier. When we say “exactly-once semantics”, what we mean is that each incoming event affects the final results exactly once. Even in the event of a machine or software failure, there is no duplicate data and no data that goes unprocessed. This is quite difficult because of the demands of message acknowledgment and recovery during such fast processing, and it is also why some early distributed streaming engines like Storm (without Tr [...]
+
+Flink is one of the first streaming systems that was able to provide exactly-once semantics thanks to its carefully designed [checkpoint mechanism](https://www.ververica.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink). But to make the guarantee hold end-to-end, the final stage needs to apply the semantics to external message-system sinks that support commits and rollbacks.
+
+To work around this problem, Pravega introduced [transactional 
writes](https://cncf.pravega.io/docs/latest/transactions/). A Pravega 
transaction allows an application to prepare a set of events that can be 
written "all at once" to a Stream. This allows an application to "commit" a 
bunch of events atomically. When writes are idempotent, it is possible to 
implement end-to-end exactly-once pipelines together with Flink.
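+
+To illustrate the idea, here is a rough sketch of a transactional write using the plain Pravega client API. The scope, stream, and writer names are illustrative, `TxnFailedException` handling is omitted, and this is essentially the API that the Flink sink drives internally in exactly-once mode:
+
+```java
+// (imports from io.pravega.client.* and java.net.URI omitted;
+//  writeEvent()/commit() throw the checked TxnFailedException)
+ClientConfig clientConfig = ClientConfig.builder()
+    .controllerURI(URI.create("tcp://localhost:9090"))
+    .build();
+
+try (EventStreamClientFactory factory =
+         EventStreamClientFactory.withScope("my-scope", clientConfig);
+     TransactionalEventStreamWriter<String> writer =
+         factory.createTransactionalEventWriter("my-writer", "my-stream",
+             new JavaSerializer<String>(), EventWriterConfig.builder().build())) {
+
+    Transaction<String> txn = writer.beginTxn();
+    txn.writeEvent("routing-key", "event-1");
+    txn.writeEvent("routing-key", "event-2");
+    // Both events become visible atomically, or neither does.
+    txn.commit();
+}
+```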
+
+Building such an end-to-end solution requires coordination between Flink and the Pravega sink, which is still challenging. A common approach for coordinating commits and rollbacks in a distributed system is the two-phase commit protocol. We used this protocol and, together with the Flink community, implemented the sink function in a two-phase-commit manner coordinated with Flink checkpoints.
+
+The Flink community then extracted the common logic from the two-phase commit 
protocol and provided a general interface `TwoPhaseCommitSinkFunction` 
([FLINK-7210](https://issues.apache.org/jira/browse/FLINK-7210)) to make it 
possible to build end-to-end exactly-once applications with other message 
systems that have transaction support. This includes Apache Kafka versions 0.11 
and above. There is an official Flink [blog 
post](https://flink.apache.org/features/2018/03/01/end-to-end-exactly [...]
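+
+As a loose sketch of what implementing such a sink involves (the `MyTransaction` type and its `add`/`flush`/`commit`/`abort` methods here are hypothetical placeholders, not the connector's actual implementation):
+
+```java
+// Skeleton of a sink built on Flink's TwoPhaseCommitSinkFunction.
+public class MySink extends TwoPhaseCommitSinkFunction<String, MyTransaction, Void> {
+
+    public MySink() {
+        // Serializers for the transaction handle and the (unused) context.
+        super(new KryoSerializer<>(MyTransaction.class, new ExecutionConfig()),
+              VoidSerializer.INSTANCE);
+    }
+
+    @Override
+    protected MyTransaction beginTransaction() {
+        return new MyTransaction();   // open a new external transaction
+    }
+
+    @Override
+    protected void invoke(MyTransaction txn, String value, Context context) {
+        txn.add(value);               // write the record into the open transaction
+    }
+
+    @Override
+    protected void preCommit(MyTransaction txn) {
+        txn.flush();                  // flush when the checkpoint barrier arrives
+    }
+
+    @Override
+    protected void commit(MyTransaction txn) {
+        txn.commit();                 // checkpoint completed: make the data visible
+    }
+
+    @Override
+    protected void abort(MyTransaction txn) {
+        txn.abort();                  // roll back on failure or recovery
+    }
+}
+```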
+
+# Summary
+The Pravega Flink connector enables Pravega to connect to Flink and allows 
Pravega to act as a key data store in a streaming pipeline. Both projects share 
a common design philosophy and can integrate well with each other. Pravega has 
its own concept of checkpointing and has implemented transactional writes to 
support end-to-end exactly-once guarantees.
+
+# Future plans
+
+`FlinkPravegaInputFormat` and `FlinkPravegaOutputFormat` are now provided to 
support batch reads and writes in Flink, but these are under the legacy DataSet 
API. Since Flink is now making efforts to unify batch and streaming, it is 
improving its APIs and providing new interfaces for the 
[source](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
 and 
[sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API)
 APIs in the Flin [...]
+ 
+We will also put more effort into SQL / Table API support in order to provide a better user experience, since it is simpler to understand and, in some cases, even more powerful to use.
+
+**Note:** the original blog post can be found 
[here](https://cncf.pravega.io/blog/2021/11/01/pravega-flink-connector-101/).
\ No newline at end of file
