[
https://issues.apache.org/jira/browse/BAHIR-220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yuemeng updated BAHIR-220:
--------------------------
Description:
currently, for Flink-1.9.0, we can use the catalog to store our stream table
source and sink
for Redis connector, it should exist a Redis table sink so we can register it
to catalog, and use Redis as a table in SQL environment
{code}
Redis redis = new Redis()
.mode(RedisVadidator.REDIS_CLUSTER)
.command(RedisCommand.INCRBY_EX.name())
.ttl(100000)
.property(RedisVadidator.REDIS_NODES, REDIS_HOST+ ":" +
REDIS_PORT);
tableEnvironment
.connect(redis).withSchema(new Schema()
.field("k", TypeInformation.of(String.class))
.field("v", TypeInformation.of(Long.class)))
.registerTableSink("redis");
tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
env.execute("Test Redis Table");
{code}
was:
currently, for Flink-1.9.0, we can use the catalog to store our stream table
source and sink meta.
for Redis connector, it should exist a Redis table sink so we can register it
to catalog, and use Redis as a table in SQL environment
{code}
Redis redis = new Redis()
.mode(RedisVadidator.REDIS_CLUSTER)
.command(RedisCommand.INCRBY_EX.name())
.ttl(100000)
.property(RedisVadidator.REDIS_NODES, REDIS_HOST+ ":" +
REDIS_PORT);
tableEnvironment
.connect(redis).withSchema(new Schema()
.field("k", TypeInformation.of(String.class))
.field("v", TypeInformation.of(Long.class)))
.registerTableSink("redis");
tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
env.execute("Test Redis Table");
{code}
> Add redis descriptor to make redis connection as a table
> --------------------------------------------------------
>
> Key: BAHIR-220
> URL: https://issues.apache.org/jira/browse/BAHIR-220
> Project: Bahir
> Issue Type: Improvement
> Components: Flink Streaming Connectors
> Affects Versions: Flink-1.0
> Reporter: yuemeng
> Priority: Major
>
> currently, for Flink-1.9.0, we can use the catalog to store our stream table
> source and sink
> for Redis connector, it should exist a Redis table sink so we can register it
> to catalog, and use Redis as a table in SQL environment
> {code}
> Redis redis = new Redis()
> .mode(RedisVadidator.REDIS_CLUSTER)
> .command(RedisCommand.INCRBY_EX.name())
> .ttl(100000)
> .property(RedisVadidator.REDIS_NODES, REDIS_HOST+ ":" +
> REDIS_PORT);
> tableEnvironment
> .connect(redis).withSchema(new Schema()
> .field("k", TypeInformation.of(String.class))
> .field("v", TypeInformation.of(Long.class)))
> .registerTableSink("redis");
> tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
> env.execute("Test Redis Table");
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)