[ 
https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-36188:
---------------------------------
    Fix Version/s: hbase-3.0.1

> HBase disable buffer flush lose efficacy
> ----------------------------------------
>
>                 Key: FLINK-36188
>                 URL: https://issues.apache.org/jira/browse/FLINK-36188
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>    Affects Versions: hbase-3.0.0
>         Environment: Flink 1.16.3
>            Reporter: MOBIN
>            Assignee: MOBIN
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: hbase-3.0.1, hbase-4.0.0
>
>
> HBase table
> ||rowkey||col||
> |1|1|
> The user lookup joins the hbase table, adds 1 to the col value, and writes it 
> back to hbase
> {code:java}
> @Test
> void testTableSinkDisabledBufferFlush() throws Exception {
>         StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, 
> streamSettings);        
>             tEnv.executeSql(
>                 "CREATE TABLE hTableForSink ("
>                         + " rowkey INT PRIMARY KEY NOT ENFORCED,"
>                         + " family1 ROW<col1 INT>"
>                         + ") WITH ("
>                         + " 'connector' = 'hbase-2.2',"
>                         + " 'sink.buffer-flush.max-size' = '0',"
>                         + " 'sink.buffer-flush.max-rows' = '0',"
>                         + " 'table-name' = '"
>                         + TEST_TABLE_6
>                         + "',"
>                         + " 'zookeeper.quorum' = '"
>                         + getZookeeperQuorum()
>                         + "'"
>                         + ")");       
>             String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
>         tEnv.executeSql(insert).await();        
>             tEnv.executeSql(
>                 "CREATE VIEW user_click AS "
>                         + " SELECT user_id, proctime() AS proc_time"
>                         + " FROM ( "
>                         + " VALUES(1), (1), (1), (1), (1)"
>                         + " ) AS t (user_id);");    
>     
>            tEnv.executeSql(
>                 "INSERT INTO hTableForSink SELECT "
>                         + "    user_id as rowkey,"
>                         + "    ROW(CAST(family1.col1 + 1 AS INT))"
>                         + " FROM user_click INNER JOIN hTableForSink"
>                         + " FOR SYSTEM_TIME AS OF user_click.proc_time"
>                         + " ON hTableForSink.rowkey = user_click.user_id;");  
>       
>             tEnv.executeSql(
>                 "CREATE TABLE hTableForQuery ("
>                         + " rowkey INT PRIMARY KEY NOT ENFORCED,"
>                         + " family1 ROW<col1 INT>"
>                         + ") WITH ("
>                         + " 'connector' = 'hbase-2.2',"
>                         + " 'table-name' = '"
>                         + TEST_TABLE_6
>                         + "',"
>                         + " 'zookeeper.quorum' = '"
>                         + getZookeeperQuorum()
>                         + "'"
>                         + ")");
>         String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";     
>    
>             TableResult firstResult = tEnv.executeSql(query);
>         List<Row> firstResults = 
> CollectionUtil.iteratorToList(firstResult.collect());
>         String firstExpected = "+I[1, 6]";
>         TestBaseUtils.compareResultAsText(firstResults, firstExpected);
>     } {code}
> test failed
> {code:java}
> org.junit.ComparisonFailure: Different elements in arrays: expected 1 
> elements and received 1
>  expected: [+I[1, 6]]
>  received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
> Expected :+I[1, 6]
> Actual   :+I[1, 2] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to