[ https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ferenc Csaky closed FLINK-36188. -------------------------------- Resolution: Fixed [{{c6eb87f}}|https://github.com/apache/flink-connector-hbase/commit/c6eb87f809cd30ad352d1644980eefce658bb63f] in main > HBase disable buffer flush lose efficacy > ---------------------------------------- > > Key: FLINK-36188 > URL: https://issues.apache.org/jira/browse/FLINK-36188 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase > Affects Versions: hbase-3.0.0 > Environment: Flink 1.16.3 > Reporter: MOBIN > Assignee: MOBIN > Priority: Critical > Labels: pull-request-available > Fix For: hbase-4.0.0 > > > HBase table > ||rowkey||col|| > |1|1| > The user lookup joins the hbase table, adds 1 to the col value, and writes it > back to hbase > {code:java} > @Test > void testTableSinkDisabledBufferFlush() throws Exception { > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, > streamSettings); > tEnv.executeSql( > "CREATE TABLE hTableForSink (" > + " rowkey INT PRIMARY KEY NOT ENFORCED," > + " family1 ROW<col1 INT>" > + ") WITH (" > + " 'connector' = 'hbase-2.2'," > + " 'sink.buffer-flush.max-size' = '0'," > + " 'sink.buffer-flush.max-rows' = '0'," > + " 'table-name' = '" > + TEST_TABLE_6 > + "'," > + " 'zookeeper.quorum' = '" > + getZookeeperQuorum() > + "'" > + ")"); > String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))"; > tEnv.executeSql(insert).await(); > tEnv.executeSql( > "CREATE VIEW user_click AS " > + " SELECT user_id, proctime() AS proc_time" > + " FROM ( " > + " VALUES(1), (1), (1), (1), (1)" > + " ) AS t (user_id);"); > > tEnv.executeSql( > "INSERT INTO hTableForSink SELECT " > + " user_id as rowkey," > + " ROW(CAST(family1.col1 + 1 AS INT))" > + " FROM user_click INNER JOIN hTableForSink" > + " FOR SYSTEM_TIME AS OF user_click.proc_time" > + " ON hTableForSink.rowkey = user_click.user_id;"); > > tEnv.executeSql( > "CREATE TABLE hTableForQuery (" > + " rowkey INT PRIMARY KEY NOT ENFORCED," > + " family1 ROW<col1 INT>" > + ") WITH (" > + " 'connector' = 'hbase-2.2'," > + " 'table-name' = '" > + TEST_TABLE_6 > + "'," > + " 'zookeeper.quorum' = '" > + getZookeeperQuorum() > + "'" > + ")"); > String query = "SELECT rowkey, family1.col1 FROM hTableForQuery"; > > TableResult firstResult = tEnv.executeSql(query); > List<Row> firstResults = > CollectionUtil.iteratorToList(firstResult.collect()); > String firstExpected = "+I[1, 6]"; > TestBaseUtils.compareResultAsText(firstResults, firstExpected); > } {code} > test failed > {code:java} > org.junit.ComparisonFailure: Different elements in arrays: expected 1 > elements and received 1 > expected: [+I[1, 6]] > received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]> > Expected :+I[1, 6] > Actual :+I[1, 2] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)