[ https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Ge updated FLINK-25560:
----------------------------
    Affects Version/s:     (was: 1.15.0)

> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25560
>                 URL: https://issues.apache.org/jira/browse/FLINK-25560
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / HBase
>            Reporter: Bruce Wong
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Motivation
> When we synchronize data from MySQL to HBase, we found that when a row is 
> deleted in MySQL, the HBase sink does not delete all versions of the 
> corresponding cells, which leads to incorrect semantics. So we want to add a 
> parameter that controls whether a delete removes only the latest version or 
> all versions.
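> The distinction maps directly onto the two delete flavors in the HBase client 
> API. Below is a minimal sketch of the difference, using the standard 
> org.apache.hadoop.hbase.client.Delete API; the table and column names are 
> illustrative only, not taken from this ticket.
> {code:java}
> import org.apache.hadoop.hbase.TableName;
> import org.apache.hadoop.hbase.client.Connection;
> import org.apache.hadoop.hbase.client.Delete;
> import org.apache.hadoop.hbase.client.Table;
> import org.apache.hadoop.hbase.util.Bytes;
>
> public class DeleteVersionsSketch {
>     static void delete(Connection conn, String rowKey, boolean allVersions) throws Exception {
>         try (Table table = conn.getTable(TableName.valueOf("dim_hbase"))) {
>             Delete delete = new Delete(Bytes.toBytes(rowKey));
>             if (allVersions) {
>                 // addColumns (plural) marks ALL versions of the cell as deleted.
>                 delete.addColumns(Bytes.toBytes("cf"), Bytes.toBytes("upc"));
>             } else {
>                 // addColumn (singular) deletes only the LATEST version; older
>                 // versions become visible again on subsequent reads.
>                 delete.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("upc"));
>             }
>             table.delete(delete);
>         }
>     }
> }
> {code}
> A "sink.delete.mode" option would let the sink choose between these two behaviors.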
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> import static org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
>
> public class KafkaToHBase {
>     public static void main(String[] args) throws Exception {
>         // Local environment with the web UI enabled, so the test job can be inspected.
>         Configuration cfg = new Configuration();
>         cfg.setBoolean(LOCAL_START_WEBSERVER, true);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
>         env.setParallelism(1);
>         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
> //        TableConfig config = tEnv.getConfig();
> //        config.setIdleStateRetention(Duration.ofHours(2));
>
>         // Kafka source consuming a Debezium changelog (INSERT/UPDATE/DELETE rows from MySQL).
>         String source = "CREATE TEMPORARY TABLE IF NOT EXISTS kafka_llspay_bundles (\n" +
>                 "  id                   STRING,\n" +
>                 "  category_id           STRING,\n" +
>                 "  upc                   STRING,\n" +
>                 "  `name`                STRING,\n" +
>                 "  price_cents           STRING,\n" +
>                 "  original_price_cents  STRING,\n" +
>                 "  short_desc            STRING,\n" +
>                 "  `desc`                STRING,\n" +
>                 "  cover_url             STRING,\n" +
>                 "  created_at            STRING,\n" +
>                 "  updated_at            STRING,\n" +
>                 "  deleted_at            STRING,\n" +
>                 "  extra                 STRING,\n" +
>                 "  status                STRING,\n" +
>                 "  scholarship_cents     STRING,\n" +
>                 "  is_payback            STRING,\n" +
>                 "  is_support_iap        STRING,\n" +
>                 "  iap_product_id        STRING,\n" +
>                 "  neo_product_code      STRING,\n" +
>                 "  paid_redirect_url     STRING,\n" +
>                 "  subscription_type     STRING\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'kafka',\n" +
>                 "  'topic' = 'dim-bundles',\n" +
>                 "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
>                 "  'properties.group.id' = 'vvp_dev',\n" +
>                 "  'scan.startup.mode' = 'latest-offset',\n" +
>                 "  'value.debezium-json.schema-include' = 'true',\n" +
>                 "  'value.format' = 'debezium-json',\n" +
>                 "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
>                 ")";
>
>         // HBase sink. 'sink.delete.mode' is the new option proposed by this ticket.
>         String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
>                 "  rowkey STRING,\n" +
>                 "  cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING,\n" +
>                 "    price_cents STRING, original_price_cents STRING, short_desc STRING,\n" +
>                 "    `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING,\n" +
>                 "    deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING,\n" +
>                 "    is_payback STRING, is_support_iap STRING, iap_product_id STRING,\n" +
>                 "    neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'hbase-2.2',\n" +
>                 "  'table-name' = 'dim_hbase',\n" +
>                 "  'zookeeper.quorum' = 'localhost:2181',\n" +
>                 "  'sink.buffer-flush.max-size' = '0',\n" +
>                 "  'sink.buffer-flush.max-rows' = '1',\n" +
>                 "  'sink.delete.mode' = 'all-versions'\n" +
>                 ")";
>
>         // Write the changelog into HBase; a DELETE on the MySQL side arrives here as a retraction.
>         String dml = "INSERT INTO dim_hbase\n" +
>                 "SELECT\n" +
>                 "  upc AS rowkey,\n" +
>                 "  ROW(id, category_id, upc, `name`, price_cents, original_price_cents,\n" +
>                 "    short_desc, `desc`, cover_url, created_at, updated_at, deleted_at,\n" +
>                 "    extra, status, scholarship_cents, is_payback, is_support_iap,\n" +
>                 "    iap_product_id, neo_product_code, paid_redirect_url, subscription_type)\n" +
>                 "FROM kafka_llspay_bundles";
>
>         tEnv.executeSql(source);
>         tEnv.executeSql(sink);
>         // await() blocks so the local streaming job keeps running instead of exiting immediately.
>         tEnv.executeSql(dml).await();
>     }
> } {code}
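> Note that 'sink.buffer-flush.max-size' = '0' and 'sink.buffer-flush.max-rows' = '1' 
> in the sink DDL disable batching, so every changelog row (including deletes) is 
> flushed to HBase immediately, which makes the delete behavior easy to observe in 
> this test.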
> h1. Reference
> See the related task:
> FLINK-25330
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
