GitHub user uce opened a pull request:
https://github.com/apache/flink/pull/2069
[FLINK-3872] [table, connector-kafka] Add KafkaJsonTableSource
Adds `StreamTableSource` variants for Kafka with syntactic sugar for
parsing JSON streams.
```java
KafkaJsonTableSource source = new Kafka08JsonTableSource(
    topic,
    props,
    new String[] { "id" },          // field names
    new Class<?>[] { Long.class }); // field types

// Register the source under a name that can be referenced later.
tableEnvironment.registerTableSource("kafka-stream", source);
```
You can then continue to work with the stream:
```java
Table result = tableEnvironment.ingest("kafka-stream").filter("id > 1000");
tableEnvironment.toDataStream(result, Row.class).print();
```
**Limitations**
- Assumes flat JSON field access (we can easily extend this to use JSON
pointers, allowing us to parse nested fields like `/location/area` as field
names).
- This does not extract any timestamps or watermarks (not an issue right now
as the Table API currently does not support operations where they are needed).
- The API is rather cumbersome and not idiomatic Scala when used from the Scala
Table API.
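
To illustrate the first limitation, here is a minimal sketch of how a JSON-pointer-style field name such as `/location/area` could be resolved against a nested record. It is not code from this pull request: the class `JsonPointerSketch` and the method `resolve` are hypothetical, and the record is assumed to be already parsed into nested `Map`s. An actual implementation would more likely rely on the JSON library's own pointer support, and array indices are not handled here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only (not part of this pull request).
public class JsonPointerSketch {

    /**
     * Resolves a JSON-pointer-style path (e.g. "/location/area") against
     * nested maps. Returns null if any segment along the path is missing.
     */
    public static Object resolve(Map<String, Object> root, String pointer) {
        if (pointer.isEmpty()) {
            return root; // the empty pointer refers to the whole document
        }
        if (!pointer.startsWith("/")) {
            throw new IllegalArgumentException("Pointer must start with '/': " + pointer);
        }
        Object current = root;
        for (String token : pointer.substring(1).split("/", -1)) {
            if (!(current instanceof Map)) {
                return null; // cannot descend into a scalar value
            }
            // Unescape "~1" to "/" first, then "~0" to "~" (order matters).
            String key = token.replace("~1", "/").replace("~0", "~");
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> location = new LinkedHashMap<>();
        location.put("area", "Berlin");

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 1001L);
        record.put("location", location);

        System.out.println(resolve(record, "/id"));            // flat access
        System.out.println(resolve(record, "/location/area")); // nested access
        System.out.println(resolve(record, "/location/zip"));  // missing field
    }
}
```

With such a resolver, the field names passed to the table source could be pointers instead of top-level keys, while flat names like `/id` would keep working.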
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/flink 3872-kafkajson_table
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2069.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2069
----
commit 12ec6a594d23bd36bed1e07eeaba2aa75a768f67
Author: Ufuk Celebi <[email protected]>
Date: 2016-06-02T20:38:23Z
[FLINK-3872] [table, connector-kafka] Add JsonRowDeserializationSchema
- Adds a deserialization schema from byte[] to Row to be used in conjunction
with the Table API.
commit a8dc3aa7ab70a91b12af2adccbbed821bf25ecc9
Author: Ufuk Celebi <[email protected]>
Date: 2016-06-03T13:24:22Z
[FLINK-3872] [table, connector-kafka] Add KafkaTableSource
----