[ https://issues.apache.org/jira/browse/FLINK-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16975898#comment-16975898 ]
Jark Wu edited comment on FLINK-14567 at 11/17/19 3:07 AM:
-----------------------------------------------------------

Another solution is to let the sink concatenate the keys. For example, an HBase sink can have more than one key field, say k1, k2, k3. A {{key-delimiter}} option is then required to concatenate the key fields into a rowkey, and the concatenated rowkey will always be of varchar type. The HBase sink inserts the concatenated rowkey into the HBase table.

{code:sql}
create table my_table (
  k1 int,
  k2 varchar,
  k3 timestamp(3),
  f1 row<q1 bigint, q2 bigint>
) with (
  'connector.type' = 'hbase',
  'connector.key-delimiter' = '-'
);

insert into my_table
select k1, k2, k3, ROW(count(*), count(distinct user))
group by k1, k2, k3
{code}

This is very similar to the [ElasticSearch Connector|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector].

In this way, {{UpsertStreamTableSink#setKeyFields}} still works for the sink, because the sink pretends it has a composite key with 3 fields.

However, this can't solve all the problems. For example, it breaks down if one of the 3 fields is derived from a group key by a transformation that loses key information.

> Aggregate query with more than two group fields can't be write into HBase sink
> ------------------------------------------------------------------------------
>
> Key: FLINK-14567
> URL: https://issues.apache.org/jira/browse/FLINK-14567
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase, Table SQL / Legacy Planner, Table SQL / Planner
> Reporter: Jark Wu
> Priority: Critical
> Fix For: 1.10.0
>
>
> If we have an HBase table sink with a varchar rowkey (also the primary key) and a
> bigint column, we want to write the result of the following query into the
> sink using upsert mode. However, it fails the primary key check with the
> exception "UpsertStreamTableSink requires that Table has a full primary keys
> if it is updated."
> {code:sql}
> select concat(f0, '-', f1) as key, sum(f2)
> from T1
> group by f0, f1
> {code}
> This happens in both the blink planner and the old planner. If the query
> works in update mode, a primary key must exist so that it can be extracted
> and set via {{UpsertStreamTableSink#setKeyFields}}.
> That's why we wanted to derive the primary key for concat in FLINK-14539.
> However, we found that the primary key is not preserved after concatenation.
> For example, given a primary key (f0, f1, f2) whose fields are all of varchar
> type, the two unique records ('a', 'b', 'c') and ('ab', '', 'c') produce the
> same concat(f0, f1, f2) result, which means the concat result is no longer a
> primary key.
> So here comes the problem: how can we properly support the HBase sink or
> similar use cases?
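
The key-collision argument above can be checked with a small sketch. It is plain Python, not Flink or HBase code. The helper names {{concat_key}} and {{delimited_key}} are hypothetical, and all key fields are assumed to be strings already. It reproduces the ('a', 'b', 'c') versus ('ab', '', 'c') collision and shows how a delimiter separates those two records. It also shows that a delimiter by itself may not be enough when a key value can contain the delimiter.

```python
# Illustrative only: plain Python, not Flink or HBase API.
# Assumption: all key fields have already been converted to strings.

def concat_key(fields):
    """Mimic concat(f0, f1, f2): join the key fields with no separator."""
    return "".join(fields)


def delimited_key(fields, delimiter="-"):
    """Hypothetical rowkey builder mirroring a 'key-delimiter' style option."""
    return delimiter.join(fields)


r1 = ("a", "b", "c")
r2 = ("ab", "", "c")

# Two distinct composite keys collapse into the same concatenated value,
# so the concat result cannot serve as a primary key.
print(concat_key(r1) == concat_key(r2))

# With a delimiter, the same two records map to different rowkeys.
print(delimited_key(r1), delimited_key(r2))

# Caveat: if a key value may itself contain the delimiter, distinct
# composite keys can still collide.
r3 = ("a-b", "c", "d")
r4 = ("a", "b-c", "d")
print(delimited_key(r3) == delimited_key(r4))
```

In a real sink the delimiter would have to be chosen so that it cannot occur in key values, or the values would need escaping. That choice is outside the scope of this sketch.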