This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new c81b831  [hotfix][docs] Adding new sources on overview pages
c81b831 is described below

commit c81b831d5fe08d328251d91f4f255b1508a9feb4
Author: martijnvisser <mart...@2symbols.com>
AuthorDate: Mon Sep 13 16:32:42 2021 +0200

    [hotfix][docs] Adding new sources on overview pages

    This closes #17266
---
 .../docs/connectors/datastream/cassandra.md        |   2 +-
 .../docs/connectors/datastream/guarantees.md       |   2 +-
 .../docs/connectors/datastream/hybridsource.md     | 100 +++++++++++++++++++++
 .../content.zh/docs/connectors/datastream/kafka.md |   2 +-
 .../docs/connectors/datastream/kinesis.md          |   2 +-
 .../docs/connectors/datastream/overview.md         |   7 +-
 .../docs/connectors/datastream/streamfile_sink.md  |   7 +-
 .../docs/connectors/datastream/cassandra.md        |   2 +-
 docs/content/docs/connectors/datastream/kafka.md   |   2 +-
 docs/content/docs/connectors/datastream/kinesis.md |   2 +-
 .../content/docs/connectors/datastream/overview.md |   4 +-
 11 files changed, 119 insertions(+), 13 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/cassandra.md b/docs/content.zh/docs/connectors/datastream/cassandra.md
index b2b18d5..bddb9f2 100644
--- a/docs/content.zh/docs/connectors/datastream/cassandra.md
+++ b/docs/content.zh/docs/connectors/datastream/cassandra.md
@@ -1,6 +1,6 @@
 ---
 title: Cassandra
-weight: 3
+weight: 4
 type: docs
 aliases:
   - /zh/dev/connectors/cassandra.html
diff --git a/docs/content.zh/docs/connectors/datastream/guarantees.md b/docs/content.zh/docs/connectors/datastream/guarantees.md
index 1f60466..e383367 100644
--- a/docs/content.zh/docs/connectors/datastream/guarantees.md
+++ b/docs/content.zh/docs/connectors/datastream/guarantees.md
@@ -1,6 +1,6 @@
 ---
 title: 容错保证
-weight: 1
+weight: 2
 type: docs
 aliases:
   - /zh/dev/connectors/guarantees.html
diff --git a/docs/content.zh/docs/connectors/datastream/hybridsource.md b/docs/content.zh/docs/connectors/datastream/hybridsource.md
new file mode 100644
index 0000000..02f5077
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/hybridsource.md
@@ -0,0 +1,100 @@
+---
+title: Hybrid Source
+weight: 8
+type: docs
+aliases:
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hybrid Source
+
+`HybridSource` is a source that contains a list of concrete [sources]({{< ref "docs/dev/datastream/sources" >}}).
+It solves the problem of sequentially reading input from heterogeneous sources to produce a single input stream.
+
+For example, a bootstrap use case may need to read several days' worth of bounded input from S3 before continuing with the latest unbounded input from Kafka.
+`HybridSource` switches from `FileSource` to `KafkaSource` when the bounded file input finishes, without interrupting the application.
+
+Prior to `HybridSource`, it was necessary to create a topology with multiple sources and to define a switching mechanism in user code, which led to operational complexity and inefficiency.
+
+With `HybridSource` the multiple sources appear as a single source in the Flink job graph and from the `DataStream` API perspective.
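The hand-off between the contained sources can be pictured as reading them strictly in sequence, constructing the next source only once the previous one is exhausted. The plain-Java sketch below models that idea without any Flink dependencies; `SketchSource`, `SketchHybridSource`, and every other name here are illustrative inventions, not Flink API. It also models deriving the next source's start position from the previous source's end position, which mirrors the dynamic switch scenario:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Illustrative stand-in for a bounded source: yields records,
// then exposes the position where it ended.
interface SketchSource {
    Iterator<String> records();

    long endPosition(); // valid once records() is exhausted
}

// Reads the first source to the end, then builds the next source
// from a start position derived from the first source's end position.
final class SketchHybridSource {
    static void run(SketchSource first,
                    Function<Long, Iterator<String>> nextSourceFactory,
                    List<String> out) {
        Iterator<String> it = first.records();
        while (it.hasNext()) {
            out.add(it.next());
        }
        // Switch-over: the next source starts just after where the first ended.
        Iterator<String> next = nextSourceFactory.apply(first.endPosition() + 1);
        while (next.hasNext()) {
            out.add(next.next());
        }
    }
}
```

In an actual Flink job, the built `HybridSource` is attached like any other FLIP-27 source, e.g. via `env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source")`.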
+
+For more background, see [FLIP-150](https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source).
+
+To use the connector, add the `flink-connector-base` dependency to your project:
+
+{{< artifact flink-connector-base >}}
+
+(It typically comes as a transitive dependency of the concrete sources.)
+
+## Start position for next source
+
+To arrange multiple sources in a `HybridSource`, all sources except the last one need to be bounded. Therefore, the sources typically need to be assigned a start and an end position. The last source may also be bounded, in which case the `HybridSource` is bounded; otherwise it is unbounded.
+Details depend on the specific source and the external storage systems.
+
+Here we cover the most basic scenario and then a more complex one, following the File/Kafka example.
+
+#### Fixed start position at graph construction time
+
+Example: read up to a pre-determined switch time from files and then continue reading from Kafka.
+Each source covers an upfront known range, and therefore the contained sources can be created upfront as if they were used directly:
+
+```java
+long switchTimestamp = ...; // derive from file input paths
+FileSource<String> fileSource =
+    FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+KafkaSource<String> kafkaSource =
+    KafkaSource.<String>builder()
+        .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
+        .build();
+HybridSource<String> hybridSource =
+    HybridSource.builder(fileSource)
+        .addSource(kafkaSource)
+        .build();
+```
+
+#### Dynamic start position at switch time
+
+Example: the file source reads a very large backlog, potentially taking longer than the retention available for the next source.
+The switch needs to occur at "current time - X". This requires the start time for the next source to be set at switch time.
+Here we require the transfer of the end position from the previous file enumerator for deferred construction of the `KafkaSource`,
+by implementing `SourceFactory`.
+
+Note that enumerators need to support getting the end timestamp. This may currently require a source customization.
+Adding support for a dynamic end position to `FileSource` is tracked in [FLINK-23633](https://issues.apache.org/jira/browse/FLINK-23633).
+
+```java
+FileSource<String> fileSource = CustomFileSource.readTillOneDayFromLatest();
+HybridSource<String> hybridSource =
+    HybridSource.<String, CustomFileSplitEnumerator>builder(fileSource)
+        .addSource(
+            switchContext -> {
+                CustomFileSplitEnumerator previousEnumerator =
+                    switchContext.getPreviousEnumerator();
+                // how to get the timestamp depends on the specific enumerator
+                long switchTimestamp = previousEnumerator.getEndTimestamp();
+                KafkaSource<String> kafkaSource =
+                    KafkaSource.<String>builder()
+                        .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
+                        .build();
+                return kafkaSource;
+            },
+            Boundedness.CONTINUOUS_UNBOUNDED)
+        .build();
+```
diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md b/docs/content.zh/docs/connectors/datastream/kafka.md
index 8ab878c..acaf601 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -1,6 +1,6 @@
 ---
 title: Kafka
-weight: 2
+weight: 3
 type: docs
 aliases:
   - /zh/dev/connectors/kafka.html
diff --git a/docs/content.zh/docs/connectors/datastream/kinesis.md b/docs/content.zh/docs/connectors/datastream/kinesis.md
index 50e09d5..93eb6f1 100644
--- a/docs/content.zh/docs/connectors/datastream/kinesis.md
+++ b/docs/content.zh/docs/connectors/datastream/kinesis.md
@@ -1,6 +1,6 @@
 ---
 title: Kinesis
-weight: 4
+weight: 5
 type: docs
 aliases:
   - /zh/dev/connectors/kinesis.html
diff --git a/docs/content.zh/docs/connectors/datastream/overview.md b/docs/content.zh/docs/connectors/datastream/overview.md
index 5a580d9..7f3de30 100644
--- a/docs/content.zh/docs/connectors/datastream/overview.md
+++ b/docs/content.zh/docs/connectors/datastream/overview.md
@@ -41,12 +41,13 @@ under the License.
  * [Apache Cassandra]({{< ref "docs/connectors/datastream/cassandra" >}}) (sink)
  * [Amazon Kinesis Streams]({{< ref "docs/connectors/datastream/kinesis" >}}) (source/sink)
  * [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}}) (sink)
- * [FileSystem(包括 Hadoop ) - 仅支持流]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) (sink)
- * [FileSystem(包括 Hadoop ) - 流批统一]({{< ref "docs/connectors/datastream/file_sink" >}}) (sink)
+ * [FileSystem (Hadoop included) - Streaming and Batch]({{< ref "docs/connectors/datastream/file_sink" >}}) (sink)
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
+ * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
+ * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
  * [Apache NiFi]({{< ref "docs/connectors/datastream/nifi" >}}) (source/sink)
+ * [Apache Pulsar]({{< ref "docs/connectors/datastream/pulsar" >}}) (source)
  * [Twitter Streaming API]({{< ref "docs/connectors/datastream/twitter" >}}) (source)
- * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
  * [JDBC]({{< ref "docs/connectors/datastream/jdbc" >}}) (sink)

 请记住，在使用一种连接器时，通常需要额外的第三方组件，比如：数据存储服务器或者消息队列。
diff --git a/docs/content.zh/docs/connectors/datastream/streamfile_sink.md b/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
index ba4554b..4561891 100644
--- a/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
+++ b/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
@@ -4,6 +4,7 @@ weight: 6
 type: docs
 aliases:
   - /zh/dev/connectors/streamfile_sink.html
+bookHidden: true
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -26,10 +27,12 @@ under the License.
 # Streaming File Sink
-
-这个连接器提供了一个 Sink 来将分区文件写入到支持 [Flink `FileSystem`]({{< ref "docs/deployment/filesystems/overview" >}}) 接口的文件系统中。
+{{< hint warning >}}
+This Streaming File Sink is in the process of being phased out. Please use the unified [File Sink]({{< ref "docs/connectors/datastream/file_sink" >}}) as a drop-in replacement.
+{{< /hint >}}
+
 Streaming File Sink 会将数据写入到桶中。由于输入流可能是无界的，因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的，默认使用基于时间的分桶策略，这种策略每个小时创建一个新的桶，桶中包含的文件将记录所有该小时内从流中接收到的数据。

 桶目录中的实际输出数据会被划分为多个部分文件（part file），每一个接收桶数据的 Sink Subtask，至少包含一个部分文件（part file）。额外的部分文件（part file）将根据滚动策略创建，滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间，以及文件关闭前的最长非活动时间。
diff --git a/docs/content/docs/connectors/datastream/cassandra.md b/docs/content/docs/connectors/datastream/cassandra.md
index a3edff2..c281b46 100644
--- a/docs/content/docs/connectors/datastream/cassandra.md
+++ b/docs/content/docs/connectors/datastream/cassandra.md
@@ -1,6 +1,6 @@
 ---
 title: Cassandra
-weight: 3
+weight: 4
 type: docs
 aliases:
   - /dev/connectors/cassandra.html
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index 6b2cb94..a94d7bd 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -1,6 +1,6 @@
 ---
 title: Kafka
-weight: 2
+weight: 3
 type: docs
 aliases:
   - /dev/connectors/kafka.html
diff --git a/docs/content/docs/connectors/datastream/kinesis.md b/docs/content/docs/connectors/datastream/kinesis.md
index 474b521..c594ba8 100644
--- a/docs/content/docs/connectors/datastream/kinesis.md
+++ b/docs/content/docs/connectors/datastream/kinesis.md
@@ -1,6 +1,6 @@
 ---
 title: Kinesis
-weight: 4
+weight: 5
 type: docs
 aliases:
   - /dev/connectors/kinesis.html
diff --git a/docs/content/docs/connectors/datastream/overview.md b/docs/content/docs/connectors/datastream/overview.md
index a558d76..ec2b1df 100644
--- a/docs/content/docs/connectors/datastream/overview.md
+++ b/docs/content/docs/connectors/datastream/overview.md
@@ -44,9 +44,11 @@ Connectors provide code for interfacing with various third-party systems. Curren
  * [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}}) (sink)
  * [FileSystem (Hadoop included) - Streaming and Batch]({{< ref "docs/connectors/datastream/file_sink" >}}) (sink)
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
+ * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
+ * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
  * [Apache NiFi]({{< ref "docs/connectors/datastream/nifi" >}}) (source/sink)
+ * [Apache Pulsar]({{< ref "docs/connectors/datastream/pulsar" >}}) (source)
  * [Twitter Streaming API]({{< ref "docs/connectors/datastream/twitter" >}}) (source)
- * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
  * [JDBC]({{< ref "docs/connectors/datastream/jdbc" >}}) (sink)

 Keep in mind that to use one of these connectors in an application, additional third party