[ 
https://issues.apache.org/jira/browse/FLINK-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16975898#comment-16975898
 ] 

Jark Wu edited comment on FLINK-14567 at 11/17/19 3:07 AM:
-----------------------------------------------------------

Another solution is to let the sink concatenate the keys. For example, an HBase 
sink can have more than one key field, say k1, k2, k3; a {{key-delimiter}} 
option is then required to concatenate the key fields into a rowkey, and the 
concatenated rowkey will always be of varchar type. The HBase sink will insert 
the concatenated rowkey into the HBase table. 


{code:sql}
create table my_table (
  k1 int,
  k2 varchar,
  k3 timestamp(3),
  f1 row<q1 bigint, q2 bigint>
) with (
  'connector.type' = 'hbase',
  'connector.key-delimiter' = '-'
);

insert into my_table 
select k1, k2, k3, ROW(count(*), count(distinct user))
group by k1, k2, k3
{code}

This is very similar to the [ElasticSearch 
Connector|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector].
 In this way, {{UpsertStreamTableSink#setKeyFields}} still works for the sink, 
because the sink pretends it has a composite key with 3 fields. 
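
A minimal Java sketch of the concatenation step described above. This is not the HBase connector's actual code: the class and method names are hypothetical, and it assumes each key field is simply rendered with {{String.valueOf}} before being joined with the configured delimiter.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only (not part of Flink).
final class RowKeySketch {

    // Renders every key field as a string and joins the results with the
    // delimiter, so the resulting rowkey is always a string (varchar).
    static String concatKey(String delimiter, Object... keyFields) {
        return Arrays.stream(keyFields)
                .map(field -> String.valueOf(field))
                .collect(Collectors.joining(delimiter));
    }

    public static void main(String[] args) {
        // Key fields k1 (int), k2 (varchar), k3 (shown here as a plain string).
        System.out.println(concatKey("-", 1, "a", "x")); // prints "1-a-x"
    }
}
```

The sink would still receive the three key fields separately through {{UpsertStreamTableSink#setKeyFields}}; only the value written as the HBase rowkey is the joined string.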


However, this can't solve all the problems. For example, it does not help if 
one of the 3 fields is derived from a group key by a transformation that loses 
key information.


> Aggregate query with more than two group fields can't be write into HBase sink
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-14567
>                 URL: https://issues.apache.org/jira/browse/FLINK-14567
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase, Table SQL / Legacy Planner, Table 
> SQL / Planner
>            Reporter: Jark Wu
>            Priority: Critical
>             Fix For: 1.10.0
>
>
> Suppose we have an HBase table sink with a varchar rowkey (which is also the 
> primary key) and a bigint column, and we want to write the result of the 
> following query into the sink in upsert mode. The query fails the primary key 
> check with the exception "UpsertStreamTableSink requires that Table has a full 
> primary keys if it is updated."
> {code:sql}
> select concat(f0, '-', f1) as key, sum(f2)
> from T1
> group by f0, f1
> {code}
> This happens in both the blink planner and the old planner. If the query 
> works in update mode, a primary key must exist so that it can be extracted 
> and set via {{UpsertStreamTableSink#setKeyFields}}. 
> That's why we wanted to derive a primary key for concat in FLINK-14539. 
> However, we found that the primary key is not preserved after concatenation. 
> For example, given a primary key (f0, f1, f2) whose fields are all of varchar 
> type, the two unique records ('a', 'b', 'c') and ('ab', '', 'c') produce the 
> same concat(f0, f1, f2) result, which means the concat result is no longer a 
> primary key.
> So here comes the problem: how can we properly support the HBase sink or 
> similar use cases? 
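
The collision in the issue description above can be reproduced with plain string joining. The sketch below uses Java's {{String.join}} as a stand-in for SQL concat (an assumption: NULL handling and type conversion are ignored), and the class and method names are hypothetical.

```java
// Hypothetical demo class, for illustration only (not part of Flink).
final class ConcatCollisionSketch {

    // Plain concatenation without a delimiter, standing in for concat(f0, f1, f2).
    static String concat(String... fields) {
        return String.join("", fields);
    }

    // Concatenation with a delimiter between the fields.
    static String joinWith(String delimiter, String... fields) {
        return String.join(delimiter, fields);
    }

    public static void main(String[] args) {
        // The two unique records from the issue description.
        String first = concat("a", "b", "c");
        String second = concat("ab", "", "c");
        System.out.println(first.equals(second)); // prints "true": both are "abc"

        // With a delimiter, these two records map to different strings.
        System.out.println(joinWith("-", "a", "b", "c"));  // prints "a-b-c"
        System.out.println(joinWith("-", "ab", "", "c")); // prints "ab--c"
    }
}
```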



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
