[
https://issues.apache.org/jira/browse/BAHIR-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zl updated BAHIR-232:
---------------------
Description:
flink-connector-activemq does not support the Flink Table API & SQL. Based on
the existing code, it is not very difficult to support this feature; we just
need to provide 4 classes: AMQTableSource, AMQTableSink, AMQTableSourceFactory
and AMQTableSinkFactory. Then we can connect to ActiveMQ in the following way:
{code:java}
String TABLE_CREATE_SQL = "CREATE TABLE books ("
    + " id int, "
    + " title varchar, "
    + " author varchar, "
    + " price double, "
    + " qty int "
    + ") with ("
    + " 'connector.type' = 'activemq', "
    + " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', "
    + " 'connector.destination-type' = 'QUEUE', "
    + " 'connector.destination-name' = 'source_queue' "
    + ")";

String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n"
    + "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n"
    + "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n"
    + "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n"
    + "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n"
    + "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n"
    + "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n"
    + "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n"
    + "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n"
    + "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n"
    + "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";

String QUERY_TABLE_SQL = "SELECT * FROM books";

// create activemq source table
tEnv.sqlUpdate(TABLE_CREATE_SQL);
// produce event to activemq
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);
// consume from activemq
Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
{code}
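As a rough illustration of what a table factory for this connector might do
with the properties in the WITH clause, here is a minimal sketch in plain Java
with no Flink dependency. The class AmqConnectorOptions, its methods, and the
TOPIC destination type are assumptions made for illustration only; they are
not part of the existing connector code or of this proposal. Only the four
property keys are taken from the DDL in the description.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical holder for the connector properties used in the CREATE TABLE statement. */
final class AmqConnectorOptions {

    /** Assumed destination kinds; only QUEUE appears in the description. */
    enum DestinationType { QUEUE, TOPIC }

    final String brokerUrl;
    final DestinationType destinationType;
    final String destinationName;

    private AmqConnectorOptions(String brokerUrl, DestinationType destinationType,
                                String destinationName) {
        this.brokerUrl = brokerUrl;
        this.destinationType = destinationType;
        this.destinationName = destinationName;
    }

    /** Returns true when the properties select the activemq connector. */
    static boolean matches(Map<String, String> props) {
        return "activemq".equals(props.get("connector.type"));
    }

    /** Validates the required keys and builds the options object. */
    static AmqConnectorOptions fromProperties(Map<String, String> props) {
        if (!matches(props)) {
            throw new IllegalArgumentException("connector.type must be 'activemq'");
        }
        String brokerUrl = require(props, "connector.broker-url");
        String rawType = require(props, "connector.destination-type");
        String name = require(props, "connector.destination-name");
        // valueOf throws IllegalArgumentException for an unknown destination type.
        DestinationType type = DestinationType.valueOf(rawType.toUpperCase(Locale.ROOT));
        return new AmqConnectorOptions(brokerUrl, type, name);
    }

    /** Fetches a property, rejecting missing or blank values. */
    private static String require(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }
}
```

Under these assumptions, a source factory and a sink factory could share this
validation step and differ only in whether they build a table source or a
table sink from the resulting options.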
was:
flink-connector-activemq does not support the Flink Table API & SQL. Based on
the existing code, it is not very difficult to support this feature; we just
need to provide 4 classes: AMQTableSource, AMQTableSink, AMQTableSourceFactory
and AMQTableSinkFactory. Then we can connect to ActiveMQ in the following way:
{code:java}
// code placeholder
{code}
String TABLE_CREATE_SQL = "CREATE TABLE books ("
    + " id int, "
    + " title varchar, "
    + " author varchar, "
    + " price double, "
    + " qty int "
    + ") with ("
    + " 'connector.type' = 'activemq', "
    + " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', "
    + " 'connector.destination-type' = 'QUEUE', "
    + " 'connector.destination-name' = 'source_queue' "
    + ")";

String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n"
    + "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n"
    + "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n"
    + "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n"
    + "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n"
    + "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n"
    + "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n"
    + "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n"
    + "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n"
    + "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n"
    + "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";

String QUERY_TABLE_SQL = "SELECT * FROM books";

// create activemq source table
tEnv.sqlUpdate(TABLE_CREATE_SQL);
// produce event to activemq
tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);
// consume from activemq
Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
> Support Flink Table API & SQL for flink-connector-activemq
> ----------------------------------------------------------
>
> Key: BAHIR-232
> URL: https://issues.apache.org/jira/browse/BAHIR-232
> Project: Bahir
> Issue Type: Improvement
> Components: Flink Streaming Connectors
> Affects Versions: Flink-1.0
> Reporter: zl
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)