[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jing Ge updated FLINK-25560: ---------------------------- Affects Version/s: (was: 1.15.0) > Add "sink.delete.mode" in HBase SQL connector for retracting the latest > version or all versions in changelog mode > ----------------------------------------------------------------------------------------------------------------- > > Key: FLINK-25560 > URL: https://issues.apache.org/jira/browse/FLINK-25560 > Project: Flink > Issue Type: Improvement > Components: Connectors / HBase > Reporter: Bruce Wong > Priority: Major > Labels: pull-request-available > > h1. Motivation > When we synchronize data from MySQL to HBase, we find that when deleting data > from MySQL, HBase cannot delete all versions, which leads to incorrect > semantics. So we want to add a parameter to control whether to delete the latest > version or all versions. > h1. Usage > The test code is as follows. > {code:java} > package com.bruce; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.TableConfig; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import static > org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER; > public class KafkaToHBase { > public static void main(String[] args) { > Configuration cfg = new Configuration(); > cfg.setBoolean(LOCAL_START_WEBSERVER, true); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg); > env.setParallelism(1); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .inStreamingMode() > .build(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, > envSettings); > // TableConfig config = tEnv.getConfig(); > // config.setIdleStateRetention(Duration.ofHours(2)); > String source = "CREATE TEMPORARY TABLE IF NOT EXISTS > kafka_llspay_bundles(\n" + > " id STRING,\n" + > " 
category_id STRING,\n" + > " upc STRING,\n" + > " `name` STRING,\n" + > " price_cents STRING,\n" + > " original_price_cents STRING,\n" + > " short_desc STRING,\n" + > " desc STRING,\n" + > " cover_url STRING,\n" + > " created_at STRING,\n" + > " updated_at STRING,\n" + > " deleted_at STRING,\n" + > " extra STRING,\n" + > " status STRING,\n" + > " scholarship_cents STRING,\n" + > " is_payback STRING,\n" + > " is_support_iap STRING,\n" + > " iap_product_id STRING,\n" + > " neo_product_code STRING,\n" + > " paid_redirect_url STRING,\n" + > " subscription_type STRING\n" + > ") WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'topic' = 'dim-bundles',\n" + > " 'properties.bootstrap.servers' = 'localhost:9092',\n" + > " 'properties.group.id' = 'vvp_dev',\n" + > " 'scan.startup.mode' = 'latest-offset',\n" + > " 'value.debezium-json.schema-include' = 'true',\n" + > " 'value.format' = 'debezium-json',\n" + > " 'value.debezium-json.ignore-parse-errors' = 'true'\n" + > ")"; > String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" + > " rowkey STRING,\n" + > " cf ROW<id STRING, category_id STRING, upc STRING, `name` > STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, > `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, > deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, > is_payback STRING, is_support_iap STRING, iap_product_id STRING, > neo_product_code STRING, paid_redirect_url STRING, subscription_type > STRING>\n" + > ") with (\n" + > " 'connector'='hbase-2.2',\n" + > " 'table-name'='dim_hbase',\n" + > " 'zookeeper.quorum'='localhost:2181',\n" + > " 'sink.buffer-flush.max-size' = '0',\n" + > " 'sink.buffer-flush.max-rows' = '1',\n" + > " 'sink.delete.mode' = 'all-versions'\n" + > ")"; > String dml = "INSERT INTO dim_hbase\n" + > "SELECT \n" + > " upc as rowkey,\n" + > " ROW(\n" + > " id, category_id, upc, `name`, price_cents, > original_price_cents, short_desc, `desc` , cover_url , created_at, 
> updated_at, deleted_at, extra , status , scholarship_cents , is_payback , > is_support_iap , iap_product_id , neo_product_code , paid_redirect_url , > subscription_type)\n" + > "FROM kafka_llspay_bundles"; > tEnv.executeSql(source); > tEnv.executeSql(sink); > tEnv.executeSql(dml); > } > } {code} > h1. Reference > Please see the following related task link. > FLINK-25330 > -- This message was sent by Atlassian Jira (v8.20.1#820001)