This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-website.git
The following commit(s) were added to refs/heads/master by this push:
new f84894d [BAHIR-314] Add Bahir Flink release 1.1.0
f84894d is described below
commit f84894d6480d8464cee63c1994a382b9efc245e4
Author: Joao Boto <[email protected]>
AuthorDate: Fri Aug 5 21:43:38 2022 +0200
[BAHIR-314] Add Bahir Flink release 1.1.0
---
site/docs/flink/1.0/documentation.md | 4 +-
.../docs/flink/{current => 1.1.0}/documentation.md | 6 +-
.../{current => 1.1.0}/flink-streaming-activemq.md | 2 +-
.../{current => 1.1.0}/flink-streaming-akka.md | 2 +-
.../{current => 1.1.0}/flink-streaming-flume.md | 2 +-
.../{current => 1.1.0}/flink-streaming-influxdb.md | 2 +-
.../flink-streaming-influxdb2.md | 2 +-
.../{current => 1.1.0}/flink-streaming-kudu.md | 104 ++++++++++++---------
.../{current => 1.1.0}/flink-streaming-netty.md | 2 +-
.../{current => 1.1.0}/flink-streaming-pinot.md | 2 +-
.../{current => 1.1.0}/flink-streaming-redis.md | 2 +-
site/docs/flink/current/documentation.md | 8 +-
.../docs/flink/current/flink-streaming-activemq.md | 2 +-
site/docs/flink/current/flink-streaming-akka.md | 2 +-
site/docs/flink/current/flink-streaming-flume.md | 2 +-
.../docs/flink/current/flink-streaming-influxdb.md | 2 +-
.../flink/current/flink-streaming-influxdb2.md | 2 +-
site/docs/flink/current/flink-streaming-kudu.md | 104 ++++++++++++---------
site/docs/flink/current/flink-streaming-netty.md | 2 +-
site/docs/flink/current/flink-streaming-pinot.md | 2 +-
site/docs/flink/current/flink-streaming-redis.md | 2 +-
site/docs/flink/overview.md | 3 +-
site/index.md | 5 +-
23 files changed, 147 insertions(+), 119 deletions(-)
diff --git a/site/docs/flink/1.0/documentation.md
b/site/docs/flink/1.0/documentation.md
index 55b557f..4c8b3ef 100644
--- a/site/docs/flink/1.0/documentation.md
+++ b/site/docs/flink/1.0/documentation.md
@@ -1,7 +1,7 @@
---
layout: page
-title: Extensions for Apache Flink (1.0.0-SNAPSHOT)
-description: Extensions for Apache Flink (1.0.0-SNAPSHOT)
+title: Extensions for Apache Flink (1.0)
+description: Extensions for Apache Flink (1.0)
group: nav-right
---
<!--
diff --git a/site/docs/flink/current/documentation.md
b/site/docs/flink/1.1.0/documentation.md
similarity index 87%
copy from site/docs/flink/current/documentation.md
copy to site/docs/flink/1.1.0/documentation.md
index 969c97f..e652510 100644
--- a/site/docs/flink/current/documentation.md
+++ b/site/docs/flink/1.1.0/documentation.md
@@ -1,7 +1,7 @@
---
layout: page
-title: Extensions for Apache Flink (1.1-SNAPSHOT)
-description: Extensions for Apache Flink (1.1-SNAPSHOT)
+title: Extensions for Apache Flink (1.1.0)
+description: Extensions for Apache Flink (1.1.0)
group: nav-right
---
<!--
@@ -41,7 +41,7 @@ limitations under the License.
[InfluxDB2 connector](../flink-streaming-influxdb2)
{:height="36px" width="36px"}
-[Kudu connector](../flink-streaming-kudu)
+[Kudu connector](../flink-streaming-kudu)
{:height="36px" width="36px"}
[Netty connector](../flink-streaming-netty)
diff --git a/site/docs/flink/current/flink-streaming-activemq.md
b/site/docs/flink/1.1.0/flink-streaming-activemq.md
similarity index 97%
copy from site/docs/flink/current/flink-streaming-activemq.md
copy to site/docs/flink/1.1.0/flink-streaming-activemq.md
index d94bf50..19cd126 100644
--- a/site/docs/flink/current/flink-streaming-activemq.md
+++ b/site/docs/flink/1.1.0/flink-streaming-activemq.md
@@ -33,7 +33,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-activemq_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1.0</version>
</dependency>
*Version Compatibility*: This module is compatible with ActiveMQ 5.14.0.
diff --git a/site/docs/flink/current/flink-streaming-akka.md
b/site/docs/flink/1.1.0/flink-streaming-akka.md
similarity index 98%
copy from site/docs/flink/current/flink-streaming-akka.md
copy to site/docs/flink/1.1.0/flink-streaming-akka.md
index 0af3e13..b85f7f6 100644
--- a/site/docs/flink/current/flink-streaming-akka.md
+++ b/site/docs/flink/1.1.0/flink-streaming-akka.md
@@ -33,7 +33,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-akka_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1.0</version>
</dependency>
*Version Compatibility*: This module is compatible with Akka 2.0+.
diff --git a/site/docs/flink/current/flink-streaming-flume.md
b/site/docs/flink/1.1.0/flink-streaming-flume.md
similarity index 97%
copy from site/docs/flink/current/flink-streaming-flume.md
copy to site/docs/flink/1.1.0/flink-streaming-flume.md
index c5e9a89..fff1917 100644
--- a/site/docs/flink/current/flink-streaming-flume.md
+++ b/site/docs/flink/1.1.0/flink-streaming-flume.md
@@ -33,7 +33,7 @@ following dependency to your project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-flume_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1.0</version>
</dependency>
*Version Compatibility*: This module is compatible with Flume 1.8.0.
diff --git a/site/docs/flink/current/flink-streaming-influxdb.md
b/site/docs/flink/1.1.0/flink-streaming-influxdb.md
similarity index 98%
copy from site/docs/flink/current/flink-streaming-influxdb.md
copy to site/docs/flink/1.1.0/flink-streaming-influxdb.md
index fe0d946..7b03d94 100644
--- a/site/docs/flink/current/flink-streaming-influxdb.md
+++ b/site/docs/flink/1.1.0/flink-streaming-influxdb.md
@@ -33,7 +33,7 @@ following dependency to your project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-influxdb_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1.0</version>
</dependency>
*Version Compatibility*: This module is compatible with InfluxDB 1.3.x
diff --git a/site/docs/flink/current/flink-streaming-influxdb2.md
b/site/docs/flink/1.1.0/flink-streaming-influxdb2.md
similarity index 99%
copy from site/docs/flink/current/flink-streaming-influxdb2.md
copy to site/docs/flink/1.1.0/flink-streaming-influxdb2.md
index 5c4b99e..86a8237 100644
--- a/site/docs/flink/current/flink-streaming-influxdb2.md
+++ b/site/docs/flink/1.1.0/flink-streaming-influxdb2.md
@@ -41,7 +41,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-influxdb2_2.12</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1.0</version>
</dependency>
```
diff --git a/site/docs/flink/current/flink-streaming-kudu.md
b/site/docs/flink/1.1.0/flink-streaming-kudu.md
similarity index 78%
copy from site/docs/flink/current/flink-streaming-kudu.md
copy to site/docs/flink/1.1.0/flink-streaming-kudu.md
index 28dec2c..356adb1 100644
--- a/site/docs/flink/current/flink-streaming-kudu.md
+++ b/site/docs/flink/1.1.0/flink-streaming-kudu.md
@@ -29,18 +29,17 @@ limitations under the License.
This connector provides a source (```KuduInputFormat```), a sink/output
(```KuduSink``` and ```KuduOutputFormat```, respectively),
- as well a table source (`KuduTableSource`), an upsert table sink
(`KuduTableSink`), and a catalog (`KuduCatalog`),
- to allow reading and writing to [Kudu](https://kudu.apache.org/).
+as well as a table source (`KuduTableSource`), an upsert table sink
(`KuduTableSink`), and a catalog (`KuduCatalog`),
+to allow reading and writing to [Kudu](https://kudu.apache.org/).
To use this connector, add the following dependency to your project:
- <dependency>
- <groupId>org.apache.bahir</groupId>
- <artifactId>flink-connector-kudu_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
- </dependency>
-
- *Version Compatibility*: This module is compatible with Apache Kudu *1.11.1*
(last stable version) and Apache Flink 1.10.+.
+<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>flink-connector-kudu_2.11</artifactId>
+ <version>1.1.0</version>
+</dependency>
+*Version Compatibility*: This module is compatible with Apache Kudu *1.11.1*
(last stable version) and Apache Flink 1.10.+.
Note that the streaming connectors are not part of the binary distribution of
Flink. You need to link them into your job jar for cluster execution.
See how to link with them for cluster execution
[here](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html).
@@ -72,7 +71,6 @@ catalogs:
type: kudu
kudu.masters: <host>:7051
```
-
Once the SQL CLI is started you can simply switch to the Kudu catalog by
calling `USE CATALOG kudu;`
You can also create and use the KuduCatalog directly in the Table environment:
@@ -83,12 +81,12 @@ KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");
```
-
### DDL operations using SQL
It is possible to manipulate Kudu tables using SQL DDL.
When not using the Kudu catalog, the following additional properties must be
specified in the `WITH` clause:
+
* `'connector.type'='kudu'`
* `'kudu.masters'='host1:port1,host2:port2,...'`: comma-delimited list of
Kudu masters
* `'kudu.table'='...'`: The table's name within the Kudu database.
@@ -116,8 +114,8 @@ CREATE TABLE TestTable (
'kudu.primary-key-columns' = 'first,second'
)
```
-
Other catalogs
+
```
CREATE TABLE TestTable (
first STRING,
@@ -131,17 +129,16 @@ CREATE TABLE TestTable (
'kudu.primary-key-columns' = 'first,second'
)
```
-
Renaming a table:
+
```
ALTER TABLE TestTable RENAME TO TestTableRen
```
-
Dropping a table:
+
```sql
DROP TABLE TestTableRen
```
-
#### Creating a KuduTable directly with KuduCatalog
The KuduCatalog also exposes a simple `createTable` method where the table
configuration,
@@ -150,7 +147,7 @@ including schema, partitioning, replication, etc. can be
specified using a `Kudu
Use the `createTableIfNotExists` method, which takes a `ColumnSchemasFactory`
and
a `CreateTableOptionsFactory` parameter; these implement, respectively,
`getColumnSchemas()`
returning a list of Kudu
[ColumnSchema](https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html)
objects;
- and `getCreateTableOptions()` returning a
+and `getCreateTableOptions()` returning a
[CreateTableOptions](https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html)
object.
This example shows the creation of a table called `ExampleTable` with two
columns,
@@ -182,32 +179,46 @@ Read more about Kudu schema design in the [Kudu
docs](https://kudu.apache.org/do
### Supported data types
-| Flink/SQL | Kudu |
-|----------------------|:-----------------------:|
-| `STRING` | STRING |
-| `BOOLEAN` | BOOL |
-| `TINYINT` | INT8 |
-| `SMALLINT` | INT16 |
-| `INT` | INT32 |
-| `BIGINT` | INT64 |
-| `FLOAT` | FLOAT |
-| `DOUBLE` | DOUBLE |
-| `BYTES` | BINARY |
-| `TIMESTAMP(3)` | UNIXTIME_MICROS |
+
+| Flink/SQL | Kudu |
+| ---------------- | :---------------: |
+| `STRING` | STRING |
+| `BOOLEAN` | BOOL |
+| `TINYINT` | INT8 |
+| `SMALLINT` | INT16 |
+| `INT` | INT32 |
+| `BIGINT` | INT64 |
+| `FLOAT` | FLOAT |
+| `DOUBLE` | DOUBLE |
+| `BYTES` | BINARY |
+| `TIMESTAMP(3)` | UNIXTIME_MICROS |
Note:
-* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java
conversion class is `java.sql.Timestamp`
+
+* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java
conversion class is `java.sql.Timestamp`
* `BINARY` and `VARBINARY` are not yet supported - use `BYTES`, which is a
`VARBINARY(2147483647)`
-* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a
`VARCHAR(2147483647)`
+* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a
`VARCHAR(2147483647)`
* `DECIMAL` types are not yet supported
+### Lookup Cache
+
+The Kudu connector can be used in a temporal join as a lookup source (a.k.a.
dimension table). Currently, only synchronous lookup mode is supported.
+
+By default, the lookup cache is not enabled. You can enable it by setting both
`lookup.cache.max-rows` and `lookup.cache.ttl`.
+
+The lookup cache is used to improve the performance of temporal joins with the
Kudu connector. By default, the lookup cache is not enabled, so all requests are
sent to the external database. When the lookup cache is enabled, each process
(i.e. TaskManager) holds a cache. Flink looks up the cache first, and only sends
requests to the external database on a cache miss, updating the cache with the
rows returned. The oldest rows in the cache are expired when the cache reaches
the maximum number of cached rows `kudu.lookup [...]
+
+Reference: [Flink JDBC
Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/jdbc/#lookup-cache)
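
To make this concrete, here is a minimal, hedged sketch of a table definition
that enables the lookup cache. It reuses the DDL properties shown earlier; the
cache property keys are an assumption (the truncated text above suggests a
`kudu.lookup.*` prefix), so verify them against the released connector:

```sql
-- Hypothetical Kudu-backed dimension table with the lookup cache enabled.
-- The kudu.lookup.cache.* keys below are assumed; check the connector docs.
CREATE TABLE KuduDimTable (
  id BIGINT,
  name STRING
) WITH (
  'connector.type' = 'kudu',
  'kudu.masters' = 'host1:7051,host2:7051',
  'kudu.table' = 'DimTable',
  'kudu.primary-key-columns' = 'id',
  'kudu.lookup.cache.max-rows' = '10000',
  'kudu.lookup.cache.ttl' = '10min'
)
```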
+
+
### Known limitations
+
* Data type limitations (see above).
* SQL Create table: primary keys can only be set by the
`kudu.primary-key-columns` property, using the
-`PRIMARY KEY` constraint is not yet possible.
+ `PRIMARY KEY` constraint is not yet possible.
* SQL Create table: range partitioning is not supported.
* When getting a table through the Catalog, NOT NULL and PRIMARY KEY
constraints are ignored. All columns
-are described as being nullable, and not being primary keys.
+ are described as being nullable, and not being primary keys.
* Kudu tables cannot be altered through the catalog other than simple renaming
## DataStream API
@@ -219,13 +230,15 @@ with Kudu data.
### Reading tables into DataStreams
There are two main ways of reading a Kudu table into a DataStream:
- 1. Using the `KuduCatalog` and the Table API
- 2. Using the `KuduRowInputFormat` directly
+
+1. Using the `KuduCatalog` and the Table API
+2. Using the `KuduRowInputFormat` directly
Using the `KuduCatalog` and Table API is the recommended way of reading tables
as it automatically
guarantees type safety and takes care of configuration of our readers.
This is how it works in practice:
+
```java
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv,
tableSettings);
@@ -235,7 +248,6 @@ tableEnv.useCatalog("kudu");
Table table = tableEnv.sqlQuery("SELECT * FROM MyKuduTable");
DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
```
-
The second way of achieving the same thing is by using the
`KuduRowInputFormat` directly.
In this case we have to manually provide all information about our table:
@@ -246,18 +258,19 @@ KuduRowInputFormat inputFormat = new
KuduRowInputFormat(readerConfig, tableInfo)
DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
```
-
At the end of the day the `KuduTableSource` is just a convenient wrapper
around the `KuduRowInputFormat`.
### Kudu Sink
+
The connector provides a `KuduSink` class that can be used to consume
DataStreams
and write the results into a Kudu table.
The constructor takes 3 or 4 arguments.
- * `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
- * `KuduTableInfo` identifies the table to be written
- * `KuduOperationMapper` maps the records coming from the DataStream to a list
of Kudu operations.
- * `KuduFailureHandler` (optional): If you want to provide your own logic for
handling writing failures.
+
+* `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
+* `KuduTableInfo` identifies the table to be written
+* `KuduOperationMapper` maps the records coming from the DataStream to a list
of Kudu operations.
+* `KuduFailureHandler` (optional): If you want to provide your own logic for
handling writing failures.
The example below shows the creation of a sink for Row type records of 3
fields. It upserts each record.
It is assumed that a Kudu table with columns `col1, col2, col3` called
`AlreadyExistingTable` exists. Note that if this were not the case,
@@ -275,7 +288,6 @@ KuduSink<Row> sink = new KuduSink<>(
AbstractSingleOperationMapper.KuduOperation.UPSERT)
)
```
-
#### KuduOperationMapper
This section describes the Operation mapping logic in more detail.
@@ -299,12 +311,13 @@ It is also possible to implement your own logic by
overriding the
`createBaseOperation` method that returns a Kudu
[Operation](https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html).
There are pre-defined operation mappers for Pojo, Flink Row, and Flink Tuple
types for constant operation, 1-to-1 sinks.
+
* `PojoOperationMapper`: Each table column must correspond to a POJO field
-with the same name. The `columnNames` array should contain those fields of
the POJO that
-are present as table columns (the POJO fields can be a superset of table
columns).
+ with the same name. The `columnNames` array should contain those fields of
the POJO that
+ are present as table columns (the POJO fields can be a superset of table
columns).
* `RowOperationMapper` and `TupleOperationMapper`: the mapping is based on
position. The
-`i`th field of the Row/Tuple corresponds to the column of the table at the
`i`th
-position in the `columnNames` array.
+ `i`th field of the Row/Tuple corresponds to the column of the table at the
`i`th
+ position in the `columnNames` array.
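
As an illustration of the POJO mapping above, here is a minimal, hedged sketch
that wires a `PojoOperationMapper` into a `KuduSink`, in the same snippet style
as the earlier examples. The `Person` class and the exact `PojoOperationMapper`
constructor signature are assumptions, so check them against the connector's
javadoc:

```java
// Hypothetical POJO whose field names match the Kudu table's column names.
public class Person {
    public Long id;
    public String name;
}

// KUDU_MASTERS and personStream are assumed to be defined elsewhere, as in the
// earlier snippets; the PojoOperationMapper constructor arguments (POJO class,
// column names, operation) are an assumption to verify against the javadoc.
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(KUDU_MASTERS).build();

KuduSink<Person> sink = new KuduSink<>(
        writerConfig,
        KuduTableInfo.forTable("AlreadyExistingTable"),
        new PojoOperationMapper<>(
                Person.class,
                new String[]{"id", "name"},
                AbstractSingleOperationMapper.KuduOperation.UPSERT));

personStream.addSink(sink);
```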
## Building the connector
@@ -314,7 +327,6 @@ The connector can be easily built by using maven:
cd bahir-flink
mvn clean install
```
-
### Running the tests
The integration tests rely on the Kudu test harness which requires the current
user to be able to ssh to localhost.
diff --git a/site/docs/flink/current/flink-streaming-netty.md
b/site/docs/flink/1.1.0/flink-streaming-netty.md
similarity index 99%
copy from site/docs/flink/current/flink-streaming-netty.md
copy to site/docs/flink/1.1.0/flink-streaming-netty.md
index 320537c..61bdf47 100644
--- a/site/docs/flink/current/flink-streaming-netty.md
+++ b/site/docs/flink/1.1.0/flink-streaming-netty.md
@@ -61,7 +61,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-netty_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1.0</version>
</dependency>
```
diff --git a/site/docs/flink/current/flink-streaming-pinot.md
b/site/docs/flink/1.1.0/flink-streaming-pinot.md
similarity index 99%
copy from site/docs/flink/current/flink-streaming-pinot.md
copy to site/docs/flink/1.1.0/flink-streaming-pinot.md
index b4c9a7b..533c471 100644
--- a/site/docs/flink/current/flink-streaming-pinot.md
+++ b/site/docs/flink/1.1.0/flink-streaming-pinot.md
@@ -33,7 +33,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-pinot_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1.0</version>
</dependency>
*Version Compatibility*: This module is compatible with Pinot 0.6.0.
diff --git a/site/docs/flink/current/flink-streaming-redis.md
b/site/docs/flink/1.1.0/flink-streaming-redis.md
similarity index 99%
copy from site/docs/flink/current/flink-streaming-redis.md
copy to site/docs/flink/1.1.0/flink-streaming-redis.md
index 0c646fb..edb4464 100644
--- a/site/docs/flink/current/flink-streaming-redis.md
+++ b/site/docs/flink/1.1.0/flink-streaming-redis.md
@@ -34,7 +34,7 @@ following dependency to your project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1.0</version>
</dependency>
*Version Compatibility*: This module is compatible with Redis 2.8.5.
diff --git a/site/docs/flink/current/documentation.md
b/site/docs/flink/current/documentation.md
index 969c97f..b18eb70 100644
--- a/site/docs/flink/current/documentation.md
+++ b/site/docs/flink/current/documentation.md
@@ -1,7 +1,7 @@
---
layout: page
-title: Extensions for Apache Flink (1.1-SNAPSHOT)
-description: Extensions for Apache Flink (1.1-SNAPSHOT)
+title: Extensions for Apache Flink (1.2-SNAPSHOT)
+description: Extensions for Apache Flink (1.2-SNAPSHOT)
group: nav-right
---
<!--
@@ -39,12 +39,12 @@ limitations under the License.
[InfluxDB connector](../flink-streaming-influxdb)
-[InfluxDB2 connector](../flink-streaming-influxdb2)
{:height="36px" width="36px"}
+[InfluxDB2 connector](../flink-streaming-influxdb2)
[Kudu connector](../flink-streaming-kudu)
[Netty connector](../flink-streaming-netty)
-[Pinot connector](../flink-streaming-pinot)
{:height="36px" width="36px"}
+[Pinot connector](../flink-streaming-pinot)
[Redis connector](../flink-streaming-redis)
diff --git a/site/docs/flink/current/flink-streaming-activemq.md
b/site/docs/flink/current/flink-streaming-activemq.md
index d94bf50..1e16002 100644
--- a/site/docs/flink/current/flink-streaming-activemq.md
+++ b/site/docs/flink/current/flink-streaming-activemq.md
@@ -33,7 +33,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-activemq_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.2-SNAPSHOT</version>
</dependency>
*Version Compatibility*: This module is compatible with ActiveMQ 5.14.0.
diff --git a/site/docs/flink/current/flink-streaming-akka.md
b/site/docs/flink/current/flink-streaming-akka.md
index 0af3e13..86a415e 100644
--- a/site/docs/flink/current/flink-streaming-akka.md
+++ b/site/docs/flink/current/flink-streaming-akka.md
@@ -33,7 +33,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-akka_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.2-SNAPSHOT</version>
</dependency>
*Version Compatibility*: This module is compatible with Akka 2.0+.
diff --git a/site/docs/flink/current/flink-streaming-flume.md
b/site/docs/flink/current/flink-streaming-flume.md
index c5e9a89..19cf545 100644
--- a/site/docs/flink/current/flink-streaming-flume.md
+++ b/site/docs/flink/current/flink-streaming-flume.md
@@ -33,7 +33,7 @@ following dependency to your project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-flume_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.2-SNAPSHOT</version>
</dependency>
*Version Compatibility*: This module is compatible with Flume 1.8.0.
diff --git a/site/docs/flink/current/flink-streaming-influxdb.md
b/site/docs/flink/current/flink-streaming-influxdb.md
index fe0d946..acb36eb 100644
--- a/site/docs/flink/current/flink-streaming-influxdb.md
+++ b/site/docs/flink/current/flink-streaming-influxdb.md
@@ -33,7 +33,7 @@ following dependency to your project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-influxdb_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.2-SNAPSHOT</version>
</dependency>
*Version Compatibility*: This module is compatible with InfluxDB 1.3.x
diff --git a/site/docs/flink/current/flink-streaming-influxdb2.md
b/site/docs/flink/current/flink-streaming-influxdb2.md
index 5c4b99e..217bbd3 100644
--- a/site/docs/flink/current/flink-streaming-influxdb2.md
+++ b/site/docs/flink/current/flink-streaming-influxdb2.md
@@ -41,7 +41,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-influxdb2_2.12</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.2-SNAPSHOT</version>
</dependency>
```
diff --git a/site/docs/flink/current/flink-streaming-kudu.md
b/site/docs/flink/current/flink-streaming-kudu.md
index 28dec2c..e1126f0 100644
--- a/site/docs/flink/current/flink-streaming-kudu.md
+++ b/site/docs/flink/current/flink-streaming-kudu.md
@@ -29,18 +29,17 @@ limitations under the License.
This connector provides a source (```KuduInputFormat```), a sink/output
(```KuduSink``` and ```KuduOutputFormat```, respectively),
- as well a table source (`KuduTableSource`), an upsert table sink
(`KuduTableSink`), and a catalog (`KuduCatalog`),
- to allow reading and writing to [Kudu](https://kudu.apache.org/).
+as well as a table source (`KuduTableSource`), an upsert table sink
(`KuduTableSink`), and a catalog (`KuduCatalog`),
+to allow reading and writing to [Kudu](https://kudu.apache.org/).
To use this connector, add the following dependency to your project:
- <dependency>
- <groupId>org.apache.bahir</groupId>
- <artifactId>flink-connector-kudu_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
- </dependency>
-
- *Version Compatibility*: This module is compatible with Apache Kudu *1.11.1*
(last stable version) and Apache Flink 1.10.+.
+<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>flink-connector-kudu_2.11</artifactId>
+ <version>1.2-SNAPSHOT</version>
+</dependency>
+*Version Compatibility*: This module is compatible with Apache Kudu *1.11.1*
(last stable version) and Apache Flink 1.10.+.
Note that the streaming connectors are not part of the binary distribution of
Flink. You need to link them into your job jar for cluster execution.
See how to link with them for cluster execution
[here](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html).
@@ -72,7 +71,6 @@ catalogs:
type: kudu
kudu.masters: <host>:7051
```
-
Once the SQL CLI is started you can simply switch to the Kudu catalog by
calling `USE CATALOG kudu;`
You can also create and use the KuduCatalog directly in the Table environment:
@@ -83,12 +81,12 @@ KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");
```
-
### DDL operations using SQL
It is possible to manipulate Kudu tables using SQL DDL.
When not using the Kudu catalog, the following additional properties must be
specified in the `WITH` clause:
+
* `'connector.type'='kudu'`
* `'kudu.masters'='host1:port1,host2:port2,...'`: comma-delimited list of
Kudu masters
* `'kudu.table'='...'`: The table's name within the Kudu database.
@@ -116,8 +114,8 @@ CREATE TABLE TestTable (
'kudu.primary-key-columns' = 'first,second'
)
```
-
Other catalogs
+
```
CREATE TABLE TestTable (
first STRING,
@@ -131,17 +129,16 @@ CREATE TABLE TestTable (
'kudu.primary-key-columns' = 'first,second'
)
```
-
Renaming a table:
+
```
ALTER TABLE TestTable RENAME TO TestTableRen
```
-
Dropping a table:
+
```sql
DROP TABLE TestTableRen
```
-
#### Creating a KuduTable directly with KuduCatalog
The KuduCatalog also exposes a simple `createTable` method where the table
configuration,
@@ -150,7 +147,7 @@ including schema, partitioning, replication, etc. can be
specified using a `Kudu
Use the `createTableIfNotExists` method, which takes a `ColumnSchemasFactory`
and
a `CreateTableOptionsFactory` parameter; these implement, respectively,
`getColumnSchemas()`
returning a list of Kudu
[ColumnSchema](https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html)
objects;
- and `getCreateTableOptions()` returning a
+and `getCreateTableOptions()` returning a
[CreateTableOptions](https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html)
object.
This example shows the creation of a table called `ExampleTable` with two
columns,
@@ -182,32 +179,46 @@ Read more about Kudu schema design in the [Kudu
docs](https://kudu.apache.org/do
### Supported data types
-| Flink/SQL | Kudu |
-|----------------------|:-----------------------:|
-| `STRING` | STRING |
-| `BOOLEAN` | BOOL |
-| `TINYINT` | INT8 |
-| `SMALLINT` | INT16 |
-| `INT` | INT32 |
-| `BIGINT` | INT64 |
-| `FLOAT` | FLOAT |
-| `DOUBLE` | DOUBLE |
-| `BYTES` | BINARY |
-| `TIMESTAMP(3)` | UNIXTIME_MICROS |
+
+| Flink/SQL | Kudu |
+| ---------------- | :---------------: |
+| `STRING` | STRING |
+| `BOOLEAN` | BOOL |
+| `TINYINT` | INT8 |
+| `SMALLINT` | INT16 |
+| `INT` | INT32 |
+| `BIGINT` | INT64 |
+| `FLOAT` | FLOAT |
+| `DOUBLE` | DOUBLE |
+| `BYTES` | BINARY |
+| `TIMESTAMP(3)` | UNIXTIME_MICROS |
Note:
-* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java
conversion class is `java.sql.Timestamp`
+
+* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java
conversion class is `java.sql.Timestamp`
* `BINARY` and `VARBINARY` are not yet supported - use `BYTES`, which is a
`VARBINARY(2147483647)`
-* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a
`VARCHAR(2147483647)`
+* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a
`VARCHAR(2147483647)`
* `DECIMAL` types are not yet supported
+### Lookup Cache
+
+The Kudu connector can be used in a temporal join as a lookup source (a.k.a.
dimension table). Currently, only synchronous lookup mode is supported.
+
+By default, the lookup cache is not enabled. You can enable it by setting both
`lookup.cache.max-rows` and `lookup.cache.ttl`.
+
+The lookup cache is used to improve the performance of temporal joins with the
Kudu connector. By default, the lookup cache is not enabled, so all requests are
sent to the external database. When the lookup cache is enabled, each process
(i.e. TaskManager) holds a cache. Flink looks up the cache first, and only sends
requests to the external database on a cache miss, updating the cache with the
rows returned. The oldest rows in the cache are expired when the cache reaches
the maximum number of cached rows `kudu.lookup [...]
+
+Reference: [Flink JDBC
Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/jdbc/#lookup-cache)
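
To make this concrete, here is a minimal, hedged sketch of a table definition
that enables the lookup cache. It reuses the DDL properties shown earlier; the
cache property keys are an assumption (the truncated text above suggests a
`kudu.lookup.*` prefix), so verify them against the released connector:

```sql
-- Hypothetical Kudu-backed dimension table with the lookup cache enabled.
-- The kudu.lookup.cache.* keys below are assumed; check the connector docs.
CREATE TABLE KuduDimTable (
  id BIGINT,
  name STRING
) WITH (
  'connector.type' = 'kudu',
  'kudu.masters' = 'host1:7051,host2:7051',
  'kudu.table' = 'DimTable',
  'kudu.primary-key-columns' = 'id',
  'kudu.lookup.cache.max-rows' = '10000',
  'kudu.lookup.cache.ttl' = '10min'
)
```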
+
+
### Known limitations
+
* Data type limitations (see above).
* SQL Create table: primary keys can only be set by the
`kudu.primary-key-columns` property, using the
-`PRIMARY KEY` constraint is not yet possible.
+ `PRIMARY KEY` constraint is not yet possible.
* SQL Create table: range partitioning is not supported.
* When getting a table through the Catalog, NOT NULL and PRIMARY KEY
constraints are ignored. All columns
-are described as being nullable, and not being primary keys.
+ are described as being nullable, and not being primary keys.
* Kudu tables cannot be altered through the catalog other than simple renaming
## DataStream API
@@ -219,13 +230,15 @@ with Kudu data.
### Reading tables into DataStreams
There are two main ways of reading a Kudu table into a DataStream:
- 1. Using the `KuduCatalog` and the Table API
- 2. Using the `KuduRowInputFormat` directly
+
+1. Using the `KuduCatalog` and the Table API
+2. Using the `KuduRowInputFormat` directly
Using the `KuduCatalog` and Table API is the recommended way of reading tables
as it automatically
guarantees type safety and takes care of configuration of our readers.
This is how it works in practice:
+
```java
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv,
tableSettings);
@@ -235,7 +248,6 @@ tableEnv.useCatalog("kudu");
Table table = tableEnv.sqlQuery("SELECT * FROM MyKuduTable");
DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
```
-
The second way of achieving the same thing is by using the
`KuduRowInputFormat` directly.
In this case we have to manually provide all information about our table:
@@ -246,18 +258,19 @@ KuduRowInputFormat inputFormat = new
KuduRowInputFormat(readerConfig, tableInfo)
DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
```
-
At the end of the day the `KuduTableSource` is just a convenient wrapper
around the `KuduRowInputFormat`.
### Kudu Sink
+
The connector provides a `KuduSink` class that can be used to consume
DataStreams
and write the results into a Kudu table.
The constructor takes 3 or 4 arguments.
- * `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
- * `KuduTableInfo` identifies the table to be written
- * `KuduOperationMapper` maps the records coming from the DataStream to a list
of Kudu operations.
- * `KuduFailureHandler` (optional): If you want to provide your own logic for
handling writing failures.
+
+* `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
+* `KuduTableInfo` identifies the table to be written
+* `KuduOperationMapper` maps the records coming from the DataStream to a list
of Kudu operations.
+* `KuduFailureHandler` (optional): If you want to provide your own logic for
handling writing failures.
The example below shows the creation of a sink for Row type records of 3
fields. It upserts each record.
It is assumed that a Kudu table with columns `col1, col2, col3` called
`AlreadyExistingTable` exists. Note that if this were not the case,
@@ -275,7 +288,6 @@ KuduSink<Row> sink = new KuduSink<>(
AbstractSingleOperationMapper.KuduOperation.UPSERT)
)
```
-
#### KuduOperationMapper
This section describes the Operation mapping logic in more detail.
@@ -299,12 +311,13 @@ It is also possible to implement your own logic by
overriding the
`createBaseOperation` method that returns a Kudu
[Operation](https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html).
There are pre-defined operation mappers for Pojo, Flink Row, and Flink Tuple
types for constant operation, 1-to-1 sinks.
+
* `PojoOperationMapper`: Each table column must correspond to a POJO field
-with the same name. The `columnNames` array should contain those fields of
the POJO that
-are present as table columns (the POJO fields can be a superset of table
columns).
+ with the same name. The `columnNames` array should contain those fields of
the POJO that
+ are present as table columns (the POJO fields can be a superset of table
columns).
* `RowOperationMapper` and `TupleOperationMapper`: the mapping is based on
position. The
-`i`th field of the Row/Tuple corresponds to the column of the table at the
`i`th
-position in the `columnNames` array.
+ `i`th field of the Row/Tuple corresponds to the column of the table at the
`i`th
+ position in the `columnNames` array.
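
As an illustration of the POJO mapping above, here is a minimal, hedged sketch
that wires a `PojoOperationMapper` into a `KuduSink`, in the same snippet style
as the earlier examples. The `Person` class and the exact `PojoOperationMapper`
constructor signature are assumptions, so check them against the connector's
javadoc:

```java
// Hypothetical POJO whose field names match the Kudu table's column names.
public class Person {
    public Long id;
    public String name;
}

// KUDU_MASTERS and personStream are assumed to be defined elsewhere, as in the
// earlier snippets; the PojoOperationMapper constructor arguments (POJO class,
// column names, operation) are an assumption to verify against the javadoc.
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(KUDU_MASTERS).build();

KuduSink<Person> sink = new KuduSink<>(
        writerConfig,
        KuduTableInfo.forTable("AlreadyExistingTable"),
        new PojoOperationMapper<>(
                Person.class,
                new String[]{"id", "name"},
                AbstractSingleOperationMapper.KuduOperation.UPSERT));

personStream.addSink(sink);
```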
## Building the connector
@@ -314,7 +327,6 @@ The connector can be easily built by using maven:
cd bahir-flink
mvn clean install
```
-
### Running the tests
The integration tests rely on the Kudu test harness which requires the current
user to be able to ssh to localhost.
diff --git a/site/docs/flink/current/flink-streaming-netty.md
b/site/docs/flink/current/flink-streaming-netty.md
index 320537c..11bed0b 100644
--- a/site/docs/flink/current/flink-streaming-netty.md
+++ b/site/docs/flink/current/flink-streaming-netty.md
@@ -61,7 +61,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-netty_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.2-SNAPSHOT</version>
</dependency>
```
diff --git a/site/docs/flink/current/flink-streaming-pinot.md
b/site/docs/flink/current/flink-streaming-pinot.md
index b4c9a7b..c25ad4a 100644
--- a/site/docs/flink/current/flink-streaming-pinot.md
+++ b/site/docs/flink/current/flink-streaming-pinot.md
@@ -33,7 +33,7 @@ To use this connector, add the following dependency to your
project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-pinot_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.2-SNAPSHOT</version>
</dependency>
*Version Compatibility*: This module is compatible with Pinot 0.6.0.
diff --git a/site/docs/flink/current/flink-streaming-redis.md
b/site/docs/flink/current/flink-streaming-redis.md
index 0c646fb..5339ea5 100644
--- a/site/docs/flink/current/flink-streaming-redis.md
+++ b/site/docs/flink/current/flink-streaming-redis.md
@@ -34,7 +34,7 @@ following dependency to your project:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
+ <version>1.2-SNAPSHOT</version>
</dependency>
*Version Compatibility*: This module is compatible with Redis 2.8.5.
diff --git a/site/docs/flink/overview.md b/site/docs/flink/overview.md
index a6793b6..a7e2abe 100644
--- a/site/docs/flink/overview.md
+++ b/site/docs/flink/overview.md
@@ -27,5 +27,6 @@ limitations under the License.
### Apache Bahir Extensions for Apache Flink
- - [Current - 1.1-SNAPSHOT](/docs/flink/current/documentation)
+ - [Current - 1.2-SNAPSHOT](/docs/flink/current/documentation)
+ - [1.1.0](/docs/flink/1.1.0/documentation)
- [1.0](/docs/flink/1.0/documentation)
diff --git a/site/index.md b/site/index.md
index e6181d4..6722892 100644
--- a/site/index.md
+++ b/site/index.md
@@ -50,10 +50,13 @@ Currently, {{ site.data.project.short_name }} provides
extensions for [Apache Sp
- Flink streaming connector for ActiveMQ
- Flink streaming connector for Akka
- Flink streaming connector for Flume
- - Flink streaming connector for InfluxDB
{:height="36px" width="36px"}
+ - Flink streaming connector for InfluxDB
+ - Flink streaming connector for InfluxDB2
{:height="36px" width="36px"}
- Flink streaming connector for Kudu
{:height="36px" width="36px"}
- Flink streaming connector for Redis
- Flink streaming connector for Netty
+ - Flink streaming connector for Pinot
{:height="36px" width="36px"}
+ - Flink streaming connector for Redis
The {{ site.data.project.name }} community welcomes the proposal of new
extensions.