This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-website.git

commit a5bea9dbe905e6b390fe80fb1e625e9f1f5852d2
Author: Joao Boto <[email protected]>
AuthorDate: Wed Nov 10 09:41:47 2021 +0100

    [BAHIR-286] Update documentation for Flink extensions
---
 .../flink/current/flink-streaming-influxdb2.md     | 234 +++++++++++++++++++++
 site/docs/flink/current/flink-streaming-pinot.md   | 149 +++++++++++++
 .../templates/flink-streaming-influxdb2.template   |  27 +++
 .../flink/templates/flink-streaming-pinot.template |  27 +++
 update-doc.sh                                      |   2 +
 5 files changed, 439 insertions(+)

diff --git a/site/docs/flink/current/flink-streaming-influxdb2.md 
b/site/docs/flink/current/flink-streaming-influxdb2.md
new file mode 100644
index 0000000..5c4b99e
--- /dev/null
+++ b/site/docs/flink/current/flink-streaming-influxdb2.md
@@ -0,0 +1,234 @@
+---
+layout: page
+title: Apache Flink Streaming Connector for InfluxDB2
+description: Apache Flink Streaming Connector for InfluxDB2
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+# Flink InfluxDB Connector
+
+This connector provides a Source that parses the [InfluxDB Line 
Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/)
 and a Sink that can write to [InfluxDB](https://www.influxdata.com/). The 
Source implements the unified [Data Source 
API](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/sources.html).
 Our sink implements the unified [Sink 
API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API#FLIP143:Unif
 [...]
+
+The InfluxDB Source serves as an output target for 
[Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) (and 
compatible tools). Telegraf pushes data to the source. The process is 
push-based, so it is a stateless (non-replayable) source.
+
+![Flink InfluxDB Connector Architecture](media/connector-architecture.png)
+
+## Installation
+
+To use this connector, add the following dependency to your project:
+
+```xml
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-influxdb2_2.12</artifactId>
+  <version>1.1-SNAPSHOT</version>
+</dependency>
+```
+
+Note that the streaming connectors are not part of the binary distribution of 
Flink. You need to shade them into your job jar for cluster execution. See how 
to link with them for cluster execution 
[here](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies).
+
+## Compatibility
+
+This module is compatible with InfluxDB 2.x and InfluxDB 1.8+. See more 
information 
[here](https://github.com/influxdata/influxdb-client-java#influxdb-client-java).
+
+## Source
+
+The Source accepts data in the form of the [Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). One HTTP server is started per source instance. It parses incoming HTTP requests into our `DataPoint` class. Each `DataPoint` instance is then deserialized by a user-provided implementation of our `InfluxDBDataPointDeserializer` and sent to the next Flink operator.
+
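For illustration, the shape of such a line-protocol parser can be sketched in a few lines of plain Java. This is a simplified stand-in, not the connector's actual parser; in particular it ignores the line protocol's escaping rules for spaces and commas:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of splitting one line-protocol line into its parts:
//   <measurement>[,<tag>=<value>...] <field>=<value>[,...] [<timestamp>]
public class LineProtocolSketch {

    public static Map<String, String> parse(String line) {
        Map<String, String> result = new LinkedHashMap<>();
        String[] sections = line.split(" "); // measurement+tags | fields | optional timestamp
        String[] measurementAndTags = sections[0].split(",");
        result.put("measurement", measurementAndTags[0]);
        for (int i = 1; i < measurementAndTags.length; i++) {
            String[] kv = measurementAndTags[i].split("=");
            result.put("tag." + kv[0], kv[1]);
        }
        for (String field : sections[1].split(",")) {
            String[] kv = field.split("=");
            result.put("field." + kv[0], kv[1]);
        }
        if (sections.length > 2) {
            result.put("timestamp", sections[2]);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("test,longValue=1 fieldKey=\"fieldValue\" 1556813561098000000"));
    }
}
```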
+When using Telegraf, use its [HTTP output 
plugin](https://docs.influxdata.com/telegraf/v1.17/plugins/#http):
+
+```toml
+[[outputs.http]]
+  url = "http://task-manager:8000/api/v2/write"
+  method = "POST"
+  data_format = "influx"
+```
+
+![Source Architecture](media/source-architecture.png)
+
+### Usage
+
+```java
+InfluxDBSource<Long> influxDBSource = InfluxDBSource.builder()
+        .setDeserializer(new TestDeserializer())
+        .build();
+
+// ...
+
+/**
+ * Implementation of InfluxDBDataPointDeserializer interface
+ * (dataPoint) -----> (element)
+ *  test,longValue=1 fieldKey="fieldValue" -----------> 1L
+ *  test,longValue=2 fieldKey="fieldValue" -----------> 2L
+ *  test,longValue=3 fieldKey="fieldValue" -----------> 3L
+ */
+class TestDeserializer implements InfluxDBDataPointDeserializer<Long> {
+    @Override
+    public Long deserialize(final DataPoint dataPoint) {
+        return (Long) dataPoint.getField("longValue");
+    }
+}
+```
+
+
+### Options
+
+| Option            | Description     | Default Value     |
+| ----------------- |-----------------|:-----------------:|
+| ENQUEUE_WAIT_TIME | Timeout in seconds for enqueuing an HTTP request into the ingest queue. | 5 |
+| INGEST_QUEUE_CAPACITY | Size of the queue that buffers HTTP request data points before they are fetched. | 1000 |
+| MAXIMUM_LINES_PER_REQUEST | Maximum number of lines parsed per HTTP request. | 10000 |
+| PORT | TCP port on which the source's HTTP server listens. | 8000 |
+
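The interplay of `INGEST_QUEUE_CAPACITY` and `ENQUEUE_WAIT_TIME` can be illustrated with a plain `ArrayBlockingQueue` (a sketch of the buffering semantics only, not the connector's internals):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch: a bounded buffer between the HTTP server and the Flink reader.
// offer(..) waits up to the enqueue timeout and signals back-pressure by
// returning false when the queue stays full.
public class IngestQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        int capacity = 2;     // cf. INGEST_QUEUE_CAPACITY (default 1000)
        long waitSeconds = 1; // cf. ENQUEUE_WAIT_TIME (default 5)
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);

        boolean a = queue.offer("point-1", waitSeconds, TimeUnit.SECONDS); // accepted
        boolean b = queue.offer("point-2", waitSeconds, TimeUnit.SECONDS); // accepted
        boolean c = queue.offer("point-3", waitSeconds, TimeUnit.SECONDS); // false: queue full
        System.out.println(a + " " + b + " " + c); // true true false
    }
}
```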
+### Supported Data Types in Field Set
+
+| Field Set     | Support       |
+| ------------- |:-------------:|
+|    Float      | ✅            |
+|    Integer    | ✅            |
+|    UInteger   | ❌            |
+|    String     | ✅            |
+|    Boolean    | ✅            |
+
+See the InfluxDB field set value [data types](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/#field-set).
+The parsing limitation for unsigned integers is inherited from the Apache Druid project's line-protocol parser. For more information, see this [issue](https://github.com/apache/druid/issues/10993).
+
+
+## Sink
+
+The Sink writes data points to InfluxDB using the [InfluxDB Java 
Client](https://github.com/influxdata/influxdb-client-java). You provide the 
connection information (URL, username, password, bucket, and organization) and 
an implementation of the `InfluxDBSchemaSerializer<IN>` generic interface. The 
implementation of the interface overrides the `serialize(IN element, Context 
context)` function. This function serializes incoming Flink elements of type 
`IN` to [Point](https://github.com/influxd [...]
+
+It is possible to write multiple data points to InfluxDB simultaneously by 
separating each point with a new line. Batching data points in this manner 
results in much higher performance. The batch size can be set through the 
`WRITE_BUFFER_SIZE` option. By default, the buffer size is set to 1000 and can be changed to any value using the `setWriteBufferSize(final int bufferSize)` method of the Sink builder class.
+
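The effect of newline-separated batching can be sketched as plain string assembly (illustrative only; the connector performs this internally via the InfluxDB client):

```java
import java.util.List;

// Sketch: several line-protocol points are written in one request by
// joining them with '\n', which is what makes buffered writes cheap.
public class WriteBatchSketch {
    public static String toBatch(List<String> points) {
        return String.join("\n", points);
    }

    public static void main(String[] args) {
        List<String> buffer = List.of(
                "test,longValue=1 fieldKey=\"fieldValue\"",
                "test,longValue=2 fieldKey=\"fieldValue\"");
        System.out.println(toBatch(buffer)); // two points, one write request
    }
}
```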
+It is possible to write checkpoint data points to InfluxDB whenever Flink sets 
a checkpoint. To enable this functionality, you need to set the 
`WRITE_DATA_POINT_CHECKPOINT` flag to true (default is false). The checkpoint 
data point looks as follows:
+```text
+checkpoint checkpoint=flink <timestamp>
+```
+The timestamp refers to the latest element that Flink serializes.
+
+### Usage
+
+```java
+// The InfluxDB Sink uses the builder pattern to create a Sink object
+InfluxDBSink<Long> influxDBSink = InfluxDBSink.builder()
+        .setInfluxDBSchemaSerializer(new TestSerializer())
+        .setInfluxDBUrl(getUrl())           // http://localhost:8086
+        .setInfluxDBUsername(getUsername()) // admin
+        .setInfluxDBPassword(getPassword()) // admin
+        .setInfluxDBBucket(getBucket())     // default
+        .setInfluxDBOrganization(getOrg())  // influxdata
+        .build();
+
+// ...
+
+/**
+ * Implementation of InfluxDBSchemaSerializer interface
+ * (element) -----> (dataPoint)
+ *  1L -----------> test,longValue=1 fieldKey="fieldValue"
+ *  2L -----------> test,longValue=2 fieldKey="fieldValue"
+ *  3L -----------> test,longValue=3 fieldKey="fieldValue"
+ */
+class TestSerializer implements InfluxDBSchemaSerializer<Long> {
+
+    @Override
+    public Point serialize(Long element, Context context) {
+        final Point dataPoint = new Point("test");
+        dataPoint.addTag("longValue", String.valueOf(element));
+        dataPoint.addField("fieldKey", "fieldValue");
+        return dataPoint;
+    }
+}
+```
+
+### Options
+
+| Option            | Description   | Default Value   |
+| ----------------- |-----------------|:-----------------:|
+| WRITE_DATA_POINT_CHECKPOINT | Determines whether the checkpoint data point should be written to InfluxDB. | false |
+| WRITE_BUFFER_SIZE | Number of elements to buffer before writing them to InfluxDB. | 1000 |
+| INFLUXDB_URL | InfluxDB Connection URL. | ❌ |
+| INFLUXDB_USERNAME | InfluxDB username. | ❌ |
+| INFLUXDB_PASSWORD | InfluxDB password. | ❌ |
+| INFLUXDB_BUCKET | InfluxDB bucket. | ❌ |
+| INFLUXDB_ORGANIZATION | InfluxDB organization. | ❌ |
+
+## Building the connector
+
+The connector can be built by using maven:
+
+```bash
+mvn clean install -DskipTests -pl flink-connector-influxdb2
+```
+
+## Benchmarks
+
+Some basic benchmarks were conducted.
+
+### Source
+A data generator that sends line protocol in the form of HTTP requests to a REST endpoint was used for the source benchmarks.
+Throughput and latency were measured for a direct connection between the data generator and the InfluxDB source.
+A setup including Telegraf was used to benchmark latency under more realistic conditions.
+
+### Sink
+Flink's `fromSequence` source was used to generate data for the sink benchmark.
+Throughput was measured without any other Flink operators, whereas the latency 
was measured by adding a timestamp to the event using a map operator before the 
sink.
+This timestamp was then compared to the insertion timestamp set by InfluxDB 
itself.
+
+### Visualization
+
+The results of these benchmarks are visualized 
[here](https://docs.google.com/presentation/d/1apd_wys0OzaiifAisABFg4B7HCydbkZXpN0OFd6cjEg/edit?usp=sharing).
+
+
+## Usage and Deployment Example
+
+See 
[`Shark/flink-connector-influxdb-example`](https://github.com/Shark/flink-connector-influxdb-example)
 for an example showing you how to use and deploy the InfluxDB source and sink 
connectors in a Flink application on a Kubernetes cluster.
+
+## Future Work
+
+* [Source] Dynamic (unprivileged) ports for HTTP server
+* [Source] Integration with Kubernetes service discovery in conjunction with 
dynamic ports
+* [Source] Use multi-threaded HTTP server
+* [Sink] Flush write buffer after an inactivity timeout
+
+## Contributors
+
+<!-- ALL-CONTRIBUTORS-LIST:START - Do not remove or modify this section -->
+<table>
+  <tr class="noBorder">
+    <td class="noBorder" align="center">
+        <a href="https://github.com/1p4pk"><img class="roundImg" src="https://avatars.githubusercontent.com/u/32157576?v=4?s=100" width="100px;" alt=""/><br /><sub><b>Leon Papke</b></sub></a>
+    </td>
+    <td class="noBorder" align="center">
+        <a href="https://github.com/raminqaf"><img class="roundImg" src="https://avatars.githubusercontent.com/u/20357405?v=4?s=100" width="100px;" alt=""/><br /><sub><b>Ramin Gharib</b></sub></a>
+    </td>
+    <td class="noBorder" align="center">
+        <a href="https://github.com/Shark"><img class="roundImg" src="https://avatars.githubusercontent.com/u/53632?v=4?s=100" width="100px;" alt=""/><br /><sub><b>Felix Seidel</b></sub></a>
+    </td>
+  </tr>
+</table>
+<!-- ALL-CONTRIBUTORS-LIST:END -->
+
+This project follows the [all-contributors](https://github.com/all-contributors/all-contributors) specification. Contributions of any kind are welcome!
diff --git a/site/docs/flink/current/flink-streaming-pinot.md 
b/site/docs/flink/current/flink-streaming-pinot.md
new file mode 100644
index 0000000..b5a705a
--- /dev/null
+++ b/site/docs/flink/current/flink-streaming-pinot.md
@@ -0,0 +1,149 @@
+---
+layout: page
+title: Apache Flink Streaming Connector for Pinot
+description: Apache Flink Streaming Connector for Pinot
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+# Flink Pinot Connector
+
+This connector provides a sink to [Apache Pinot](http://pinot.apache.org/)™.  
+To use this connector, add the following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-pinot_2.11</artifactId>
+      <version>1.1-SNAPSHOT</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with Pinot 0.6.0.
+
+Note that the streaming connectors are not part of the binary distribution of 
Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution 
[here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html).
+
+The sink class is called `PinotSink`.
+
+## Architecture
+The Pinot sink stores elements from upstream Flink tasks in an Apache Pinot 
table.
+We support two execution modes:
+* `RuntimeExecutionMode.BATCH`
+* `RuntimeExecutionMode.STREAMING` which requires checkpointing to be enabled.
+
+### PinotSinkWriter
+Elements arriving from upstream tasks are handled by an instance of the `PinotSinkWriter`.
+The `PinotSinkWriter` holds a list of `PinotWriterSegment`s, where each `PinotWriterSegment` can store up to `maxRowsPerSegment` elements.
+A `PinotWriterSegment` is considered active as long as it has not yet reached this maximum.
+Once the maximum is reached, the active `PinotWriterSegment` is inactivated and a new, empty `PinotWriterSegment` is created.
+
+<img height="225" alt="PinotSinkWriter" src="docs/images/PinotSinkWriter.png">
+
+Thus, there is always one active `PinotWriterSegment` that new incoming 
elements will go to.
+Over time, the list of `PinotWriterSegment`s per `PinotSinkWriter` grows until a checkpoint is created.
+
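The rollover described above can be sketched as follows (an illustration only; class and field names are simplified stand-ins for the real `PinotSinkWriter` and `PinotWriterSegment`):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the writer's segment rollover: the last segment in the list is
// the active one; once it holds maxRowsPerSegment elements it is sealed
// (inactivated) and a fresh, empty segment becomes active.
public class SegmentRolloverSketch {
    static final int MAX_ROWS_PER_SEGMENT = 3;
    final List<List<String>> segments = new ArrayList<>();

    void write(String element) {
        if (segments.isEmpty()
                || segments.get(segments.size() - 1).size() == MAX_ROWS_PER_SEGMENT) {
            segments.add(new ArrayList<>()); // seal current segment, open a new active one
        }
        segments.get(segments.size() - 1).add(element);
    }

    public static void main(String[] args) {
        SegmentRolloverSketch writer = new SegmentRolloverSketch();
        for (int i = 0; i < 7; i++) writer.write("row-" + i);
        System.out.println(writer.segments.size()); // 3 segments: 3 + 3 + 1 rows
    }
}
```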
+**Checkpointing**  
+On checkpoint creation, `PinotSinkWriter.prepareCommit` is called by the Flink runtime.
+This triggers the creation of `PinotSinkCommittable`s: each inactive `PinotWriterSegment` yields exactly one `PinotSinkCommittable`.
+
+<img height="250" alt="PinotSinkWriter prepareCommit" 
src="docs/images/PinotSinkWriter_prepareCommit.png">
+
+In order to create a `PinotSinkCommittable`, a file containing a `PinotWriterSegment`'s elements is written to the shared filesystem defined via the `FileSystemAdapter`.
+The file contains a list of elements in JSON format. The serialization is done 
via `JSONSerializer`.
+A `PinotSinkCommittable` then holds the path to the data file on the shared filesystem as well as the minimum and maximum timestamp of all contained elements (extracted via the `EventTimeExtractor`).
+
+
+### PinotSinkGlobalCommitter
+In order to be able to follow the guidelines for Pinot segment naming, we need 
to include the minimum and maximum timestamp of an element in the metadata of a 
segment and in its name.
+The minimum and maximum timestamp of all elements between two checkpoints is 
determined at a parallelism of 1 in the `PinotSinkGlobalCommitter`.
+This procedure allows recovering from failures by deleting previously uploaded segments, which prevents duplicate segments in the Pinot table.
+
+<img height="250" alt="PinotSinkGlobalCommitter combine" 
src="docs/images/PinotSinkGlobalCommitter_combine.png">
+
+After all `PinotSinkWriter` subtasks have emitted their `PinotSinkCommittable`s, these are sent to the `PinotSinkGlobalCommitter`, which first combines all collected `PinotSinkCommittable`s into a single `PinotSinkGlobalCommittable`.
+To do so, the minimum and maximum timestamps of all collected `PinotSinkCommittable`s are determined.
+Moreover, the `PinotSinkGlobalCommittable` holds references to all data files 
from the `PinotSinkCommittable`s.
+
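This combine step can be sketched as follows (a hypothetical `Committable` record stands in for the real `PinotSinkCommittable`):

```java
import java.util.List;

// Sketch of the global committer's combine step: the global committable's
// time range is the min/max over all per-writer committables, and it keeps
// the references to every data file on the shared filesystem.
public class CombineSketch {
    record Committable(String dataFilePath, long minTimestamp, long maxTimestamp) {}

    static long[] combine(List<Committable> committables) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (Committable c : committables) {
            min = Math.min(min, c.minTimestamp());
            max = Math.max(max, c.maxTimestamp());
        }
        return new long[] {min, max};
    }

    public static void main(String[] args) {
        long[] range = combine(List.of(
                new Committable("/shared/seg-a", 100, 250),
                new Committable("/shared/seg-b", 80, 200)));
        System.out.println(range[0] + ".." + range[1]); // 80..250
    }
}
```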
+When a `PinotSinkGlobalCommittable` is finally committed, the following procedure is executed:
+* Read all data files from the shared filesystem (using the `FileSystemAdapter`).
+* Generate Pinot segment names using the `PinotSinkSegmentNameGenerator`.
+* Create Pinot segments with the minimum and maximum timestamps (stored in the `PinotSinkGlobalCommittable`) and the previously generated segment names assigned.
+* Upload the Pinot segments to the Pinot controller.
+
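Assuming the exemplary naming scheme `tableName_minTimestamp_maxTimestamp_segmentNamePostfix_0` shown in the Usage section below, segment name generation boils down to simple string assembly (a sketch, not the real `PinotSinkSegmentNameGenerator`):

```java
// Sketch of the simple segment naming scheme: the table name, the time
// range of the contained elements, a user-defined postfix, and a sequence id.
public class SegmentNameSketch {
    static String segmentName(String tableName, long minTs, long maxTs, String postfix, int seq) {
        return tableName + "_" + minTs + "_" + maxTs + "_" + postfix + "_" + seq;
    }

    public static void main(String[] args) {
        System.out.println(segmentName("myTable", 80, 250, "flink", 0));
        // myTable_80_250_flink_0
    }
}
```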
+
+## Delivery Guarantees
+As a result of the architecture described above, the `PinotSink` provides an at-least-once delivery guarantee.
+While the failure recovery mechanism ensures that duplicate segments are 
prevented, there might be temporary inconsistencies in the Pinot table which 
can result in downstream tasks receiving an element multiple times.
+
+## Options
+| Option                 | Description                                                                      |
+| ---------------------- | -------------------------------------------------------------------------------- |
+| `pinotControllerHost`  | Host of the Pinot controller                                                     |
+| `pinotControllerPort`  | Port of the Pinot controller                                                     |
+| `tableName`            | Name of the target Pinot table                                                   |
+| `maxRowsPerSegment`    | Maximum number of rows to be stored within a Pinot segment                       |
+| `tempDirPrefix`        | Prefix for the temp directories used                                             |
+| `jsonSerializer`       | Serializer used to convert elements to JSON                                      |
+| `eventTimeExtractor`   | Defines the way event times are extracted from received objects                  |
+| `segmentNameGenerator` | Pinot segment name generator                                                     |
+| `fsAdapter`            | Filesystem adapter used to share data files across nodes                         |
+| `numCommitThreads`     | Number of threads used in the `PinotSinkGlobalCommitter` for committing segments |
+
+## Usage
+```java
+StreamExecutionEnvironment env = ...
+// Checkpointing needs to be enabled when executing in STREAMING mode
+env.enableCheckpointing(long interval);
+
+DataStream<PinotRow> dataStream = ...
+PinotSink pinotSink = new PinotSink.Builder<PinotRow>(String pinotControllerHost, String pinotControllerPort, String tableName)
+
+        // Serializes a PinotRow to JSON format
+        .withJsonSerializer(JsonSerializer<PinotRow> jsonSerializer)
+
+        // Extracts the timestamp from a PinotRow
+        .withEventTimeExtractor(EventTimeExtractor<IN> eventTimeExtractor)
+
+        // Defines the segment name generation via the predefined SimpleSegmentNameGenerator
+        // Exemplary segment name: tableName_minTimestamp_maxTimestamp_segmentNamePostfix_0
+        .withSimpleSegmentNameGenerator(String tableName, String segmentNamePostfix)
+
+        // Use a custom segment name generator if the SimpleSegmentNameGenerator does not work for your use case
+        .withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator)
+
+        // Use a custom filesystem adapter.
+        // CAUTION: Make sure all nodes your Flink app runs on can access the shared filesystem via the provided FileSystemAdapter
+        .withFileSystemAdapter(FileSystemAdapter fsAdapter)
+
+        // Defines the size of the Pinot segments
+        .withMaxRowsPerSegment(int maxRowsPerSegment)
+
+        // Prefix within the local filesystem's temp directory used for storing intermediate files
+        .withTempDirectoryPrefix(String tempDirPrefix)
+
+        // Number of threads used in the PinotSinkGlobalCommitter to commit a batch of segments
+        // Optional - Default is 4
+        .withNumCommitThreads(int numCommitThreads)
+
+        // Builds the PinotSink
+        .build();
+dataStream.addSink(pinotSink);
+```
diff --git a/site/docs/flink/templates/flink-streaming-influxdb2.template 
b/site/docs/flink/templates/flink-streaming-influxdb2.template
new file mode 100644
index 0000000..4dc171a
--- /dev/null
+++ b/site/docs/flink/templates/flink-streaming-influxdb2.template
@@ -0,0 +1,27 @@
+---
+layout: page
+title: Apache Flink Streaming Connector for InfluxDB2
+description: Apache Flink Streaming Connector for InfluxDB2
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
diff --git a/site/docs/flink/templates/flink-streaming-pinot.template 
b/site/docs/flink/templates/flink-streaming-pinot.template
new file mode 100644
index 0000000..cbd76fb
--- /dev/null
+++ b/site/docs/flink/templates/flink-streaming-pinot.template
@@ -0,0 +1,27 @@
+---
+layout: page
+title: Apache Flink Streaming Connector for Pinot
+description: Apache Flink Streaming Connector for Pinot
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
diff --git a/update-doc.sh b/update-doc.sh
index 30a1a1b..8c73c6e 100755
--- a/update-doc.sh
+++ b/update-doc.sh
@@ -178,8 +178,10 @@ function update_flink {
         flink-streaming-akka      flink-connector-akka      \
         flink-streaming-flume     flink-connector-flume     \
         flink-streaming-influxdb  flink-connector-influxdb  \
+        flink-streaming-influxdb2  flink-connector-influxdb2  \
         flink-streaming-kudu      flink-connector-kudu      \
         flink-streaming-netty     flink-connector-netty     \
+        flink-streaming-pinot     flink-connector-pinot     \
         flink-streaming-redis     flink-connector-redis
 
     check_version_strings "$FLINK_WEBSITE_DOC_DIR"
